/ tools / approval.py
approval.py
   1  """Dangerous command approval -- detection, prompting, and per-session state.
   2  
   3  This module is the single source of truth for the dangerous command system:
   4  - Pattern detection (DANGEROUS_PATTERNS, detect_dangerous_command)
   5  - Per-session approval state (thread-safe, keyed by session_key)
   6  - Approval prompting (CLI interactive + gateway async)
   7  - Smart approval via auxiliary LLM (auto-approve low-risk commands)
   8  - Permanent allowlist persistence (config.yaml)
   9  """
  10  
  11  import contextvars
  12  import logging
  13  import os
  14  import re
  15  import sys
  16  import threading
  17  import time
  18  import unicodedata
  19  from typing import Optional
  20  from hermes_cli.config import cfg_get
  21  
  22  from utils import is_truthy_value
  23  
  24  logger = logging.getLogger(__name__)
  25  
  26  # Per-thread/per-task gateway session identity.
  27  # Gateway runs agent turns concurrently in executor threads, so reading a
  28  # process-global env var for session identity is racy. Keep env fallback for
  29  # legacy single-threaded callers, but prefer the context-local value when set.
  30  _approval_session_key: contextvars.ContextVar[str] = contextvars.ContextVar(
  31      "approval_session_key",
  32      default="",
  33  )
  34  
  35  
  36  def _fire_approval_hook(hook_name: str, **kwargs) -> None:
  37      """Invoke a plugin lifecycle hook for the approval system.
  38  
  39      Lazy-imports the plugin manager to avoid circular imports (approval.py is
  40      imported very early, long before plugins are discovered). Never raises --
  41      plugin errors are logged and swallowed.
  42  
  43      Only fires for the two approval-specific hooks in VALID_HOOKS:
  44      pre_approval_request, post_approval_response.
  45      """
  46      try:
  47          from hermes_cli.plugins import invoke_hook
  48      except Exception:
  49          # Plugin system not available in this execution context
  50          # (e.g. bare tool-only imports, minimal test environments).
  51          return
  52      try:
  53          invoke_hook(hook_name, **kwargs)
  54      except Exception as exc:
  55          # invoke_hook() already swallows per-callback errors, so reaching here
  56          # means the dispatch layer itself failed. Log and move on -- approval
  57          # flow is safety-critical, plugin observability is not.
  58          logger.debug("Approval hook %s dispatch failed: %s", hook_name, exc)
  59  
  60  
  61  
  62  def set_current_session_key(session_key: str) -> contextvars.Token[str]:
  63      """Bind the active approval session key to the current context."""
  64      return _approval_session_key.set(session_key or "")
  65  
  66  
  67  def reset_current_session_key(token: contextvars.Token[str]) -> None:
  68      """Restore the prior approval session key context."""
  69      _approval_session_key.reset(token)
  70  
  71  
  72  def get_current_session_key(default: str = "default") -> str:
  73      """Return the active session key, preferring context-local state.
  74  
  75      Resolution order:
  76      1. approval-specific contextvars (set by gateway before agent.run)
  77      2. session_context contextvars (set by _set_session_env)
  78      3. os.environ fallback (CLI, cron, tests)
  79      """
  80      session_key = _approval_session_key.get()
  81      if session_key:
  82          return session_key
  83      from gateway.session_context import get_session_env
  84      return get_session_env("HERMES_SESSION_KEY", default)
  85  
  86  # Sensitive write targets that should trigger approval even when referenced
  87  # via shell expansions like $HOME or $HERMES_HOME.
  88  _SSH_SENSITIVE_PATH = r'(?:~|\$home|\$\{home\})/\.ssh(?:/|$)'
  89  _HERMES_ENV_PATH = (
  90      r'(?:~\/\.hermes/|'
  91      r'(?:\$home|\$\{home\})/\.hermes/|'
  92      r'(?:\$hermes_home|\$\{hermes_home\})/)'
  93      r'\.env\b'
  94  )
  95  _PROJECT_ENV_PATH = r'(?:(?:/|\.{1,2}/)?(?:[^\s/"\'`]+/)*\.env(?:\.[^/\s"\'`]+)*)'
  96  _PROJECT_CONFIG_PATH = r'(?:(?:/|\.{1,2}/)?(?:[^\s/"\'`]+/)*config\.yaml)'
  97  _SHELL_RC_FILES = (
  98      r'(?:~|\$home|\$\{home\})/\.'
  99      r'(?:bashrc|zshrc|profile|bash_profile|zprofile)\b'
 100  )
 101  _CREDENTIAL_FILES = (
 102      r'(?:~|\$home|\$\{home\})/\.'
 103      r'(?:netrc|pgpass|npmrc|pypirc)\b'
 104  )
 105  _SENSITIVE_WRITE_TARGET = (
 106      r'(?:/etc/|/dev/sd|'
 107      rf'{_SSH_SENSITIVE_PATH}|'
 108      rf'{_HERMES_ENV_PATH}|'
 109      rf'{_SHELL_RC_FILES}|'
 110      rf'{_CREDENTIAL_FILES})'
 111  )
 112  _PROJECT_SENSITIVE_WRITE_TARGET = rf'(?:{_PROJECT_ENV_PATH}|{_PROJECT_CONFIG_PATH})'
 113  _COMMAND_TAIL = r'(?:\s*(?:&&|\|\||;).*)?$'
 114  
 115  # =========================================================================
 116  # Hardline (unconditional) blocklist
 117  # =========================================================================
 118  #
 119  # Commands so catastrophic they should NEVER run via the agent, regardless
 120  # of --yolo, /yolo, approvals.mode=off, or cron approve mode.  This is a
 121  # floor below yolo: opting into yolo is the user trusting the agent with
 122  # their files and services, not trusting it to wipe the disk or power the
 123  # box off.
 124  #
 125  # Hardline only applies to environments that can actually damage the host
 126  # (local, ssh, container-host cron).  Containerized backends (docker,
 127  # singularity, modal, daytona) already bypass the dangerous-command layer
 128  # because nothing they do can touch the host, so we leave that behavior
 129  # alone.
 130  #
 131  # The list is deliberately tiny — only things with no recovery path:
 132  # filesystem destruction rooted at /, raw block device overwrites, kernel
 133  # shutdown/reboot, and denial-of-service commands that take the host down.
 134  # Recoverable-but-costly operations (git reset --hard, rm -rf /tmp/x,
 135  # chmod -R 777, curl|sh) stay in DANGEROUS_PATTERNS where yolo can pass
 136  # them through — that's what yolo is for.
 137  #
 138  # Inspired by Mercury Agent's permission-hardened blocklist
 139  # (https://github.com/cosmicstack-labs/mercury-agent).
 140  
 141  # Regex fragment matching the *start* of a command (i.e. positions where
 142  # a shell would begin parsing a new command).  Used by shutdown/reboot
 143  # patterns so they don't fire on "echo reboot" or "grep 'shutdown' log".
 144  # Matches: start of string, after command separators (; && || | newline),
 145  # after subshell openers ( `$(` or backtick ), optionally consuming
 146  # leading wrapper commands (sudo, env VAR=VAL, exec, nohup, setsid).
 147  _CMDPOS = (
 148      r'(?:^|[;&|\n`]|\$\()'         # start position
 149      r'\s*'                          # optional whitespace
 150      r'(?:sudo\s+(?:-[^\s]+\s+)*)?'  # optional sudo with flags
 151      r'(?:env\s+(?:\w+=\S*\s+)*)?'   # optional env with VAR=VAL pairs
 152      r'(?:(?:exec|nohup|setsid|time)\s+)*'  # optional wrapper commands
 153      r'\s*'
 154  )
 155  
 156  HARDLINE_PATTERNS = [
 157      # rm recursive targeting the root filesystem or protected roots
 158      (r'\brm\s+(-[^\s]*\s+)*(/|/\*|/ \*)(\s|$)', "recursive delete of root filesystem"),
 159      (r'\brm\s+(-[^\s]*\s+)*(/home|/home/\*|/root|/root/\*|/etc|/etc/\*|/usr|/usr/\*|/var|/var/\*|/bin|/bin/\*|/sbin|/sbin/\*|/boot|/boot/\*|/lib|/lib/\*)(\s|$)', "recursive delete of system directory"),
 160      (r'\brm\s+(-[^\s]*\s+)*(~|\$HOME)(/?|/\*)?(\s|$)', "recursive delete of home directory"),
 161      # Filesystem format
 162      (r'\bmkfs(\.[a-z0-9]+)?\b', "format filesystem (mkfs)"),
 163      # Raw block device overwrites (dd + redirection)
 164      (r'\bdd\b[^\n]*\bof=/dev/(sd|nvme|hd|mmcblk|vd|xvd)[a-z0-9]*', "dd to raw block device"),
 165      (r'>\s*/dev/(sd|nvme|hd|mmcblk|vd|xvd)[a-z0-9]*\b', "redirect to raw block device"),
 166      # Fork bomb (classic shell form)
 167      (r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
 168      # Kill every process on the system
 169      (r'\bkill\s+(-[^\s]+\s+)*-1\b', "kill all processes"),
 170      # System shutdown / reboot — anchor to command position (start of line,
 171      # after a command separator, or after sudo/env wrappers) so we don't
 172      # false-positive on "echo reboot" or "grep 'shutdown' logs".
 173      # _CMDPOS matches start-of-command positions.
 174      (_CMDPOS + r'(shutdown|reboot|halt|poweroff)\b', "system shutdown/reboot"),
 175      (_CMDPOS + r'init\s+[06]\b', "init 0/6 (shutdown/reboot)"),
 176      (_CMDPOS + r'systemctl\s+(poweroff|reboot|halt|kexec)\b', "systemctl poweroff/reboot"),
 177      (_CMDPOS + r'telinit\s+[06]\b', "telinit 0/6 (shutdown/reboot)"),
 178  ]
 179  
 180  # Pre-compiled variant used by the hot-path matcher. Building these at module
 181  # load eliminates the ~2.6 ms cold-cache re.compile fan-out on the first
 182  # terminal() call per process (12 HARDLINE + 47 DANGEROUS patterns, each
 183  # potentially evicted from Python's 512-entry ``re._cache`` by unrelated
 184  # regex work elsewhere in the agent). DANGEROUS_PATTERNS_COMPILED is built
 185  # at the end of this module after DANGEROUS_PATTERNS is defined.
 186  _RE_FLAGS = re.IGNORECASE | re.DOTALL
 187  HARDLINE_PATTERNS_COMPILED = [
 188      (re.compile(pattern, _RE_FLAGS), description)
 189      for pattern, description in HARDLINE_PATTERNS
 190  ]
 191  
 192  
 193  def detect_hardline_command(command: str) -> tuple:
 194      """Check if a command matches the unconditional hardline blocklist.
 195  
 196      Returns:
 197          (is_hardline, description) or (False, None)
 198      """
 199      normalized = _normalize_command_for_detection(command).lower()
 200      for pattern_re, description in HARDLINE_PATTERNS_COMPILED:
 201          if pattern_re.search(normalized):
 202              return (True, description)
 203      return (False, None)
 204  
 205  
 206  def _hardline_block_result(description: str) -> dict:
 207      """Build the standard block result for a hardline match."""
 208      return {
 209          "approved": False,
 210          "hardline": True,
 211          "message": (
 212              f"BLOCKED (hardline): {description}. "
 213              "This command is on the unconditional blocklist and cannot "
 214              "be executed via the agent — not even with --yolo, /yolo, "
 215              "approvals.mode=off, or cron approve mode. If you genuinely "
 216              "need to run it, run it yourself in a terminal outside the "
 217              "agent."
 218          ),
 219      }
 220  
 221  
 222  # =========================================================================
 223  # Dangerous command patterns
 224  # =========================================================================
 225  
 226  DANGEROUS_PATTERNS = [
 227      (r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
 228      (r'\brm\s+-[^\s]*r', "recursive delete"),
 229      (r'\brm\s+--recursive\b', "recursive delete (long flag)"),
 230      (r'\bchmod\s+(-[^\s]*\s+)*(777|666|o\+[rwx]*w|a\+[rwx]*w)\b', "world/other-writable permissions"),
 231      (r'\bchmod\s+--recursive\b.*(777|666|o\+[rwx]*w|a\+[rwx]*w)', "recursive world/other-writable (long flag)"),
 232      (r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
 233      (r'\bchown\s+--recursive\b.*root', "recursive chown to root (long flag)"),
 234      (r'\bmkfs\b', "format filesystem"),
 235      (r'\bdd\s+.*if=', "disk copy"),
 236      (r'>\s*/dev/sd', "write to block device"),
 237      (r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"),
 238      (r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"),
 239      (r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
 240      (r'>\s*/etc/', "overwrite system config"),
 241      (r'\bsystemctl\s+(-[^\s]+\s+)*(stop|restart|disable|mask)\b', "stop/restart system service"),
 242      (r'\bkill\s+-9\s+-1\b', "kill all processes"),
 243      (r'\bpkill\s+-9\b', "force kill processes"),
 244      (r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
 245      # Any shell invocation via -c or combined flags like -lc, -ic, etc.
 246      (r'\b(bash|sh|zsh|ksh)\s+-[^\s]*c(\s+|$)', "shell command via -c/-lc flag"),
 247      (r'\b(python[23]?|perl|ruby|node)\s+-[ec]\s+', "script execution via -e/-c flag"),
 248      (r'\b(curl|wget)\b.*\|\s*(ba)?sh\b', "pipe remote content to shell"),
 249      (r'\b(bash|sh|zsh|ksh)\s+<\s*<?\s*\(\s*(curl|wget)\b', "execute remote script via process substitution"),
 250      (rf'\btee\b.*["\']?{_SENSITIVE_WRITE_TARGET}', "overwrite system file via tee"),
 251      (rf'>>?\s*["\']?{_SENSITIVE_WRITE_TARGET}', "overwrite system file via redirection"),
 252      (rf'\btee\b.*["\']?{_PROJECT_SENSITIVE_WRITE_TARGET}["\']?{_COMMAND_TAIL}', "overwrite project env/config via tee"),
 253      (rf'>>?\s*["\']?{_PROJECT_SENSITIVE_WRITE_TARGET}["\']?{_COMMAND_TAIL}', "overwrite project env/config via redirection"),
 254      (r'\bxargs\s+.*\brm\b', "xargs with rm"),
 255      (r'\bfind\b.*-exec\s+(/\S*/)?rm\b', "find -exec rm"),
 256      (r'\bfind\b.*-delete\b', "find -delete"),
 257      # Gateway lifecycle protection: prevent the agent from killing its own
 258      # gateway process.  These commands trigger a gateway restart/stop that
 259      # terminates all running agents mid-work.
 260      (r'\bhermes\s+gateway\s+(stop|restart)\b', "stop/restart hermes gateway (kills running agents)"),
 261      (r'\bhermes\s+update\b', "hermes update (restarts gateway, kills running agents)"),
 262      # Gateway protection: never start gateway outside systemd management
 263      (r'gateway\s+run\b.*(&\s*$|&\s*;|\bdisown\b|\bsetsid\b)', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
 264      (r'\bnohup\b.*gateway\s+run\b', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
 265      # Self-termination protection: prevent agent from killing its own process
 266      (r'\b(pkill|killall)\b.*\b(hermes|gateway|cli\.py)\b', "kill hermes/gateway process (self-termination)"),
 267      # Self-termination via kill + command substitution (pgrep/pidof).
 268      # The name-based pattern above catches `pkill hermes` but not
 269      # `kill -9 $(pgrep -f hermes)` because the substitution is opaque
 270      # to regex at detection time. Catch the structural pattern instead.
 271      (r'\bkill\b.*\$\(\s*pgrep\b', "kill process via pgrep expansion (self-termination)"),
 272      (r'\bkill\b.*`\s*pgrep\b', "kill process via backtick pgrep expansion (self-termination)"),
 273      # File copy/move/edit into sensitive system paths
 274      (r'\b(cp|mv|install)\b.*\s/etc/', "copy/move file into /etc/"),
 275      (rf'\b(cp|mv|install)\b.*\s["\']?{_PROJECT_SENSITIVE_WRITE_TARGET}["\']?{_COMMAND_TAIL}', "overwrite project env/config file"),
 276      (r'\bsed\s+-[^\s]*i.*\s/etc/', "in-place edit of system config"),
 277      (r'\bsed\s+--in-place\b.*\s/etc/', "in-place edit of system config (long flag)"),
 278      # Script execution via heredoc — bypasses the -e/-c flag patterns above.
 279      # `python3 << 'EOF'` feeds arbitrary code via stdin without -c/-e flags.
 280      (r'\b(python[23]?|perl|ruby|node)\s+<<', "script execution via heredoc"),
 281      # Git destructive operations that can lose uncommitted work or rewrite
 282      # shared history. Not captured by rm/chmod/etc patterns.
 283      (r'\bgit\s+reset\s+--hard\b', "git reset --hard (destroys uncommitted changes)"),
 284      (r'\bgit\s+push\b.*--force\b', "git force push (rewrites remote history)"),
 285      (r'\bgit\s+push\b.*-f\b', "git force push short flag (rewrites remote history)"),
 286      (r'\bgit\s+clean\s+-[^\s]*f', "git clean with force (deletes untracked files)"),
 287      (r'\bgit\s+branch\s+-D\b', "git branch force delete"),
 288      # Script execution after chmod +x — catches the two-step pattern where
 289      # a script is first made executable then immediately run. The script
 290      # content may contain dangerous commands that individual patterns miss.
 291      (r'\bchmod\s+\+x\b.*[;&|]+\s*\./', "chmod +x followed by immediate execution"),
 292  ]
 293  
 294  
 295  # Pre-compiled variant (same rationale as HARDLINE_PATTERNS_COMPILED above).
 296  DANGEROUS_PATTERNS_COMPILED = [
 297      (re.compile(pattern, _RE_FLAGS), description)
 298      for pattern, description in DANGEROUS_PATTERNS
 299  ]
 300  
 301  
 302  def _legacy_pattern_key(pattern: str) -> str:
 303      """Reproduce the old regex-derived approval key for backwards compatibility."""
 304      return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
 305  
 306  
 307  _PATTERN_KEY_ALIASES: dict[str, set[str]] = {}
 308  for _pattern, _description in DANGEROUS_PATTERNS:
 309      _legacy_key = _legacy_pattern_key(_pattern)
 310      _canonical_key = _description
 311      _PATTERN_KEY_ALIASES.setdefault(_canonical_key, set()).update({_canonical_key, _legacy_key})
 312      _PATTERN_KEY_ALIASES.setdefault(_legacy_key, set()).update({_legacy_key, _canonical_key})
 313  
 314  
 315  def _approval_key_aliases(pattern_key: str) -> set[str]:
 316      """Return all approval keys that should match this pattern.
 317  
 318      New approvals use the human-readable description string, but older
 319      command_allowlist entries and session approvals may still contain the
 320      historical regex-derived key.
 321      """
 322      return _PATTERN_KEY_ALIASES.get(pattern_key, {pattern_key})
 323  
 324  
 325  # =========================================================================
 326  # Detection
 327  # =========================================================================
 328  
 329  def _normalize_command_for_detection(command: str) -> str:
 330      """Normalize a command string before dangerous-pattern matching.
 331  
 332      Strips ANSI escape sequences (full ECMA-48 via tools.ansi_strip),
 333      null bytes, and normalizes Unicode fullwidth characters so that
 334      obfuscation techniques cannot bypass the pattern-based detection.
 335      """
 336      from tools.ansi_strip import strip_ansi
 337  
 338      # Strip all ANSI escape sequences (CSI, OSC, DCS, 8-bit C1, etc.)
 339      command = strip_ansi(command)
 340      # Strip null bytes
 341      command = command.replace('\x00', '')
 342      # Normalize Unicode (fullwidth Latin, halfwidth Katakana, etc.)
 343      command = unicodedata.normalize('NFKC', command)
 344      return command
 345  
 346  
 347  def detect_dangerous_command(command: str) -> tuple:
 348      """Check if a command matches any dangerous patterns.
 349  
 350      Returns:
 351          (is_dangerous, pattern_key, description) or (False, None, None)
 352      """
 353      command_lower = _normalize_command_for_detection(command).lower()
 354      for pattern_re, description in DANGEROUS_PATTERNS_COMPILED:
 355          if pattern_re.search(command_lower):
 356              pattern_key = description
 357              return (True, pattern_key, description)
 358      return (False, None, None)
 359  
 360  
 361  # =========================================================================
 362  # Per-session approval state (thread-safe)
 363  # =========================================================================
 364  
 365  _lock = threading.Lock()
 366  _pending: dict[str, dict] = {}
 367  _session_approved: dict[str, set] = {}
 368  _session_yolo: set[str] = set()
 369  _permanent_approved: set = set()
 370  
 371  # =========================================================================
 372  # Blocking gateway approval (mirrors CLI's synchronous input() flow)
 373  # =========================================================================
 374  # Per-session QUEUE of pending approvals.  Multiple threads (parallel
 375  # subagents, execute_code RPC handlers) can block concurrently — each gets
 376  # its own threading.Event.  /approve resolves the oldest, /approve all
 377  # resolves every pending approval in the session.
 378  
 379  
 380  class _ApprovalEntry:
 381      """One pending dangerous-command approval inside a gateway session."""
 382      __slots__ = ("event", "data", "result")
 383  
 384      def __init__(self, data: dict):
 385          self.event = threading.Event()
 386          self.data = data          # command, description, pattern_keys, …
 387          self.result: Optional[str] = None  # "once"|"session"|"always"|"deny"
 388  
 389  
 390  _gateway_queues: dict[str, list] = {}        # session_key → [_ApprovalEntry, …]
 391  _gateway_notify_cbs: dict[str, object] = {}  # session_key → callable(approval_data)
 392  
 393  
 394  def register_gateway_notify(session_key: str, cb) -> None:
 395      """Register a per-session callback for sending approval requests to the user.
 396  
 397      The callback signature is ``cb(approval_data: dict) -> None`` where
 398      *approval_data* contains ``command``, ``description``, and
 399      ``pattern_keys``.  The callback bridges sync→async (runs in the agent
 400      thread, must schedule the actual send on the event loop).
 401      """
 402      with _lock:
 403          _gateway_notify_cbs[session_key] = cb
 404  
 405  
 406  def unregister_gateway_notify(session_key: str) -> None:
 407      """Unregister the per-session gateway approval callback.
 408  
 409      Signals ALL blocked threads for this session so they don't hang forever
 410      (e.g. when the agent run finishes or is interrupted).
 411      """
 412      with _lock:
 413          _gateway_notify_cbs.pop(session_key, None)
 414          entries = _gateway_queues.pop(session_key, [])
 415      for entry in entries:
 416          entry.event.set()
 417  
 418  
 419  def resolve_gateway_approval(session_key: str, choice: str,
 420                               resolve_all: bool = False) -> int:
 421      """Called by the gateway's /approve or /deny handler to unblock
 422      waiting agent thread(s).
 423  
 424      When *resolve_all* is True every pending approval in the session is
 425      resolved at once (``/approve all``).  Otherwise only the oldest one
 426      is resolved (FIFO).
 427  
 428      Returns the number of approvals resolved (0 means nothing was pending).
 429      """
 430      with _lock:
 431          queue = _gateway_queues.get(session_key)
 432          if not queue:
 433              return 0
 434          if resolve_all:
 435              targets = list(queue)
 436              queue.clear()
 437          else:
 438              targets = [queue.pop(0)]
 439          if not queue:
 440              _gateway_queues.pop(session_key, None)
 441  
 442      for entry in targets:
 443          entry.result = choice
 444          entry.event.set()
 445      return len(targets)
 446  
 447  
 448  def has_blocking_approval(session_key: str) -> bool:
 449      """Check if a session has one or more blocking gateway approvals waiting."""
 450      with _lock:
 451          return bool(_gateway_queues.get(session_key))
 452  
 453  
 454  def submit_pending(session_key: str, approval: dict):
 455      """Store a pending approval request for a session."""
 456      with _lock:
 457          _pending[session_key] = approval
 458  
 459  
 460  def approve_session(session_key: str, pattern_key: str):
 461      """Approve a pattern for this session only."""
 462      with _lock:
 463          _session_approved.setdefault(session_key, set()).add(pattern_key)
 464  
 465  
 466  def enable_session_yolo(session_key: str) -> None:
 467      """Enable YOLO bypass for a single session key."""
 468      if not session_key:
 469          return
 470      with _lock:
 471          _session_yolo.add(session_key)
 472  
 473  
 474  def disable_session_yolo(session_key: str) -> None:
 475      """Disable YOLO bypass for a single session key."""
 476      if not session_key:
 477          return
 478      with _lock:
 479          _session_yolo.discard(session_key)
 480  
 481  
 482  def clear_session(session_key: str) -> None:
 483      """Remove all approval and yolo state for a given session."""
 484      if not session_key:
 485          return
 486      with _lock:
 487          _session_approved.pop(session_key, None)
 488          _session_yolo.discard(session_key)
 489          _pending.pop(session_key, None)
 490          entries = _gateway_queues.pop(session_key, [])
 491      for entry in entries:
 492          # Session-boundary cleanup should cancel any blocked approval waits
 493          # immediately so the old run can unwind instead of idling until timeout.
 494          entry.result = "deny"
 495          entry.event.set()
 496  
 497  
 498  def is_session_yolo_enabled(session_key: str) -> bool:
 499      """Return True when YOLO bypass is enabled for a specific session."""
 500      if not session_key:
 501          return False
 502      with _lock:
 503          return session_key in _session_yolo
 504  
 505  
 506  def is_current_session_yolo_enabled() -> bool:
 507      """Return True when the active approval session has YOLO bypass enabled."""
 508      return is_session_yolo_enabled(get_current_session_key(default=""))
 509  
 510  
 511  def is_approved(session_key: str, pattern_key: str) -> bool:
 512      """Check if a pattern is approved (session-scoped or permanent).
 513  
 514      Accept both the current canonical key and the legacy regex-derived key so
 515      existing command_allowlist entries continue to work after key migrations.
 516      """
 517      aliases = _approval_key_aliases(pattern_key)
 518      with _lock:
 519          if any(alias in _permanent_approved for alias in aliases):
 520              return True
 521          session_approvals = _session_approved.get(session_key, set())
 522          return any(alias in session_approvals for alias in aliases)
 523  
 524  
 525  def approve_permanent(pattern_key: str):
 526      """Add a pattern to the permanent allowlist."""
 527      with _lock:
 528          _permanent_approved.add(pattern_key)
 529  
 530  
 531  def load_permanent(patterns: set):
 532      """Bulk-load permanent allowlist entries from config."""
 533      with _lock:
 534          _permanent_approved.update(patterns)
 535  
 536  
 537  
 538  # =========================================================================
 539  # Config persistence for permanent allowlist
 540  # =========================================================================
 541  
 542  def load_permanent_allowlist() -> set:
 543      """Load permanently allowed command patterns from config.
 544  
 545      Also syncs them into the approval module so is_approved() works for
 546      patterns added via 'always' in a previous session.
 547      """
 548      try:
 549          from hermes_cli.config import load_config
 550          config = load_config()
 551          patterns = set(config.get("command_allowlist", []) or [])
 552          if patterns:
 553              load_permanent(patterns)
 554          return patterns
 555      except Exception as e:
 556          logger.warning("Failed to load permanent allowlist: %s", e)
 557          return set()
 558  
 559  
 560  def save_permanent_allowlist(patterns: set):
 561      """Save permanently allowed command patterns to config."""
 562      try:
 563          from hermes_cli.config import load_config, save_config
 564          config = load_config()
 565          config["command_allowlist"] = list(patterns)
 566          save_config(config)
 567      except Exception as e:
 568          logger.warning("Could not save allowlist: %s", e)
 569  
 570  
 571  # =========================================================================
 572  # Approval prompting + orchestration
 573  # =========================================================================
 574  
 575  def prompt_dangerous_approval(command: str, description: str,
 576                                timeout_seconds: int | None = None,
 577                                allow_permanent: bool = True,
 578                                approval_callback=None) -> str:
 579      """Prompt the user to approve a dangerous command (CLI only).
 580  
 581      Args:
 582          allow_permanent: When False, hide the [a]lways option (used when
 583              tirith warnings are present, since broad permanent allowlisting
 584              is inappropriate for content-level security findings).
 585          approval_callback: Optional callback registered by the CLI for
 586              prompt_toolkit integration. Signature:
 587              (command, description, *, allow_permanent=True) -> str.
 588  
 589      Returns: 'once', 'session', 'always', or 'deny'
 590      """
 591      if timeout_seconds is None:
 592          timeout_seconds = _get_approval_timeout()
 593  
 594      if approval_callback is not None:
 595          try:
 596              return approval_callback(command, description,
 597                                       allow_permanent=allow_permanent)
 598          except Exception as e:
 599              logger.error("Approval callback failed: %s", e, exc_info=True)
 600              return "deny"
 601  
 602      # Fail-closed guard: if prompt_toolkit owns the terminal (interactive
 603      # CLI session) and no approval callback is registered on this thread,
 604      # the input() fallback below would spawn a daemon thread whose read
 605      # can never see Enter -- the user's keystrokes go to prompt_toolkit,
 606      # not input(), producing an invisible 60s deadlock (issue #15216).
 607      # Deny fast and log loudly instead so the caller can surface a real
 608      # error to the agent. Any thread that needs interactive approval must
 609      # install a callback via tools.terminal_tool.set_approval_callback()
 610      # before reaching this point (see delegate_tool.py, run_agent.py
 611      # _execute_tool_calls_concurrent / _spawn_background_review for the
 612      # established pattern).
 613      try:
 614          from prompt_toolkit.application.current import get_app_or_none
 615          if get_app_or_none() is not None:
 616              logger.warning(
 617                  "Dangerous-command approval requested on a thread with no "
 618                  "approval callback while prompt_toolkit is active; denying "
 619                  "to avoid stdin deadlock. command=%r description=%r",
 620                  command, description,
 621              )
 622              return "deny"
 623      except Exception:
 624          # prompt_toolkit not installed, or detection failed -- fall through
 625          # to the legacy input() path (safe in non-TUI contexts: scripts,
 626          # tests, sshd, etc.).
 627          pass
 628  
 629      os.environ["HERMES_SPINNER_PAUSE"] = "1"
 630      try:
 631          while True:
 632              print()
 633              print(f"  ⚠️  DANGEROUS COMMAND: {description}")
 634              print(f"      {command}")
 635              print()
 636              if allow_permanent:
 637                  print("      [o]nce  |  [s]ession  |  [a]lways  |  [d]eny")
 638              else:
 639                  print("      [o]nce  |  [s]ession  |  [d]eny")
 640              print()
 641              sys.stdout.flush()
 642  
 643              result = {"choice": ""}
 644  
 645              def get_input():
 646                  try:
 647                      prompt = "      Choice [o/s/a/D]: " if allow_permanent else "      Choice [o/s/D]: "
 648                      result["choice"] = input(prompt).strip().lower()
 649                  except (EOFError, OSError):
 650                      result["choice"] = ""
 651  
 652              thread = threading.Thread(target=get_input, daemon=True)
 653              thread.start()
 654              thread.join(timeout=timeout_seconds)
 655  
 656              if thread.is_alive():
 657                  print("\n      ⏱ Timeout - denying command")
 658                  return "deny"
 659  
 660              choice = result["choice"]
 661              if choice in ('o', 'once'):
 662                  print("      ✓ Allowed once")
 663                  return "once"
 664              elif choice in ('s', 'session'):
 665                  print("      ✓ Allowed for this session")
 666                  return "session"
 667              elif choice in ('a', 'always'):
 668                  if not allow_permanent:
 669                      print("      ✓ Allowed for this session")
 670                      return "session"
 671                  print("      ✓ Added to permanent allowlist")
 672                  return "always"
 673              else:
 674                  print("      ✗ Denied")
 675                  return "deny"
 676  
 677      except (EOFError, KeyboardInterrupt):
 678          print("\n      ✗ Cancelled")
 679          return "deny"
 680      finally:
 681          if "HERMES_SPINNER_PAUSE" in os.environ:
 682              del os.environ["HERMES_SPINNER_PAUSE"]
 683          print()
 684          sys.stdout.flush()
 685  
 686  
 687  def _normalize_approval_mode(mode) -> str:
 688      """Normalize approval mode values loaded from YAML/config.
 689  
 690      YAML 1.1 treats bare words like `off` as booleans, so a config entry like
 691      `approvals:\n  mode: off` is parsed as False unless quoted. Treat that as the
 692      intended string mode instead of falling back to manual approvals.
 693      """
 694      if isinstance(mode, bool):
 695          return "off" if mode is False else "manual"
 696      if isinstance(mode, str):
 697          normalized = mode.strip().lower()
 698          return normalized or "manual"
 699      return "manual"
 700  
 701  
 702  def _get_approval_config() -> dict:
 703      """Read the approvals config block. Returns a dict with 'mode', 'timeout', etc."""
 704      try:
 705          from hermes_cli.config import load_config
 706          config = load_config()
 707          return config.get("approvals", {}) or {}
 708      except Exception as e:
 709          logger.warning("Failed to load approval config: %s", e)
 710          return {}
 711  
 712  
 713  def _get_approval_mode() -> str:
 714      """Read the approval mode from config. Returns 'manual', 'smart', or 'off'."""
 715      mode = _get_approval_config().get("mode", "manual")
 716      return _normalize_approval_mode(mode)
 717  
 718  
 719  def _get_approval_timeout() -> int:
 720      """Read the approval timeout from config. Defaults to 60 seconds."""
 721      try:
 722          return int(_get_approval_config().get("timeout", 60))
 723      except (ValueError, TypeError):
 724          return 60
 725  
 726  
 727  def _get_cron_approval_mode() -> str:
 728      """Read the cron approval mode from config. Returns 'deny' or 'approve'."""
 729      try:
 730          from hermes_cli.config import load_config
 731          config = load_config()
 732          mode = str(cfg_get(config, "approvals", "cron_mode", default="deny")).lower().strip()
 733          if mode in ("approve", "off", "allow", "yes"):
 734              return "approve"
 735          return "deny"
 736      except Exception:
 737          return "deny"
 738  
 739  
 740  def _smart_approve(command: str, description: str) -> str:
 741      """Use the auxiliary LLM to assess risk and decide approval.
 742  
 743      Returns 'approve' if the LLM determines the command is safe,
 744      'deny' if genuinely dangerous, or 'escalate' if uncertain.
 745  
 746      Inspired by OpenAI Codex's Smart Approvals guardian subagent
 747      (openai/codex#13860).
 748      """
 749      try:
 750          from agent.auxiliary_client import call_llm
 751  
 752          prompt = f"""You are a security reviewer for an AI coding agent. A terminal command was flagged by pattern matching as potentially dangerous.
 753  
 754  Command: {command}
 755  Flagged reason: {description}
 756  
 757  Assess the ACTUAL risk of this command. Many flagged commands are false positives — for example, `python -c "print('hello')"` is flagged as "script execution via -c flag" but is completely harmless.
 758  
 759  Rules:
 760  - APPROVE if the command is clearly safe (benign script execution, safe file operations, development tools, package installs, git operations, etc.)
 761  - DENY if the command could genuinely damage the system (recursive delete of important paths, overwriting system files, fork bombs, wiping disks, dropping databases, etc.)
 762  - ESCALATE if you're uncertain
 763  
 764  Respond with exactly one word: APPROVE, DENY, or ESCALATE"""
 765  
 766          response = call_llm(
 767              task="approval",
 768              messages=[{"role": "user", "content": prompt}],
 769              temperature=0,
 770              max_tokens=16,
 771          )
 772  
 773          answer = (response.choices[0].message.content or "").strip().upper()
 774  
 775          if "APPROVE" in answer:
 776              return "approve"
 777          elif "DENY" in answer:
 778              return "deny"
 779          else:
 780              return "escalate"
 781  
 782      except Exception as e:
 783          logger.debug("Smart approvals: LLM call failed (%s), escalating", e)
 784          return "escalate"
 785  
 786  
 787  def check_dangerous_command(command: str, env_type: str,
 788                              approval_callback=None) -> dict:
 789      """Check if a command is dangerous and handle approval.
 790  
 791      This is the main entry point called by terminal_tool before executing
 792      any command. It orchestrates detection, session checks, and prompting.
 793  
 794      Args:
 795          command: The shell command to check.
 796          env_type: Terminal backend type ('local', 'ssh', 'docker', etc.).
 797          approval_callback: Optional CLI callback for interactive prompts.
 798  
 799      Returns:
 800          {"approved": True/False, "message": str or None, ...}
 801      """
 802      if env_type in ("docker", "singularity", "modal", "daytona", "vercel_sandbox"):
 803          return {"approved": True, "message": None}
 804  
 805      # Hardline floor: commands with no recovery path (rm -rf /, mkfs, dd
 806      # to raw device, shutdown/reboot, fork bomb, kill -1) are blocked
 807      # unconditionally, BEFORE the yolo bypass.  Opting into yolo is
 808      # trusting the agent with your files and services, not trusting it
 809      # to wipe the disk or power the box off.
 810      is_hardline, hardline_desc = detect_hardline_command(command)
 811      if is_hardline:
 812          logger.warning("Hardline block: %s (command: %s)", hardline_desc, command[:200])
 813          return _hardline_block_result(hardline_desc)
 814  
 815      # --yolo: bypass all approval prompts. Gateway /yolo is session-scoped;
 816      # CLI --yolo remains process-scoped via the env var for local use.
 817      if is_truthy_value(os.getenv("HERMES_YOLO_MODE")) or is_current_session_yolo_enabled():
 818          return {"approved": True, "message": None}
 819  
 820      is_dangerous, pattern_key, description = detect_dangerous_command(command)
 821      if not is_dangerous:
 822          return {"approved": True, "message": None}
 823  
 824      session_key = get_current_session_key()
 825      if is_approved(session_key, pattern_key):
 826          return {"approved": True, "message": None}
 827  
 828      is_cli = os.getenv("HERMES_INTERACTIVE")
 829      is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
 830  
 831      if not is_cli and not is_gateway:
 832          # Cron sessions: respect cron_mode config
 833          if os.getenv("HERMES_CRON_SESSION"):
 834              if _get_cron_approval_mode() == "deny":
 835                  return {
 836                      "approved": False,
 837                      "message": (
 838                          f"BLOCKED: Command flagged as dangerous ({description}) "
 839                          "but cron jobs run without a user present to approve it. "
 840                          "Find an alternative approach that avoids this command. "
 841                          "To allow dangerous commands in cron jobs, set "
 842                          "approvals.cron_mode: approve in config.yaml."
 843                      ),
 844                  }
 845          return {"approved": True, "message": None}
 846  
 847      if is_gateway or os.getenv("HERMES_EXEC_ASK"):
 848          submit_pending(session_key, {
 849              "command": command,
 850              "pattern_key": pattern_key,
 851              "description": description,
 852          })
 853          return {
 854              "approved": False,
 855              "pattern_key": pattern_key,
 856              "status": "approval_required",
 857              "command": command,
 858              "description": description,
 859              "message": (
 860                  f"⚠️ This command is potentially dangerous ({description}). "
 861                  f"Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
 862              ),
 863          }
 864  
 865      choice = prompt_dangerous_approval(command, description,
 866                                         approval_callback=approval_callback)
 867  
 868      if choice == "deny":
 869          return {
 870              "approved": False,
 871              "message": f"BLOCKED: User denied this potentially dangerous command (matched '{description}' pattern). Do NOT retry this command - the user has explicitly rejected it.",
 872              "pattern_key": pattern_key,
 873              "description": description,
 874          }
 875  
 876      if choice == "session":
 877          approve_session(session_key, pattern_key)
 878      elif choice == "always":
 879          approve_session(session_key, pattern_key)
 880          approve_permanent(pattern_key)
 881          save_permanent_allowlist(_permanent_approved)
 882  
 883      return {"approved": True, "message": None}
 884  
 885  
 886  # =========================================================================
 887  # Combined pre-exec guard (tirith + dangerous command detection)
 888  # =========================================================================
 889  
 890  def _format_tirith_description(tirith_result: dict) -> str:
 891      """Build a human-readable description from tirith findings.
 892  
 893      Includes severity, title, and description for each finding so users
 894      can make an informed approval decision.
 895      """
 896      findings = tirith_result.get("findings") or []
 897      if not findings:
 898          summary = tirith_result.get("summary") or "security issue detected"
 899          return f"Security scan: {summary}"
 900  
 901      parts = []
 902      for f in findings:
 903          severity = f.get("severity", "")
 904          title = f.get("title", "")
 905          desc = f.get("description", "")
 906          if title and desc:
 907              parts.append(f"[{severity}] {title}: {desc}" if severity else f"{title}: {desc}")
 908          elif title:
 909              parts.append(f"[{severity}] {title}" if severity else title)
 910      if not parts:
 911          summary = tirith_result.get("summary") or "security issue detected"
 912          return f"Security scan: {summary}"
 913  
 914      return "Security scan — " + "; ".join(parts)
 915  
 916  
 917  def check_all_command_guards(command: str, env_type: str,
 918                               approval_callback=None) -> dict:
 919      """Run all pre-exec security checks and return a single approval decision.
 920  
 921      Gathers findings from tirith and dangerous-command detection, then
 922      presents them as a single combined approval request. This prevents
 923      a gateway force=True replay from bypassing one check when only the
 924      other was shown to the user.
 925      """
 926      # Skip containers for both checks
 927      if env_type in ("docker", "singularity", "modal", "daytona", "vercel_sandbox"):
 928          return {"approved": True, "message": None}
 929  
 930      # Hardline floor: unconditional block for catastrophic commands
 931      # (rm -rf /, mkfs, dd to raw device, shutdown/reboot, fork bomb,
 932      # kill -1). Applies BEFORE yolo / mode=off / cron approve-mode so
 933      # no session-level setting can bypass it.
 934      is_hardline, hardline_desc = detect_hardline_command(command)
 935      if is_hardline:
 936          logger.warning("Hardline block: %s (command: %s)", hardline_desc, command[:200])
 937          return _hardline_block_result(hardline_desc)
 938  
 939      # --yolo or approvals.mode=off: bypass all approval prompts.
 940      # Gateway /yolo is session-scoped; CLI --yolo remains process-scoped.
 941      approval_mode = _get_approval_mode()
 942      if is_truthy_value(os.getenv("HERMES_YOLO_MODE")) or is_current_session_yolo_enabled() or approval_mode == "off":
 943          return {"approved": True, "message": None}
 944  
 945      is_cli = os.getenv("HERMES_INTERACTIVE")
 946      is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
 947      is_ask = os.getenv("HERMES_EXEC_ASK")
 948  
 949      # Preserve the existing non-interactive behavior: outside CLI/gateway/ask
 950      # flows, we do not block on approvals and we skip external guard work.
 951      if not is_cli and not is_gateway and not is_ask:
 952          # Cron sessions: respect cron_mode config
 953          if os.getenv("HERMES_CRON_SESSION"):
 954              if _get_cron_approval_mode() == "deny":
 955                  # Run detection to get a description for the block message
 956                  is_dangerous, _pk, description = detect_dangerous_command(command)
 957                  if is_dangerous:
 958                      return {
 959                          "approved": False,
 960                          "message": (
 961                              f"BLOCKED: Command flagged as dangerous ({description}) "
 962                              "but cron jobs run without a user present to approve it. "
 963                              "Find an alternative approach that avoids this command. "
 964                              "To allow dangerous commands in cron jobs, set "
 965                              "approvals.cron_mode: approve in config.yaml."
 966                          ),
 967                      }
 968          return {"approved": True, "message": None}
 969  
 970      # --- Phase 1: Gather findings from both checks ---
 971  
 972      # Tirith check — wrapper guarantees no raise for expected failures.
 973      # Only catch ImportError (module not installed).
 974      tirith_result = {"action": "allow", "findings": [], "summary": ""}
 975      try:
 976          from tools.tirith_security import check_command_security
 977          tirith_result = check_command_security(command)
 978      except ImportError:
 979          pass  # tirith module not installed — allow
 980  
 981      # Dangerous command check (detection only, no approval)
 982      is_dangerous, pattern_key, description = detect_dangerous_command(command)
 983  
 984      # --- Phase 2: Decide ---
 985  
 986      # Collect warnings that need approval
 987      warnings = []  # list of (pattern_key, description, is_tirith)
 988  
 989      session_key = get_current_session_key()
 990  
 991      # Tirith block/warn → approvable warning with rich findings.
 992      # Previously, tirith "block" was a hard block with no approval prompt.
 993      # Now both block and warn go through the approval flow so users can
 994      # inspect the explanation and approve if they understand the risk.
 995      if tirith_result["action"] in ("block", "warn"):
 996          findings = tirith_result.get("findings") or []
 997          rule_id = findings[0].get("rule_id", "unknown") if findings else "unknown"
 998          tirith_key = f"tirith:{rule_id}"
 999          tirith_desc = _format_tirith_description(tirith_result)
1000          if not is_approved(session_key, tirith_key):
1001              warnings.append((tirith_key, tirith_desc, True))
1002  
1003      if is_dangerous:
1004          if not is_approved(session_key, pattern_key):
1005              warnings.append((pattern_key, description, False))
1006  
1007      # Nothing to warn about
1008      if not warnings:
1009          return {"approved": True, "message": None}
1010  
1011      # --- Phase 2.5: Smart approval (auxiliary LLM risk assessment) ---
1012      # When approvals.mode=smart, ask the aux LLM before prompting the user.
1013      # Inspired by OpenAI Codex's Smart Approvals guardian subagent
1014      # (openai/codex#13860).
1015      if approval_mode == "smart":
1016          combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
1017          verdict = _smart_approve(command, combined_desc_for_llm)
1018          if verdict == "approve":
1019              # Auto-approve and grant session-level approval for these patterns
1020              for key, _, _ in warnings:
1021                  approve_session(session_key, key)
1022              logger.debug("Smart approval: auto-approved '%s' (%s)",
1023                           command[:60], combined_desc_for_llm)
1024              return {"approved": True, "message": None,
1025                      "smart_approved": True,
1026                      "description": combined_desc_for_llm}
1027          elif verdict == "deny":
1028              combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
1029              return {
1030                  "approved": False,
1031                  "message": f"BLOCKED by smart approval: {combined_desc_for_llm}. "
1032                             "The command was assessed as genuinely dangerous. Do NOT retry.",
1033                  "smart_denied": True,
1034              }
1035          # verdict == "escalate" → fall through to manual prompt
1036  
1037      # --- Phase 3: Approval ---
1038  
1039      # Combine descriptions for a single approval prompt
1040      combined_desc = "; ".join(desc for _, desc, _ in warnings)
1041      primary_key = warnings[0][0]
1042      all_keys = [key for key, _, _ in warnings]
1043      has_tirith = any(is_t for _, _, is_t in warnings)
1044  
1045      # Gateway/async approval — block the agent thread until the user
1046      # responds with /approve or /deny, mirroring the CLI's synchronous
1047      # input() flow.  The agent never sees "approval_required"; it either
1048      # gets the command output (approved) or a definitive "BLOCKED" message.
1049      if is_gateway or is_ask:
1050          notify_cb = None
1051          with _lock:
1052              notify_cb = _gateway_notify_cbs.get(session_key)
1053  
1054          if notify_cb is not None:
1055              # --- Blocking gateway approval (queue-based) ---
1056              # Each call gets its own _ApprovalEntry so parallel subagents
1057              # and execute_code threads can block concurrently.
1058              approval_data = {
1059                  "command": command,
1060                  "pattern_key": primary_key,
1061                  "pattern_keys": all_keys,
1062                  "description": combined_desc,
1063              }
1064              entry = _ApprovalEntry(approval_data)
1065              with _lock:
1066                  _gateway_queues.setdefault(session_key, []).append(entry)
1067  
1068              # Notify plugins that an approval is being requested. Fires before
1069              # the gateway notify callback so observers (e.g. macOS notifier
1070              # plugins, audit logs, Slack alerts) get the event in real time.
1071              _fire_approval_hook(
1072                  "pre_approval_request",
1073                  command=command,
1074                  description=combined_desc,
1075                  pattern_key=primary_key,
1076                  pattern_keys=list(all_keys),
1077                  session_key=session_key,
1078                  surface="gateway",
1079              )
1080  
1081              # Notify the user (bridges sync agent thread → async gateway)
1082              try:
1083                  notify_cb(approval_data)
1084              except Exception as exc:
1085                  logger.warning("Gateway approval notify failed: %s", exc)
1086                  with _lock:
1087                      queue = _gateway_queues.get(session_key, [])
1088                      if entry in queue:
1089                          queue.remove(entry)
1090                      if not queue:
1091                          _gateway_queues.pop(session_key, None)
1092                  return {
1093                      "approved": False,
1094                      "message": "BLOCKED: Failed to send approval request to user. Do NOT retry.",
1095                      "pattern_key": primary_key,
1096                      "description": combined_desc,
1097                  }
1098  
1099              # Block until the user responds or timeout (default 5 min).
1100              # Poll in short slices so we can fire activity heartbeats every
1101              # ~10s to the agent's inactivity tracker.  Without this, the
1102              # blocking event.wait() never touches activity, and the
1103              # gateway's inactivity watchdog (agent.gateway_timeout, default
1104              # 1800s) kills the agent while the user is still responding to
1105              # the approval prompt.  Mirrors the _wait_for_process() cadence
1106              # in tools/environments/base.py.
1107              timeout = _get_approval_config().get("gateway_timeout", 300)
1108              try:
1109                  timeout = int(timeout)
1110              except (ValueError, TypeError):
1111                  timeout = 300
1112  
1113              try:
1114                  from tools.environments.base import touch_activity_if_due
1115              except Exception:  # pragma: no cover
1116                  touch_activity_if_due = None
1117  
1118              _now = time.monotonic()
1119              _deadline = _now + max(timeout, 0)
1120              _activity_state = {"last_touch": _now, "start": _now}
1121              resolved = False
1122              while True:
1123                  _remaining = _deadline - time.monotonic()
1124                  if _remaining <= 0:
1125                      break
1126                  # 1s poll slice — the event is set immediately when the
1127                  # user responds, so slice length only controls heartbeat
1128                  # cadence, not user-visible responsiveness.
1129                  if entry.event.wait(timeout=min(1.0, _remaining)):
1130                      resolved = True
1131                      break
1132                  if touch_activity_if_due is not None:
1133                      touch_activity_if_due(
1134                          _activity_state, "waiting for user approval"
1135                      )
1136  
1137              # Clean up this entry from the queue
1138              with _lock:
1139                  queue = _gateway_queues.get(session_key, [])
1140                  if entry in queue:
1141                      queue.remove(entry)
1142                  if not queue:
1143                      _gateway_queues.pop(session_key, None)
1144  
1145              choice = entry.result
1146              # Normalize outcome for the post hook. Unresolved (timeout) and
1147              # None both mean the user never responded; report that explicitly
1148              # so plugins can distinguish timeout from explicit deny.
1149              _outcome = (
1150                  "timeout" if not resolved
1151                  else (choice if choice else "timeout")
1152              )
1153              _fire_approval_hook(
1154                  "post_approval_response",
1155                  command=command,
1156                  description=combined_desc,
1157                  pattern_key=primary_key,
1158                  pattern_keys=list(all_keys),
1159                  session_key=session_key,
1160                  surface="gateway",
1161                  choice=_outcome,
1162              )
1163  
1164              if not resolved or choice is None or choice == "deny":
1165                  reason = "timed out" if not resolved else "denied by user"
1166                  return {
1167                      "approved": False,
1168                      "message": f"BLOCKED: Command {reason}. Do NOT retry this command.",
1169                      "pattern_key": primary_key,
1170                      "description": combined_desc,
1171                  }
1172  
1173              # User approved — persist based on scope (same logic as CLI)
1174              for key, _, is_tirith in warnings:
1175                  if choice == "session" or (choice == "always" and is_tirith):
1176                      approve_session(session_key, key)
1177                  elif choice == "always":
1178                      approve_session(session_key, key)
1179                      approve_permanent(key)
1180                      save_permanent_allowlist(_permanent_approved)
1181                  # choice == "once": no persistence — command allowed this
1182                  # single time only, matching the CLI's behavior.
1183  
1184              return {"approved": True, "message": None,
1185                      "user_approved": True, "description": combined_desc}
1186  
1187          # Fallback: no gateway callback registered (e.g. cron, batch).
1188          # Return approval_required for backward compat.
1189          submit_pending(session_key, {
1190              "command": command,
1191              "pattern_key": primary_key,
1192              "pattern_keys": all_keys,
1193              "description": combined_desc,
1194          })
1195          return {
1196              "approved": False,
1197              "pattern_key": primary_key,
1198              "status": "approval_required",
1199              "command": command,
1200              "description": combined_desc,
1201              "message": (
1202                  f"⚠️ {combined_desc}. Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
1203              ),
1204          }
1205  
1206      # CLI interactive: single combined prompt
1207      # Hide [a]lways when any tirith warning is present
1208      _fire_approval_hook(
1209          "pre_approval_request",
1210          command=command,
1211          description=combined_desc,
1212          pattern_key=primary_key,
1213          pattern_keys=list(all_keys),
1214          session_key=session_key,
1215          surface="cli",
1216      )
1217      choice = prompt_dangerous_approval(command, combined_desc,
1218                                         allow_permanent=not has_tirith,
1219                                         approval_callback=approval_callback)
1220      _fire_approval_hook(
1221          "post_approval_response",
1222          command=command,
1223          description=combined_desc,
1224          pattern_key=primary_key,
1225          pattern_keys=list(all_keys),
1226          session_key=session_key,
1227          surface="cli",
1228          choice=choice,
1229      )
1230  
1231      if choice == "deny":
1232          return {
1233              "approved": False,
1234              "message": "BLOCKED: User denied. Do NOT retry.",
1235              "pattern_key": primary_key,
1236              "description": combined_desc,
1237          }
1238  
1239      # Persist approval for each warning individually
1240      for key, _, is_tirith in warnings:
1241          if choice == "session" or (choice == "always" and is_tirith):
1242              # tirith: session only (no permanent broad allowlisting)
1243              approve_session(session_key, key)
1244          elif choice == "always":
1245              # dangerous patterns: permanent allowed
1246              approve_session(session_key, key)
1247              approve_permanent(key)
1248              save_permanent_allowlist(_permanent_approved)
1249  
1250      return {"approved": True, "message": None,
1251              "user_approved": True, "description": combined_desc}
1252  
1253  
1254  # Load permanent allowlist from config on module import
1255  load_permanent_allowlist()