test_shell_hooks.py
1 """Tests for the shell-hooks subprocess bridge (agent.shell_hooks). 2 3 These tests focus on the pure translation layer — JSON serialisation, 4 JSON parsing, matcher behaviour, block-schema correctness, and the 5 subprocess runner's graceful error handling. Consent prompts are 6 covered in ``test_shell_hooks_consent.py``. 7 """ 8 9 from __future__ import annotations 10 11 import json 12 import os 13 import stat 14 from pathlib import Path 15 from typing import Any, Dict 16 17 import pytest 18 19 from agent import shell_hooks 20 21 22 # ── helpers ─────────────────────────────────────────────────────────────── 23 24 25 def _write_script(tmp_path: Path, name: str, body: str) -> Path: 26 path = tmp_path / name 27 path.write_text(body) 28 path.chmod(0o755) 29 return path 30 31 32 def _allowlist_pair(monkeypatch, tmp_path, event: str, command: str) -> None: 33 monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_home")) 34 shell_hooks._record_approval(event, command) 35 36 37 @pytest.fixture(autouse=True) 38 def _reset_registration_state(): 39 shell_hooks.reset_for_tests() 40 yield 41 shell_hooks.reset_for_tests() 42 43 44 # ── _parse_response ─────────────────────────────────────────────────────── 45 46 47 class TestParseResponse: 48 def test_block_claude_code_style(self): 49 r = shell_hooks._parse_response( 50 "pre_tool_call", 51 '{"decision": "block", "reason": "nope"}', 52 ) 53 assert r == {"action": "block", "message": "nope"} 54 55 def test_block_canonical_style(self): 56 r = shell_hooks._parse_response( 57 "pre_tool_call", 58 '{"action": "block", "message": "nope"}', 59 ) 60 assert r == {"action": "block", "message": "nope"} 61 62 def test_block_canonical_wins_over_claude_style(self): 63 r = shell_hooks._parse_response( 64 "pre_tool_call", 65 '{"action": "block", "message": "canonical", ' 66 '"decision": "block", "reason": "claude"}', 67 ) 68 assert r == {"action": "block", "message": "canonical"} 69 70 def test_empty_stdout_returns_none(self): 71 assert shell_hooks._parse_response("pre_tool_call", "") is None 72 assert shell_hooks._parse_response("pre_tool_call", " ") is None 73 74 def test_invalid_json_returns_none(self): 75 assert shell_hooks._parse_response("pre_tool_call", "not json") is None 76 77 def test_non_dict_json_returns_none(self): 78 assert shell_hooks._parse_response("pre_tool_call", "[1, 2]") is None 79 80 def test_non_block_pre_tool_call_returns_none(self): 81 r = shell_hooks._parse_response("pre_tool_call", '{"decision": "allow"}') 82 assert r is None 83 84 def test_pre_llm_call_context_passthrough(self): 85 r = shell_hooks._parse_response( 86 "pre_llm_call", '{"context": "today is Friday"}', 87 ) 88 assert r == {"context": "today is Friday"} 89 90 def test_subagent_stop_context_passthrough(self): 91 r = shell_hooks._parse_response( 92 "subagent_stop", '{"context": "child role=leaf"}', 93 ) 94 assert r == {"context": "child role=leaf"} 95 96 def test_pre_llm_call_block_ignored(self): 97 """Only pre_tool_call honors block directives.""" 98 r = shell_hooks._parse_response( 99 "pre_llm_call", '{"decision": "block", "reason": "no"}', 100 ) 101 assert r is None 102 103 104 # ── _serialize_payload ──────────────────────────────────────────────────── 105 106 107 class TestSerializePayload: 108 def test_basic_pre_tool_call_schema(self): 109 raw = shell_hooks._serialize_payload( 110 "pre_tool_call", 111 { 112 "tool_name": "terminal", 113 "args": {"command": "ls"}, 114 "session_id": "sess-1", 115 "task_id": "t-1", 116 "tool_call_id": "c-1", 117 }, 118 ) 119 payload = json.loads(raw) 120 assert payload["hook_event_name"] == "pre_tool_call" 121 assert payload["tool_name"] == "terminal" 122 assert payload["tool_input"] == {"command": "ls"} 123 assert payload["session_id"] == "sess-1" 124 assert "cwd" in payload 125 # task_id / tool_call_id end up under extra 126 assert payload["extra"]["task_id"] == "t-1" 127 assert payload["extra"]["tool_call_id"] == "c-1" 128 129 def test_args_not_dict_becomes_null(self): 130 raw = shell_hooks._serialize_payload( 131 "pre_tool_call", {"args": ["not", "a", "dict"]}, 132 ) 133 payload = json.loads(raw) 134 assert payload["tool_input"] is None 135 136 def test_parent_session_id_used_when_no_session_id(self): 137 raw = shell_hooks._serialize_payload( 138 "subagent_stop", {"parent_session_id": "p-1"}, 139 ) 140 payload = json.loads(raw) 141 assert payload["session_id"] == "p-1" 142 143 def test_unserialisable_extras_stringified(self): 144 class Weird: 145 def __repr__(self) -> str: 146 return "<weird>" 147 148 raw = shell_hooks._serialize_payload( 149 "on_session_start", {"obj": Weird()}, 150 ) 151 payload = json.loads(raw) 152 assert payload["extra"]["obj"] == "<weird>" 153 154 155 # ── Matcher behaviour ───────────────────────────────────────────────────── 156 157 158 class TestMatcher: 159 def test_no_matcher_fires_for_any_tool(self): 160 spec = shell_hooks.ShellHookSpec( 161 event="pre_tool_call", command="echo", matcher=None, 162 ) 163 assert spec.matches_tool("terminal") 164 assert spec.matches_tool("write_file") 165 166 def test_single_name_matcher(self): 167 spec = shell_hooks.ShellHookSpec( 168 event="pre_tool_call", command="echo", matcher="terminal", 169 ) 170 assert spec.matches_tool("terminal") 171 assert not spec.matches_tool("web_search") 172 173 def test_alternation_matcher(self): 174 spec = shell_hooks.ShellHookSpec( 175 event="pre_tool_call", command="echo", matcher="terminal|file", 176 ) 177 assert spec.matches_tool("terminal") 178 assert spec.matches_tool("file") 179 assert not spec.matches_tool("web") 180 181 def test_invalid_regex_falls_back_to_literal(self): 182 spec = shell_hooks.ShellHookSpec( 183 event="pre_tool_call", command="echo", matcher="foo[bar", 184 ) 185 assert spec.matches_tool("foo[bar") 186 assert not spec.matches_tool("foo") 187 188 def test_matcher_ignored_when_no_tool_name(self): 189 spec = shell_hooks.ShellHookSpec( 190 event="pre_tool_call", command="echo", matcher="terminal", 191 ) 192 assert not spec.matches_tool(None) 193 194 def test_matcher_leading_whitespace_stripped(self): 195 """YAML quirks can introduce leading/trailing whitespace — must 196 not silently break the matcher.""" 197 spec = shell_hooks.ShellHookSpec( 198 event="pre_tool_call", command="echo", matcher=" terminal ", 199 ) 200 assert spec.matcher == "terminal" 201 assert spec.matches_tool("terminal") 202 203 def test_matcher_trailing_newline_stripped(self): 204 spec = shell_hooks.ShellHookSpec( 205 event="pre_tool_call", command="echo", matcher="terminal\n", 206 ) 207 assert spec.matches_tool("terminal") 208 209 def test_whitespace_only_matcher_becomes_none(self): 210 """A matcher that's pure whitespace is treated as 'no matcher'.""" 211 spec = shell_hooks.ShellHookSpec( 212 event="pre_tool_call", command="echo", matcher=" ", 213 ) 214 assert spec.matcher is None 215 assert spec.matches_tool("anything") 216 217 218 # ── End-to-end subprocess behaviour ─────────────────────────────────────── 219 220 221 class TestCallbackSubprocess: 222 def test_timeout_returns_none(self, tmp_path): 223 # Script that sleeps forever; we set a 1s timeout. 224 script = _write_script( 225 tmp_path, "slow.sh", 226 "#!/usr/bin/env bash\nsleep 60\n", 227 ) 228 spec = shell_hooks.ShellHookSpec( 229 event="post_tool_call", command=str(script), timeout=1, 230 ) 231 cb = shell_hooks._make_callback(spec) 232 assert cb(tool_name="terminal") is None 233 234 def test_malformed_json_stdout_returns_none(self, tmp_path): 235 script = _write_script( 236 tmp_path, "bad_json.sh", 237 "#!/usr/bin/env bash\necho 'not json at all'\n", 238 ) 239 spec = shell_hooks.ShellHookSpec( 240 event="pre_tool_call", command=str(script), 241 ) 242 cb = shell_hooks._make_callback(spec) 243 # Matcher is None so the callback fires for any tool. 244 assert cb(tool_name="terminal") is None 245 246 def test_non_zero_exit_with_block_stdout_still_blocks(self, tmp_path): 247 """A script that signals failure via exit code AND prints a block 248 directive must still block — scripts should be free to mix exit 249 codes with parseable output.""" 250 script = _write_script( 251 tmp_path, "exit1_block.sh", 252 "#!/usr/bin/env bash\n" 253 'printf \'{"decision": "block", "reason": "via exit 1"}\\n\'\n' 254 "exit 1\n", 255 ) 256 spec = shell_hooks.ShellHookSpec( 257 event="pre_tool_call", command=str(script), 258 ) 259 cb = shell_hooks._make_callback(spec) 260 assert cb(tool_name="terminal") == {"action": "block", "message": "via exit 1"} 261 262 def test_block_translation_end_to_end(self, tmp_path): 263 """v1 schema-bug regression gate. 264 265 Shell hook returns the Claude-Code-style payload and the bridge 266 must translate it to the canonical Hermes block shape so that 267 get_pre_tool_call_block_message() surfaces the block. 268 """ 269 script = _write_script( 270 tmp_path, "blocker.sh", 271 "#!/usr/bin/env bash\n" 272 'printf \'{"decision": "block", "reason": "no terminal"}\\n\'\n', 273 ) 274 spec = shell_hooks.ShellHookSpec( 275 event="pre_tool_call", 276 command=str(script), 277 matcher="terminal", 278 ) 279 cb = shell_hooks._make_callback(spec) 280 result = cb(tool_name="terminal", args={"command": "rm -rf /"}) 281 assert result == {"action": "block", "message": "no terminal"} 282 283 def test_block_aggregation_through_plugin_manager(self, tmp_path, monkeypatch): 284 """Registering via register_from_config makes 285 get_pre_tool_call_block_message surface the block — the real 286 end-to-end control flow used by run_agent._invoke_tool.""" 287 from hermes_cli import plugins 288 289 script = _write_script( 290 tmp_path, "block.sh", 291 "#!/usr/bin/env bash\n" 292 'printf \'{"decision": "block", "reason": "blocked-by-shell"}\\n\'\n', 293 ) 294 295 monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) 296 monkeypatch.setenv("HERMES_ACCEPT_HOOKS", "1") 297 298 # Fresh manager 299 plugins._plugin_manager = plugins.PluginManager() 300 301 cfg = { 302 "hooks": { 303 "pre_tool_call": [ 304 {"matcher": "terminal", "command": str(script)}, 305 ], 306 }, 307 } 308 registered = shell_hooks.register_from_config(cfg, accept_hooks=True) 309 assert len(registered) == 1 310 311 msg = plugins.get_pre_tool_call_block_message( 312 tool_name="terminal", 313 args={"command": "rm"}, 314 ) 315 assert msg == "blocked-by-shell" 316 317 def test_matcher_regex_filters_callback(self, tmp_path, monkeypatch): 318 """A matcher set to 'terminal' must not fire for 'web_search'.""" 319 calls = tmp_path / "calls.log" 320 script = _write_script( 321 tmp_path, "log.sh", 322 f"#!/usr/bin/env bash\n" 323 f"echo \"$(cat -)\" >> {calls}\n" 324 f"printf '{{}}\\n'\n", 325 ) 326 spec = shell_hooks.ShellHookSpec( 327 event="pre_tool_call", 328 command=str(script), 329 matcher="terminal", 330 ) 331 cb = shell_hooks._make_callback(spec) 332 cb(tool_name="terminal", args={"command": "ls"}) 333 cb(tool_name="web_search", args={"q": "x"}) 334 cb(tool_name="file_read", args={"path": "x"}) 335 assert calls.exists() 336 # Only the terminal call wrote to the log 337 assert calls.read_text().count("pre_tool_call") == 1 338 339 def test_payload_schema_delivered(self, tmp_path): 340 capture = tmp_path / "payload.json" 341 script = _write_script( 342 tmp_path, "capture.sh", 343 f"#!/usr/bin/env bash\ncat - > {capture}\nprintf '{{}}\\n'\n", 344 ) 345 spec = shell_hooks.ShellHookSpec( 346 event="pre_tool_call", command=str(script), 347 ) 348 cb = shell_hooks._make_callback(spec) 349 cb( 350 tool_name="terminal", 351 args={"command": "echo hi"}, 352 session_id="sess-77", 353 task_id="task-77", 354 ) 355 payload = json.loads(capture.read_text()) 356 assert payload["hook_event_name"] == "pre_tool_call" 357 assert payload["tool_name"] == "terminal" 358 assert payload["tool_input"] == {"command": "echo hi"} 359 assert payload["session_id"] == "sess-77" 360 assert "cwd" in payload 361 assert payload["extra"]["task_id"] == "task-77" 362 363 def test_pre_llm_call_context_flows_through(self, tmp_path): 364 script = _write_script( 365 tmp_path, "ctx.sh", 366 "#!/usr/bin/env bash\n" 367 'printf \'{"context": "env-note"}\\n\'\n', 368 ) 369 spec = shell_hooks.ShellHookSpec( 370 event="pre_llm_call", command=str(script), 371 ) 372 cb = shell_hooks._make_callback(spec) 373 result = cb( 374 session_id="s1", user_message="hello", 375 conversation_history=[], is_first_turn=True, 376 model="gpt-4", platform="cli", 377 ) 378 assert result == {"context": "env-note"} 379 380 def test_shlex_handles_paths_with_spaces(self, tmp_path): 381 dir_with_space = tmp_path / "path with space" 382 dir_with_space.mkdir() 383 script = _write_script( 384 dir_with_space, "ok.sh", 385 "#!/usr/bin/env bash\nprintf '{}\\n'\n", 386 ) 387 # Quote the path so shlex keeps it as a single token. 388 spec = shell_hooks.ShellHookSpec( 389 event="post_tool_call", 390 command=f'"{script}"', 391 ) 392 cb = shell_hooks._make_callback(spec) 393 # No crash = shlex parsed it correctly. 394 assert cb(tool_name="terminal") is None # empty object parses to None 395 396 def test_missing_binary_logged_not_raised(self, tmp_path): 397 spec = shell_hooks.ShellHookSpec( 398 event="on_session_start", 399 command=str(tmp_path / "does-not-exist"), 400 ) 401 cb = shell_hooks._make_callback(spec) 402 # Must not raise — agent loop should continue. 403 assert cb(session_id="s") is None 404 405 def test_non_executable_binary_logged_not_raised(self, tmp_path): 406 path = tmp_path / "no-exec" 407 path.write_text("#!/usr/bin/env bash\necho hi\n") 408 # Intentionally do NOT chmod +x. 409 spec = shell_hooks.ShellHookSpec( 410 event="on_session_start", command=str(path), 411 ) 412 cb = shell_hooks._make_callback(spec) 413 assert cb(session_id="s") is None 414 415 416 # ── config parsing ──────────────────────────────────────────────────────── 417 418 419 class TestParseHooksBlock: 420 def test_valid_entry(self): 421 specs = shell_hooks._parse_hooks_block({ 422 "pre_tool_call": [ 423 {"matcher": "terminal", "command": "/tmp/hook.sh", "timeout": 30}, 424 ], 425 }) 426 assert len(specs) == 1 427 assert specs[0].event == "pre_tool_call" 428 assert specs[0].matcher == "terminal" 429 assert specs[0].command == "/tmp/hook.sh" 430 assert specs[0].timeout == 30 431 432 def test_unknown_event_skipped(self, caplog): 433 specs = shell_hooks._parse_hooks_block({ 434 "pre_tools_call": [ # typo 435 {"command": "/tmp/hook.sh"}, 436 ], 437 }) 438 assert specs == [] 439 440 def test_missing_command_skipped(self): 441 specs = shell_hooks._parse_hooks_block({ 442 "pre_tool_call": [{"matcher": "terminal"}], 443 }) 444 assert specs == [] 445 446 def test_timeout_clamped_to_max(self): 447 specs = shell_hooks._parse_hooks_block({ 448 "post_tool_call": [ 449 {"command": "/tmp/slow.sh", "timeout": 9999}, 450 ], 451 }) 452 assert specs[0].timeout == shell_hooks.MAX_TIMEOUT_SECONDS 453 454 def test_non_int_timeout_defaulted(self): 455 specs = shell_hooks._parse_hooks_block({ 456 "post_tool_call": [ 457 {"command": "/tmp/x.sh", "timeout": "thirty"}, 458 ], 459 }) 460 assert specs[0].timeout == shell_hooks.DEFAULT_TIMEOUT_SECONDS 461 462 def test_non_list_event_skipped(self): 463 specs = shell_hooks._parse_hooks_block({ 464 "pre_tool_call": "not a list", 465 }) 466 assert specs == [] 467 468 def test_none_hooks_block(self): 469 assert shell_hooks._parse_hooks_block(None) == [] 470 assert shell_hooks._parse_hooks_block("string") == [] 471 assert shell_hooks._parse_hooks_block([]) == [] 472 473 def test_non_tool_event_matcher_warns_and_drops(self, caplog): 474 """matcher: is only honored for pre/post_tool_call; must warn 475 and drop on other events so the spec reflects runtime.""" 476 import logging 477 cfg = {"pre_llm_call": [{"matcher": "terminal", "command": "/bin/echo"}]} 478 with caplog.at_level(logging.WARNING, logger=shell_hooks.logger.name): 479 specs = shell_hooks._parse_hooks_block(cfg) 480 assert len(specs) == 1 and specs[0].matcher is None 481 assert any( 482 "only honored for pre_tool_call" in r.getMessage() 483 and "pre_llm_call" in r.getMessage() 484 for r in caplog.records 485 ) 486 487 488 # ── Idempotent registration ─────────────────────────────────────────────── 489 490 491 class TestIdempotentRegistration: 492 def test_double_call_registers_once(self, tmp_path, monkeypatch): 493 from hermes_cli import plugins 494 495 script = _write_script(tmp_path, "h.sh", 496 "#!/usr/bin/env bash\nprintf '{}\\n'\n") 497 monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) 498 monkeypatch.setenv("HERMES_ACCEPT_HOOKS", "1") 499 500 plugins._plugin_manager = plugins.PluginManager() 501 502 cfg = {"hooks": {"on_session_start": [{"command": str(script)}]}} 503 504 first = shell_hooks.register_from_config(cfg, accept_hooks=True) 505 second = shell_hooks.register_from_config(cfg, accept_hooks=True) 506 assert len(first) == 1 507 assert second == [] 508 # Only one callback on the manager 509 mgr = plugins.get_plugin_manager() 510 assert len(mgr._hooks.get("on_session_start", [])) == 1 511 512 def test_same_command_different_matcher_registers_both( 513 self, tmp_path, monkeypatch, 514 ): 515 """Same script used for different matchers under one event must 516 register both callbacks — dedupe keys on (event, matcher, command).""" 517 from hermes_cli import plugins 518 519 script = _write_script(tmp_path, "h.sh", 520 "#!/usr/bin/env bash\nprintf '{}\\n'\n") 521 monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) 522 monkeypatch.setenv("HERMES_ACCEPT_HOOKS", "1") 523 524 plugins._plugin_manager = plugins.PluginManager() 525 526 cfg = { 527 "hooks": { 528 "pre_tool_call": [ 529 {"matcher": "terminal", "command": str(script)}, 530 {"matcher": "web_search", "command": str(script)}, 531 ], 532 }, 533 } 534 535 registered = shell_hooks.register_from_config(cfg, accept_hooks=True) 536 assert len(registered) == 2 537 mgr = plugins.get_plugin_manager() 538 assert len(mgr._hooks.get("pre_tool_call", [])) == 2 539 540 541 # ── Allowlist concurrency ───────────────────────────────────────────────── 542 543 544 class TestAllowlistConcurrency: 545 """Regression tests for the Codex#1 finding: simultaneous 546 _record_approval() calls used to collide on a fixed tmp path and 547 silently lose entries under read-modify-write races.""" 548 549 def test_parallel_record_approval_does_not_lose_entries( 550 self, tmp_path, monkeypatch, 551 ): 552 import threading 553 554 monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) 555 556 N = 32 557 barrier = threading.Barrier(N) 558 errors: list = [] 559 560 def worker(i: int) -> None: 561 try: 562 barrier.wait(timeout=5) 563 shell_hooks._record_approval( 564 "on_session_start", f"/bin/hook-{i}.sh", 565 ) 566 except Exception as exc: # pragma: no cover 567 errors.append(exc) 568 569 threads = [threading.Thread(target=worker, args=(i,)) for i in range(N)] 570 for t in threads: 571 t.start() 572 for t in threads: 573 t.join() 574 575 assert not errors, f"worker errors: {errors}" 576 577 data = shell_hooks.load_allowlist() 578 commands = {e["command"] for e in data["approvals"]} 579 assert commands == {f"/bin/hook-{i}.sh" for i in range(N)}, ( 580 f"expected all {N} entries, got {len(commands)}" 581 ) 582 583 def test_non_posix_fallback_does_not_self_deadlock( 584 self, tmp_path, monkeypatch, 585 ): 586 """Regression: on platforms without fcntl, the fallback lock must 587 be separate from _registered_lock. register_from_config holds 588 _registered_lock while calling _record_approval (via the consent 589 prompt path), so a shared non-reentrant lock would self-deadlock.""" 590 import threading 591 592 monkeypatch.setattr(shell_hooks, "fcntl", None) 593 monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) 594 595 completed = threading.Event() 596 errors: list = [] 597 598 def target() -> None: 599 try: 600 with shell_hooks._registered_lock: 601 shell_hooks._record_approval( 602 "on_session_start", "/bin/x.sh", 603 ) 604 completed.set() 605 except Exception as exc: # pragma: no cover 606 errors.append(exc) 607 completed.set() 608 609 t = threading.Thread(target=target, daemon=True) 610 t.start() 611 if not completed.wait(timeout=3.0): 612 pytest.fail( 613 "non-POSIX fallback self-deadlocked — " 614 "_locked_update_approvals must not reuse _registered_lock", 615 ) 616 t.join(timeout=1.0) 617 assert not errors, f"errors: {errors}" 618 assert shell_hooks._is_allowlisted( 619 "on_session_start", "/bin/x.sh", 620 ) 621 622 def test_save_allowlist_failure_logs_actionable_warning( 623 self, tmp_path, monkeypatch, caplog, 624 ): 625 """Persistence failures must log the path, errno, and 626 re-prompt consequence so "hermes keeps asking" is debuggable.""" 627 import logging 628 monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) 629 monkeypatch.setattr( 630 shell_hooks.tempfile, "mkstemp", 631 lambda *a, **kw: (_ for _ in ()).throw(OSError(28, "No space")), 632 ) 633 with caplog.at_level(logging.WARNING, logger=shell_hooks.logger.name): 634 shell_hooks.save_allowlist({"approvals": []}) 635 msg = next( 636 (r.getMessage() for r in caplog.records 637 if "Failed to persist" in r.getMessage()), "", 638 ) 639 assert "shell-hooks-allowlist.json" in msg 640 assert "No space" in msg 641 assert "re-prompt" in msg 642 643 def test_script_is_executable_handles_interpreter_prefix(self, tmp_path): 644 """For ``python3 hook.py`` and similar the interpreter reads 645 the script, so X_OK on the script itself is not required — 646 only R_OK. Bare invocations still require X_OK.""" 647 script = tmp_path / "hook.py" 648 script.write_text("print()\n") # readable, NOT executable 649 650 # Interpreter prefix: R_OK is enough. 651 assert shell_hooks.script_is_executable(f"python3 {script}") 652 assert shell_hooks.script_is_executable(f"/usr/bin/env python3 {script}") 653 654 # Bare invocation on the same non-X_OK file: not runnable. 655 assert not shell_hooks.script_is_executable(str(script)) 656 657 # Flip +x; bare invocation is now runnable too. 658 script.chmod(0o755) 659 assert shell_hooks.script_is_executable(str(script)) 660 661 def test_command_script_path_resolution(self): 662 """Regression: ``_command_script_path`` used to return the first 663 shlex token, which picked the interpreter (``python3``, ``bash``, 664 ``/usr/bin/env``) instead of the actual script for any 665 interpreter-prefixed command. That broke 666 ``hermes hooks doctor``'s executability check and silently 667 disabled mtime drift detection for such hooks.""" 668 cases = [ 669 # bare path 670 ("/path/hook.sh", "/path/hook.sh"), 671 ("/bin/echo hi", "/bin/echo"), 672 ("~/hook.sh", "~/hook.sh"), 673 ("hook.sh", "hook.sh"), 674 # interpreter prefix 675 ("python3 /path/hook.py", "/path/hook.py"), 676 ("bash /path/hook.sh", "/path/hook.sh"), 677 ("bash ~/hook.sh", "~/hook.sh"), 678 ("python3 -u /path/hook.py", "/path/hook.py"), 679 ("nice -n 10 /path/hook.sh", "/path/hook.sh"), 680 # /usr/bin/env shebang form — must find the *script*, not env 681 ("/usr/bin/env python3 /path/hook.py", "/path/hook.py"), 682 ("/usr/bin/env bash /path/hook.sh", "/path/hook.sh"), 683 # no path-like tokens → fallback to first token 684 ("my-binary --verbose", "my-binary"), 685 ("python3 -c 'print(1)'", "python3"), 686 # unparseable (unbalanced quotes) → return command as-is 687 ("python3 'unterminated", "python3 'unterminated"), 688 # empty 689 ("", ""), 690 ] 691 for command, expected in cases: 692 got = shell_hooks._command_script_path(command) 693 assert got == expected, f"{command!r} -> {got!r}, expected {expected!r}" 694 695 def test_save_allowlist_uses_unique_tmp_paths(self, tmp_path, monkeypatch): 696 """Two save_allowlist calls in flight must use distinct tmp files 697 so the loser's os.replace does not ENOENT on the winner's sweep.""" 698 monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) 699 p = shell_hooks.allowlist_path() 700 p.parent.mkdir(parents=True, exist_ok=True) 701 702 tmp_paths_seen: list = [] 703 real_mkstemp = shell_hooks.tempfile.mkstemp 704 705 def spying_mkstemp(*args, **kwargs): 706 fd, path = real_mkstemp(*args, **kwargs) 707 tmp_paths_seen.append(path) 708 return fd, path 709 710 monkeypatch.setattr(shell_hooks.tempfile, "mkstemp", spying_mkstemp) 711 712 shell_hooks.save_allowlist({"approvals": [{"event": "a", "command": "x"}]}) 713 shell_hooks.save_allowlist({"approvals": [{"event": "b", "command": "y"}]}) 714 715 assert len(tmp_paths_seen) == 2 716 assert tmp_paths_seen[0] != tmp_paths_seen[1]