approval.py
"""Dangerous command approval -- detection, prompting, and per-session state.

This module is the single source of truth for the dangerous command system:
- Pattern detection (DANGEROUS_PATTERNS, detect_dangerous_command)
- Per-session approval state (thread-safe, keyed by session_key)
- Approval prompting (CLI interactive + gateway async)
- Smart approval via auxiliary LLM (auto-approve low-risk commands)
- Permanent allowlist persistence (config.yaml)
"""

import contextvars
import logging
import os
import re
import sys
import threading
import time
import unicodedata
from typing import Optional
from hermes_cli.config import cfg_get

from utils import is_truthy_value

logger = logging.getLogger(__name__)

# Per-thread/per-task gateway session identity.
# Gateway runs agent turns concurrently in executor threads, so reading a
# process-global env var for session identity is racy. Keep env fallback for
# legacy single-threaded callers, but prefer the context-local value when set.
_approval_session_key: contextvars.ContextVar[str] = contextvars.ContextVar(
    "approval_session_key",
    default="",
)


def _fire_approval_hook(hook_name: str, **kwargs) -> None:
    """Dispatch an approval lifecycle hook to the plugin system, best effort.

    The plugin dispatcher is imported lazily (this module is imported long
    before plugins are discovered, and an eager import would be circular).
    Nothing is ever raised from here: a missing plugin system is ignored and
    a failing dispatch is logged at debug level.

    Only the two approval-specific hooks in VALID_HOOKS are expected:
    pre_approval_request and post_approval_response.
    """
    try:
        from hermes_cli.plugins import invoke_hook as dispatch
    except Exception:
        # Plugin system not available in this execution context
        # (e.g. bare tool-only imports, minimal test environments).
        return

    try:
        dispatch(hook_name, **kwargs)
    except Exception as dispatch_error:
        # Per-callback errors are already swallowed by the dispatcher, so
        # landing here means the dispatch layer itself failed. The approval
        # flow is safety-critical, plugin observability is not: log, move on.
        logger.debug("Approval hook %s dispatch failed: %s", hook_name, dispatch_error)


def set_current_session_key(session_key: str) -> contextvars.Token[str]:
    """Bind *session_key* as the active approval session for this context.

    A falsy key is stored as the empty string. The returned token must be
    handed to ``reset_current_session_key`` to restore the previous value.
    """
    bound_key = session_key if session_key else ""
    return _approval_session_key.set(bound_key)


def reset_current_session_key(token: contextvars.Token[str]) -> None:
    """Undo a prior ``set_current_session_key`` using its *token*."""
    _approval_session_key.reset(token)


def get_current_session_key(default: str = "default") -> str:
    """Return the active session key, preferring context-local state.

    Resolution order:
    1. approval-specific contextvars (set by gateway before agent.run)
    2. session_context contextvars (set by _set_session_env)
    3. os.environ fallback (CLI, cron, tests)

    Steps 2 and 3 are delegated to ``gateway.session_context.get_session_env``.
    """
    context_key = _approval_session_key.get()
    if not context_key:
        from gateway.session_context import get_session_env
        return get_session_env("HERMES_SESSION_KEY", default)
    return context_key


# Sensitive write targets that should trigger approval even when referenced
# via shell expansions like $HOME or $HERMES_HOME.
_SSH_SENSITIVE_PATH = r'(?:~|\$home|\$\{home\})/\.ssh(?:/|$)'
_HERMES_ENV_PATH = (
    r'(?:~\/\.hermes/|'
    r'(?:\$home|\$\{home\})/\.hermes/|'
    r'(?:\$hermes_home|\$\{hermes_home\})/)'
    r'\.env\b'
)
_PROJECT_ENV_PATH = r'(?:(?:/|\.{1,2}/)?(?:[^\s/"\'`]+/)*\.env(?:\.[^/\s"\'`]+)*)'
_PROJECT_CONFIG_PATH = r'(?:(?:/|\.{1,2}/)?(?:[^\s/"\'`]+/)*config\.yaml)'
_SHELL_RC_FILES = (
    r'(?:~|\$home|\$\{home\})/\.'
    r'(?:bashrc|zshrc|profile|bash_profile|zprofile)\b'
)
_CREDENTIAL_FILES = (
    r'(?:~|\$home|\$\{home\})/\.'
    r'(?:netrc|pgpass|npmrc|pypirc)\b'
)
_SENSITIVE_WRITE_TARGET = (
    r'(?:/etc/|/dev/sd|'
    rf'{_SSH_SENSITIVE_PATH}|'
    rf'{_HERMES_ENV_PATH}|'
    rf'{_SHELL_RC_FILES}|'
    rf'{_CREDENTIAL_FILES})'
)
_PROJECT_SENSITIVE_WRITE_TARGET = rf'(?:{_PROJECT_ENV_PATH}|{_PROJECT_CONFIG_PATH})'
_COMMAND_TAIL = r'(?:\s*(?:&&|\|\||;).*)?$'

# =========================================================================
# Hardline (unconditional) blocklist
# =========================================================================
#
# Commands so catastrophic they should NEVER run via the agent, regardless
# of --yolo, /yolo, approvals.mode=off, or cron approve mode. This is a
# floor below yolo: opting into yolo is the user trusting the agent with
# their files and services, not trusting it to wipe the disk or power the
# box off.
#
# Hardline only applies to environments that can actually damage the host
# (local, ssh, container-host cron). Containerized backends (docker,
# singularity, modal, daytona) already bypass the dangerous-command layer
# because nothing they do can touch the host, so we leave that behavior
# alone.
#
# The list is deliberately tiny — only things with no recovery path:
# filesystem destruction rooted at /, raw block device overwrites, kernel
# shutdown/reboot, and denial-of-service commands that take the host down.
# Recoverable-but-costly operations (git reset --hard, rm -rf /tmp/x,
# chmod -R 777, curl|sh) stay in DANGEROUS_PATTERNS where yolo can pass
# them through — that's what yolo is for.
#
# Inspired by Mercury Agent's permission-hardened blocklist
# (https://github.com/cosmicstack-labs/mercury-agent).

# Regex fragment matching the *start* of a command (i.e. positions where
# a shell would begin parsing a new command).
# Used by shutdown/reboot patterns so they don't fire on "echo reboot" or
# "grep 'shutdown' log".
# Matches: start of string, after command separators (; && || | newline),
# after subshell openers ( `$(` or backtick ), optionally consuming
# leading wrapper commands (sudo, env VAR=VAL, exec, nohup, setsid, time).
_CMDPOS = (
    r'(?:^|[;&|\n`]|\$\()'                 # start position
    r'\s*'                                 # optional whitespace
    r'(?:sudo\s+(?:-[^\s]+\s+)*)?'         # optional sudo with flags
    r'(?:env\s+(?:\w+=\S*\s+)*)?'          # optional env with VAR=VAL pairs
    r'(?:(?:exec|nohup|setsid|time)\s+)*'  # optional wrapper commands
    r'\s*'
)

HARDLINE_PATTERNS = [
    # rm recursive targeting the root filesystem or protected roots
    (r'\brm\s+(-[^\s]*\s+)*(/|/\*|/ \*)(\s|$)', "recursive delete of root filesystem"),
    (r'\brm\s+(-[^\s]*\s+)*(/home|/home/\*|/root|/root/\*|/etc|/etc/\*|/usr|/usr/\*|/var|/var/\*|/bin|/bin/\*|/sbin|/sbin/\*|/boot|/boot/\*|/lib|/lib/\*)(\s|$)', "recursive delete of system directory"),
    (r'\brm\s+(-[^\s]*\s+)*(~|\$HOME)(/?|/\*)?(\s|$)', "recursive delete of home directory"),
    # Filesystem format
    (r'\bmkfs(\.[a-z0-9]+)?\b', "format filesystem (mkfs)"),
    # Raw block device overwrites (dd + redirection)
    (r'\bdd\b[^\n]*\bof=/dev/(sd|nvme|hd|mmcblk|vd|xvd)[a-z0-9]*', "dd to raw block device"),
    (r'>\s*/dev/(sd|nvme|hd|mmcblk|vd|xvd)[a-z0-9]*\b', "redirect to raw block device"),
    # Fork bomb (classic shell form)
    (r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
    # Kill every process on the system
    (r'\bkill\s+(-[^\s]+\s+)*-1\b', "kill all processes"),
    # System shutdown / reboot — anchor to command position (start of line,
    # after a command separator, or after sudo/env wrappers) so we don't
    # false-positive on "echo reboot" or "grep 'shutdown' logs".
    # _CMDPOS matches start-of-command positions.
    (_CMDPOS + r'(shutdown|reboot|halt|poweroff)\b', "system shutdown/reboot"),
    (_CMDPOS + r'init\s+[06]\b', "init 0/6 (shutdown/reboot)"),
    (_CMDPOS + r'systemctl\s+(poweroff|reboot|halt|kexec)\b', "systemctl poweroff/reboot"),
    (_CMDPOS + r'telinit\s+[06]\b', "telinit 0/6 (shutdown/reboot)"),
]

# Pre-compiled variant used by the hot-path matcher. Building these at module
# load eliminates the ~2.6 ms cold-cache re.compile fan-out on the first
# terminal() call per process (12 HARDLINE + 47 DANGEROUS patterns, each
# potentially evicted from Python's 512-entry ``re._cache`` by unrelated
# regex work elsewhere in the agent). DANGEROUS_PATTERNS_COMPILED is built
# at the end of this module after DANGEROUS_PATTERNS is defined.
_RE_FLAGS = re.IGNORECASE | re.DOTALL
HARDLINE_PATTERNS_COMPILED = [
    (re.compile(pattern, _RE_FLAGS), description)
    for pattern, description in HARDLINE_PATTERNS
]


def detect_hardline_command(command: str) -> tuple:
    """Test *command* against the unconditional hardline blocklist.

    The command is normalized and lower-cased first; the first matching
    pattern wins.

    Returns:
        (True, description) for the first hardline match, else (False, None)
    """
    candidate = _normalize_command_for_detection(command).lower()
    matched_label = next(
        (label for regex, label in HARDLINE_PATTERNS_COMPILED if regex.search(candidate)),
        None,
    )
    if matched_label is None:
        return (False, None)
    return (True, matched_label)


def _hardline_block_result(description: str) -> dict:
    """Return the standard (never approved) result dict for a hardline match."""
    explanation = (
        f"BLOCKED (hardline): {description}. "
        "This command is on the unconditional blocklist and cannot "
        "be executed via the agent — not even with --yolo, /yolo, "
        "approvals.mode=off, or cron approve mode. If you genuinely "
        "need to run it, run it yourself in a terminal outside the "
        "agent."
    )
    return {"approved": False, "hardline": True, "message": explanation}


# =========================================================================
# Dangerous command patterns
# =========================================================================

DANGEROUS_PATTERNS = [
    (r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
    (r'\brm\s+-[^\s]*r', "recursive delete"),
    (r'\brm\s+--recursive\b', "recursive delete (long flag)"),
    (r'\bchmod\s+(-[^\s]*\s+)*(777|666|o\+[rwx]*w|a\+[rwx]*w)\b', "world/other-writable permissions"),
    (r'\bchmod\s+--recursive\b.*(777|666|o\+[rwx]*w|a\+[rwx]*w)', "recursive world/other-writable (long flag)"),
    (r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
    (r'\bchown\s+--recursive\b.*root', "recursive chown to root (long flag)"),
    (r'\bmkfs\b', "format filesystem"),
    (r'\bdd\s+.*if=', "disk copy"),
    (r'>\s*/dev/sd', "write to block device"),
    (r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"),
    (r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"),
    (r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
    (r'>\s*/etc/', "overwrite system config"),
    (r'\bsystemctl\s+(-[^\s]+\s+)*(stop|restart|disable|mask)\b', "stop/restart system service"),
    (r'\bkill\s+-9\s+-1\b', "kill all processes"),
    (r'\bpkill\s+-9\b', "force kill processes"),
    (r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
    # Any shell invocation via -c or combined flags like -lc, -ic, etc.
    (r'\b(bash|sh|zsh|ksh)\s+-[^\s]*c(\s+|$)', "shell command via -c/-lc flag"),
    (r'\b(python[23]?|perl|ruby|node)\s+-[ec]\s+', "script execution via -e/-c flag"),
    (r'\b(curl|wget)\b.*\|\s*(ba)?sh\b', "pipe remote content to shell"),
    (r'\b(bash|sh|zsh|ksh)\s+<\s*<?\s*\(\s*(curl|wget)\b', "execute remote script via process substitution"),
    (rf'\btee\b.*["\']?{_SENSITIVE_WRITE_TARGET}', "overwrite system file via tee"),
    (rf'>>?\s*["\']?{_SENSITIVE_WRITE_TARGET}', "overwrite system file via redirection"),
    (rf'\btee\b.*["\']?{_PROJECT_SENSITIVE_WRITE_TARGET}["\']?{_COMMAND_TAIL}', "overwrite project env/config via tee"),
    (rf'>>?\s*["\']?{_PROJECT_SENSITIVE_WRITE_TARGET}["\']?{_COMMAND_TAIL}', "overwrite project env/config via redirection"),
    (r'\bxargs\s+.*\brm\b', "xargs with rm"),
    (r'\bfind\b.*-exec\s+(/\S*/)?rm\b', "find -exec rm"),
    (r'\bfind\b.*-delete\b', "find -delete"),
    # Gateway lifecycle protection: prevent the agent from killing its own
    # gateway process. These commands trigger a gateway restart/stop that
    # terminates all running agents mid-work.
    (r'\bhermes\s+gateway\s+(stop|restart)\b', "stop/restart hermes gateway (kills running agents)"),
    (r'\bhermes\s+update\b', "hermes update (restarts gateway, kills running agents)"),
    # Gateway protection: never start gateway outside systemd management
    (r'gateway\s+run\b.*(&\s*$|&\s*;|\bdisown\b|\bsetsid\b)', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
    (r'\bnohup\b.*gateway\s+run\b', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
    # Self-termination protection: prevent agent from killing its own process
    (r'\b(pkill|killall)\b.*\b(hermes|gateway|cli\.py)\b', "kill hermes/gateway process (self-termination)"),
    # Self-termination via kill + command substitution (pgrep/pidof).
    # The name-based pattern above catches `pkill hermes` but not
    # `kill -9 $(pgrep -f hermes)` because the substitution is opaque
    # to regex at detection time. Catch the structural pattern instead.
    (r'\bkill\b.*\$\(\s*pgrep\b', "kill process via pgrep expansion (self-termination)"),
    (r'\bkill\b.*`\s*pgrep\b', "kill process via backtick pgrep expansion (self-termination)"),
    # File copy/move/edit into sensitive system paths
    (r'\b(cp|mv|install)\b.*\s/etc/', "copy/move file into /etc/"),
    (rf'\b(cp|mv|install)\b.*\s["\']?{_PROJECT_SENSITIVE_WRITE_TARGET}["\']?{_COMMAND_TAIL}', "overwrite project env/config file"),
    (r'\bsed\s+-[^\s]*i.*\s/etc/', "in-place edit of system config"),
    (r'\bsed\s+--in-place\b.*\s/etc/', "in-place edit of system config (long flag)"),
    # Script execution via heredoc — bypasses the -e/-c flag patterns above.
    # `python3 << 'EOF'` feeds arbitrary code via stdin without -c/-e flags.
    (r'\b(python[23]?|perl|ruby|node)\s+<<', "script execution via heredoc"),
    # Git destructive operations that can lose uncommitted work or rewrite
    # shared history. Not captured by rm/chmod/etc patterns.
    (r'\bgit\s+reset\s+--hard\b', "git reset --hard (destroys uncommitted changes)"),
    (r'\bgit\s+push\b.*--force\b', "git force push (rewrites remote history)"),
    (r'\bgit\s+push\b.*-f\b', "git force push short flag (rewrites remote history)"),
    (r'\bgit\s+clean\s+-[^\s]*f', "git clean with force (deletes untracked files)"),
    (r'\bgit\s+branch\s+-D\b', "git branch force delete"),
    # Script execution after chmod +x — catches the two-step pattern where
    # a script is first made executable then immediately run. The script
    # content may contain dangerous commands that individual patterns miss.
    (r'\bchmod\s+\+x\b.*[;&|]+\s*\./', "chmod +x followed by immediate execution"),
]


# Pre-compiled variant (same rationale as HARDLINE_PATTERNS_COMPILED above).
DANGEROUS_PATTERNS_COMPILED = [
    (re.compile(pattern, _RE_FLAGS), description)
    for pattern, description in DANGEROUS_PATTERNS
]


def _legacy_pattern_key(pattern: str) -> str:
    """Rebuild the historical regex-derived approval key for *pattern*.

    The old key was the text between the first and second ``\\b`` in the
    regex source, or the first 20 characters when the regex has no ``\\b``.
    """
    word_boundary = r'\b'
    if word_boundary not in pattern:
        return pattern[:20]
    return pattern.split(word_boundary)[1]


# Bidirectional alias table: description <-> legacy regex-derived key.
_PATTERN_KEY_ALIASES: dict[str, set[str]] = {}
for _pattern, _description in DANGEROUS_PATTERNS:
    _legacy_key = _legacy_pattern_key(_pattern)
    _canonical_key = _description
    _PATTERN_KEY_ALIASES.setdefault(_canonical_key, set()).update({_canonical_key, _legacy_key})
    _PATTERN_KEY_ALIASES.setdefault(_legacy_key, set()).update({_legacy_key, _canonical_key})


def _approval_key_aliases(pattern_key: str) -> set[str]:
    """Return every approval key considered equivalent to *pattern_key*.

    New approvals use the human-readable description string, but older
    command_allowlist entries and session approvals may still contain the
    historical regex-derived key. Unknown keys alias only to themselves.
    """
    try:
        return _PATTERN_KEY_ALIASES[pattern_key]
    except KeyError:
        return {pattern_key}


# =========================================================================
# Detection
# =========================================================================

def _normalize_command_for_detection(command: str) -> str:
    """Canonicalize *command* before dangerous-pattern matching.

    Removes ANSI escape sequences (via tools.ansi_strip), drops null bytes,
    and applies Unicode NFKC normalization (which folds fullwidth characters
    to their plain forms) so these obfuscation tricks cannot slip past the
    pattern-based detection.
    """
    from tools.ansi_strip import strip_ansi

    # Order matters: escapes first, then NULs, then Unicode folding.
    without_escapes = strip_ansi(command)
    without_nulls = without_escapes.replace('\x00', '')
    return unicodedata.normalize('NFKC', without_nulls)


def detect_dangerous_command(command: str) -> tuple:
    """Test *command* against DANGEROUS_PATTERNS; the first match wins.

    Returns:
        (is_dangerous, pattern_key, description) or (False, None, None).
        The pattern key is the description string itself.
    """
    candidate = _normalize_command_for_detection(command).lower()
    matched_label = next(
        (label for regex, label in DANGEROUS_PATTERNS_COMPILED if regex.search(candidate)),
        None,
    )
    if matched_label is None:
        return (False, None, None)
    return (True, matched_label, matched_label)


# =========================================================================
# Per-session approval state (thread-safe)
# =========================================================================

_lock = threading.Lock()
_pending: dict[str, dict] = {}
_session_approved: dict[str, set] = {}
_session_yolo: set[str] = set()
_permanent_approved: set = set()

# =========================================================================
# Blocking gateway approval (mirrors CLI's synchronous input() flow)
# =========================================================================
# Per-session QUEUE of pending approvals. Multiple threads (parallel
# subagents, execute_code RPC handlers) can block concurrently — each gets
# its own threading.Event. /approve resolves the oldest, /approve all
# resolves every pending approval in the session.
378 379 380 class _ApprovalEntry: 381 """One pending dangerous-command approval inside a gateway session.""" 382 __slots__ = ("event", "data", "result") 383 384 def __init__(self, data: dict): 385 self.event = threading.Event() 386 self.data = data # command, description, pattern_keys, … 387 self.result: Optional[str] = None # "once"|"session"|"always"|"deny" 388 389 390 _gateway_queues: dict[str, list] = {} # session_key → [_ApprovalEntry, …] 391 _gateway_notify_cbs: dict[str, object] = {} # session_key → callable(approval_data) 392 393 394 def register_gateway_notify(session_key: str, cb) -> None: 395 """Register a per-session callback for sending approval requests to the user. 396 397 The callback signature is ``cb(approval_data: dict) -> None`` where 398 *approval_data* contains ``command``, ``description``, and 399 ``pattern_keys``. The callback bridges sync→async (runs in the agent 400 thread, must schedule the actual send on the event loop). 401 """ 402 with _lock: 403 _gateway_notify_cbs[session_key] = cb 404 405 406 def unregister_gateway_notify(session_key: str) -> None: 407 """Unregister the per-session gateway approval callback. 408 409 Signals ALL blocked threads for this session so they don't hang forever 410 (e.g. when the agent run finishes or is interrupted). 411 """ 412 with _lock: 413 _gateway_notify_cbs.pop(session_key, None) 414 entries = _gateway_queues.pop(session_key, []) 415 for entry in entries: 416 entry.event.set() 417 418 419 def resolve_gateway_approval(session_key: str, choice: str, 420 resolve_all: bool = False) -> int: 421 """Called by the gateway's /approve or /deny handler to unblock 422 waiting agent thread(s). 423 424 When *resolve_all* is True every pending approval in the session is 425 resolved at once (``/approve all``). Otherwise only the oldest one 426 is resolved (FIFO). 427 428 Returns the number of approvals resolved (0 means nothing was pending). 
429 """ 430 with _lock: 431 queue = _gateway_queues.get(session_key) 432 if not queue: 433 return 0 434 if resolve_all: 435 targets = list(queue) 436 queue.clear() 437 else: 438 targets = [queue.pop(0)] 439 if not queue: 440 _gateway_queues.pop(session_key, None) 441 442 for entry in targets: 443 entry.result = choice 444 entry.event.set() 445 return len(targets) 446 447 448 def has_blocking_approval(session_key: str) -> bool: 449 """Check if a session has one or more blocking gateway approvals waiting.""" 450 with _lock: 451 return bool(_gateway_queues.get(session_key)) 452 453 454 def submit_pending(session_key: str, approval: dict): 455 """Store a pending approval request for a session.""" 456 with _lock: 457 _pending[session_key] = approval 458 459 460 def approve_session(session_key: str, pattern_key: str): 461 """Approve a pattern for this session only.""" 462 with _lock: 463 _session_approved.setdefault(session_key, set()).add(pattern_key) 464 465 466 def enable_session_yolo(session_key: str) -> None: 467 """Enable YOLO bypass for a single session key.""" 468 if not session_key: 469 return 470 with _lock: 471 _session_yolo.add(session_key) 472 473 474 def disable_session_yolo(session_key: str) -> None: 475 """Disable YOLO bypass for a single session key.""" 476 if not session_key: 477 return 478 with _lock: 479 _session_yolo.discard(session_key) 480 481 482 def clear_session(session_key: str) -> None: 483 """Remove all approval and yolo state for a given session.""" 484 if not session_key: 485 return 486 with _lock: 487 _session_approved.pop(session_key, None) 488 _session_yolo.discard(session_key) 489 _pending.pop(session_key, None) 490 entries = _gateway_queues.pop(session_key, []) 491 for entry in entries: 492 # Session-boundary cleanup should cancel any blocked approval waits 493 # immediately so the old run can unwind instead of idling until timeout. 
494 entry.result = "deny" 495 entry.event.set() 496 497 498 def is_session_yolo_enabled(session_key: str) -> bool: 499 """Return True when YOLO bypass is enabled for a specific session.""" 500 if not session_key: 501 return False 502 with _lock: 503 return session_key in _session_yolo 504 505 506 def is_current_session_yolo_enabled() -> bool: 507 """Return True when the active approval session has YOLO bypass enabled.""" 508 return is_session_yolo_enabled(get_current_session_key(default="")) 509 510 511 def is_approved(session_key: str, pattern_key: str) -> bool: 512 """Check if a pattern is approved (session-scoped or permanent). 513 514 Accept both the current canonical key and the legacy regex-derived key so 515 existing command_allowlist entries continue to work after key migrations. 516 """ 517 aliases = _approval_key_aliases(pattern_key) 518 with _lock: 519 if any(alias in _permanent_approved for alias in aliases): 520 return True 521 session_approvals = _session_approved.get(session_key, set()) 522 return any(alias in session_approvals for alias in aliases) 523 524 525 def approve_permanent(pattern_key: str): 526 """Add a pattern to the permanent allowlist.""" 527 with _lock: 528 _permanent_approved.add(pattern_key) 529 530 531 def load_permanent(patterns: set): 532 """Bulk-load permanent allowlist entries from config.""" 533 with _lock: 534 _permanent_approved.update(patterns) 535 536 537 538 # ========================================================================= 539 # Config persistence for permanent allowlist 540 # ========================================================================= 541 542 def load_permanent_allowlist() -> set: 543 """Load permanently allowed command patterns from config. 544 545 Also syncs them into the approval module so is_approved() works for 546 patterns added via 'always' in a previous session. 
547 """ 548 try: 549 from hermes_cli.config import load_config 550 config = load_config() 551 patterns = set(config.get("command_allowlist", []) or []) 552 if patterns: 553 load_permanent(patterns) 554 return patterns 555 except Exception as e: 556 logger.warning("Failed to load permanent allowlist: %s", e) 557 return set() 558 559 560 def save_permanent_allowlist(patterns: set): 561 """Save permanently allowed command patterns to config.""" 562 try: 563 from hermes_cli.config import load_config, save_config 564 config = load_config() 565 config["command_allowlist"] = list(patterns) 566 save_config(config) 567 except Exception as e: 568 logger.warning("Could not save allowlist: %s", e) 569 570 571 # ========================================================================= 572 # Approval prompting + orchestration 573 # ========================================================================= 574 575 def prompt_dangerous_approval(command: str, description: str, 576 timeout_seconds: int | None = None, 577 allow_permanent: bool = True, 578 approval_callback=None) -> str: 579 """Prompt the user to approve a dangerous command (CLI only). 580 581 Args: 582 allow_permanent: When False, hide the [a]lways option (used when 583 tirith warnings are present, since broad permanent allowlisting 584 is inappropriate for content-level security findings). 585 approval_callback: Optional callback registered by the CLI for 586 prompt_toolkit integration. Signature: 587 (command, description, *, allow_permanent=True) -> str. 
588 589 Returns: 'once', 'session', 'always', or 'deny' 590 """ 591 if timeout_seconds is None: 592 timeout_seconds = _get_approval_timeout() 593 594 if approval_callback is not None: 595 try: 596 return approval_callback(command, description, 597 allow_permanent=allow_permanent) 598 except Exception as e: 599 logger.error("Approval callback failed: %s", e, exc_info=True) 600 return "deny" 601 602 # Fail-closed guard: if prompt_toolkit owns the terminal (interactive 603 # CLI session) and no approval callback is registered on this thread, 604 # the input() fallback below would spawn a daemon thread whose read 605 # can never see Enter -- the user's keystrokes go to prompt_toolkit, 606 # not input(), producing an invisible 60s deadlock (issue #15216). 607 # Deny fast and log loudly instead so the caller can surface a real 608 # error to the agent. Any thread that needs interactive approval must 609 # install a callback via tools.terminal_tool.set_approval_callback() 610 # before reaching this point (see delegate_tool.py, run_agent.py 611 # _execute_tool_calls_concurrent / _spawn_background_review for the 612 # established pattern). 613 try: 614 from prompt_toolkit.application.current import get_app_or_none 615 if get_app_or_none() is not None: 616 logger.warning( 617 "Dangerous-command approval requested on a thread with no " 618 "approval callback while prompt_toolkit is active; denying " 619 "to avoid stdin deadlock. command=%r description=%r", 620 command, description, 621 ) 622 return "deny" 623 except Exception: 624 # prompt_toolkit not installed, or detection failed -- fall through 625 # to the legacy input() path (safe in non-TUI contexts: scripts, 626 # tests, sshd, etc.). 
627 pass 628 629 os.environ["HERMES_SPINNER_PAUSE"] = "1" 630 try: 631 while True: 632 print() 633 print(f" ⚠️ DANGEROUS COMMAND: {description}") 634 print(f" {command}") 635 print() 636 if allow_permanent: 637 print(" [o]nce | [s]ession | [a]lways | [d]eny") 638 else: 639 print(" [o]nce | [s]ession | [d]eny") 640 print() 641 sys.stdout.flush() 642 643 result = {"choice": ""} 644 645 def get_input(): 646 try: 647 prompt = " Choice [o/s/a/D]: " if allow_permanent else " Choice [o/s/D]: " 648 result["choice"] = input(prompt).strip().lower() 649 except (EOFError, OSError): 650 result["choice"] = "" 651 652 thread = threading.Thread(target=get_input, daemon=True) 653 thread.start() 654 thread.join(timeout=timeout_seconds) 655 656 if thread.is_alive(): 657 print("\n ⏱ Timeout - denying command") 658 return "deny" 659 660 choice = result["choice"] 661 if choice in ('o', 'once'): 662 print(" ✓ Allowed once") 663 return "once" 664 elif choice in ('s', 'session'): 665 print(" ✓ Allowed for this session") 666 return "session" 667 elif choice in ('a', 'always'): 668 if not allow_permanent: 669 print(" ✓ Allowed for this session") 670 return "session" 671 print(" ✓ Added to permanent allowlist") 672 return "always" 673 else: 674 print(" ✗ Denied") 675 return "deny" 676 677 except (EOFError, KeyboardInterrupt): 678 print("\n ✗ Cancelled") 679 return "deny" 680 finally: 681 if "HERMES_SPINNER_PAUSE" in os.environ: 682 del os.environ["HERMES_SPINNER_PAUSE"] 683 print() 684 sys.stdout.flush() 685 686 687 def _normalize_approval_mode(mode) -> str: 688 """Normalize approval mode values loaded from YAML/config. 689 690 YAML 1.1 treats bare words like `off` as booleans, so a config entry like 691 `approvals:\n mode: off` is parsed as False unless quoted. Treat that as the 692 intended string mode instead of falling back to manual approvals. 
693 """ 694 if isinstance(mode, bool): 695 return "off" if mode is False else "manual" 696 if isinstance(mode, str): 697 normalized = mode.strip().lower() 698 return normalized or "manual" 699 return "manual" 700 701 702 def _get_approval_config() -> dict: 703 """Read the approvals config block. Returns a dict with 'mode', 'timeout', etc.""" 704 try: 705 from hermes_cli.config import load_config 706 config = load_config() 707 return config.get("approvals", {}) or {} 708 except Exception as e: 709 logger.warning("Failed to load approval config: %s", e) 710 return {} 711 712 713 def _get_approval_mode() -> str: 714 """Read the approval mode from config. Returns 'manual', 'smart', or 'off'.""" 715 mode = _get_approval_config().get("mode", "manual") 716 return _normalize_approval_mode(mode) 717 718 719 def _get_approval_timeout() -> int: 720 """Read the approval timeout from config. Defaults to 60 seconds.""" 721 try: 722 return int(_get_approval_config().get("timeout", 60)) 723 except (ValueError, TypeError): 724 return 60 725 726 727 def _get_cron_approval_mode() -> str: 728 """Read the cron approval mode from config. Returns 'deny' or 'approve'.""" 729 try: 730 from hermes_cli.config import load_config 731 config = load_config() 732 mode = str(cfg_get(config, "approvals", "cron_mode", default="deny")).lower().strip() 733 if mode in ("approve", "off", "allow", "yes"): 734 return "approve" 735 return "deny" 736 except Exception: 737 return "deny" 738 739 740 def _smart_approve(command: str, description: str) -> str: 741 """Use the auxiliary LLM to assess risk and decide approval. 742 743 Returns 'approve' if the LLM determines the command is safe, 744 'deny' if genuinely dangerous, or 'escalate' if uncertain. 745 746 Inspired by OpenAI Codex's Smart Approvals guardian subagent 747 (openai/codex#13860). 748 """ 749 try: 750 from agent.auxiliary_client import call_llm 751 752 prompt = f"""You are a security reviewer for an AI coding agent. 
def check_dangerous_command(command: str, env_type: str,
                            approval_callback=None) -> dict:
    """Check if a command is dangerous and handle approval.

    This is the main entry point called by terminal_tool before executing
    any command. It orchestrates detection, session checks, and prompting.

    Args:
        command: The shell command to check.
        env_type: Terminal backend type ('local', 'ssh', 'docker', etc.).
        approval_callback: Optional CLI callback for interactive prompts.

    Returns:
        {"approved": True/False, "message": str or None, ...}. Denied and
        pending results additionally carry "pattern_key" and "description";
        a pending gateway/ask request also carries
        "status": "approval_required" and "command".
    """
    if env_type in ("docker", "singularity", "modal", "daytona", "vercel_sandbox"):
        return {"approved": True, "message": None}

    # Hardline floor: commands with no recovery path (rm -rf /, mkfs, dd
    # to raw device, shutdown/reboot, fork bomb, kill -1) are blocked
    # unconditionally, BEFORE the yolo bypass. Opting into yolo is
    # trusting the agent with your files and services, not trusting it
    # to wipe the disk or power the box off.
    is_hardline, hardline_desc = detect_hardline_command(command)
    if is_hardline:
        logger.warning("Hardline block: %s (command: %s)", hardline_desc, command[:200])
        return _hardline_block_result(hardline_desc)

    # --yolo: bypass all approval prompts. Gateway /yolo is session-scoped;
    # CLI --yolo remains process-scoped via the env var for local use.
    if is_truthy_value(os.getenv("HERMES_YOLO_MODE")) or is_current_session_yolo_enabled():
        return {"approved": True, "message": None}

    is_dangerous, pattern_key, description = detect_dangerous_command(command)
    if not is_dangerous:
        return {"approved": True, "message": None}

    session_key = get_current_session_key()
    if is_approved(session_key, pattern_key):
        return {"approved": True, "message": None}

    is_cli = os.getenv("HERMES_INTERACTIVE")
    is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
    is_ask = os.getenv("HERMES_EXEC_ASK")

    # Non-interactive callers are not blocked on approvals. HERMES_EXEC_ASK
    # counts as an approval surface here (as it does in
    # check_all_command_guards); leaving it out of this test would
    # auto-approve the command before the ask branch below is reached.
    if not is_cli and not is_gateway and not is_ask:
        # Cron sessions: respect cron_mode config
        if os.getenv("HERMES_CRON_SESSION"):
            if _get_cron_approval_mode() == "deny":
                return {
                    "approved": False,
                    "message": (
                        f"BLOCKED: Command flagged as dangerous ({description}) "
                        "but cron jobs run without a user present to approve it. "
                        "Find an alternative approach that avoids this command. "
                        "To allow dangerous commands in cron jobs, set "
                        "approvals.cron_mode: approve in config.yaml."
                    ),
                }
        return {"approved": True, "message": None}

    if is_gateway or is_ask:
        # Queue the request; the caller receives "approval_required".
        submit_pending(session_key, {
            "command": command,
            "pattern_key": pattern_key,
            "description": description,
        })
        return {
            "approved": False,
            "pattern_key": pattern_key,
            "status": "approval_required",
            "command": command,
            "description": description,
            "message": (
                f"⚠️ This command is potentially dangerous ({description}). "
                f"Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
            ),
        }

    choice = prompt_dangerous_approval(command, description,
                                       approval_callback=approval_callback)

    if choice == "deny":
        return {
            "approved": False,
            "message": f"BLOCKED: User denied this potentially dangerous command (matched '{description}' pattern). Do NOT retry this command - the user has explicitly rejected it.",
            "pattern_key": pattern_key,
            "description": description,
        }

    if choice == "session":
        approve_session(session_key, pattern_key)
    elif choice == "always":
        approve_session(session_key, pattern_key)
        approve_permanent(pattern_key)
        save_permanent_allowlist(_permanent_approved)

    # Any other choice approves this single execution without persisting.
    return {"approved": True, "message": None}


# =========================================================================
# Combined pre-exec guard (tirith + dangerous command detection)
# =========================================================================

def _format_tirith_description(tirith_result: dict) -> str:
    """Build a human-readable description from tirith findings.

    Includes severity, title, and description for each finding so users
    can make an informed approval decision.

    Args:
        tirith_result: Mapping that may carry "findings" (a list of dicts
            with optional "severity", "title", "description") and "summary".

    Returns:
        "Security scan — <finding>; <finding>..." when at least one finding
        has a title; otherwise "Security scan: <summary>", where the summary
        defaults to "security issue detected".
    """
    fallback = f"Security scan: {tirith_result.get('summary') or 'security issue detected'}"

    findings = tirith_result.get("findings") or []
    if not findings:
        return fallback

    parts = []
    for finding in findings:
        severity = finding.get("severity", "")
        title = finding.get("title", "")
        desc = finding.get("description", "")
        # Findings without a title are skipped.
        if not title:
            continue
        text = f"{title}: {desc}" if desc else title
        parts.append(f"[{severity}] {text}" if severity else text)

    if not parts:
        return fallback

    return "Security scan — " + "; ".join(parts)
def _discard_gateway_entry(session_key: str, entry) -> None:
    """Remove *entry* from the session's pending gateway approval queue.

    Drops the session's queue altogether once it is empty.
    """
    with _lock:
        queue = _gateway_queues.get(session_key, [])
        if entry in queue:
            queue.remove(entry)
        if not queue:
            _gateway_queues.pop(session_key, None)


def _persist_approval_choice(session_key: str, warnings: list, choice) -> None:
    """Record an approval according to the scope the user picked.

    "session" approves every warning key for the session. "always" does the
    same and additionally allowlists non-tirith keys permanently (tirith
    findings stay session-only: no permanent broad allowlisting). Any other
    choice persists nothing, so the command is allowed this one time only.
    """
    if choice not in ("session", "always"):
        return
    allowlist_changed = False
    for key, _, is_tirith in warnings:
        approve_session(session_key, key)
        if choice == "always" and not is_tirith:
            approve_permanent(key)
            allowlist_changed = True
    if allowlist_changed:
        # Save once after all keys are recorded rather than once per key.
        save_permanent_allowlist(_permanent_approved)


def check_all_command_guards(command: str, env_type: str,
                             approval_callback=None) -> dict:
    """Run all pre-exec security checks and return a single approval decision.

    Gathers findings from tirith and dangerous-command detection, then
    presents them as a single combined approval request. This prevents
    a gateway force=True replay from bypassing one check when only the
    other was shown to the user.

    Args:
        command: The shell command to check.
        env_type: Terminal backend type; container/sandbox backends are
            approved without any check.
        approval_callback: Optional CLI callback forwarded to
            prompt_dangerous_approval.

    Returns:
        A dict with "approved" (bool) and "message" (str or None). Depending
        on the path it may also carry "pattern_key", "description",
        "status", "command", "user_approved", "smart_approved" or
        "smart_denied".
    """
    # Skip containers for both checks
    if env_type in ("docker", "singularity", "modal", "daytona", "vercel_sandbox"):
        return {"approved": True, "message": None}

    # Hardline floor: unconditional block for catastrophic commands
    # (rm -rf /, mkfs, dd to raw device, shutdown/reboot, fork bomb,
    # kill -1). Applies BEFORE yolo / mode=off / cron approve-mode so
    # no session-level setting can bypass it.
    is_hardline, hardline_desc = detect_hardline_command(command)
    if is_hardline:
        logger.warning("Hardline block: %s (command: %s)", hardline_desc, command[:200])
        return _hardline_block_result(hardline_desc)

    # --yolo or approvals.mode=off: bypass all approval prompts.
    # Gateway /yolo is session-scoped; CLI --yolo remains process-scoped.
    approval_mode = _get_approval_mode()
    if (
        is_truthy_value(os.getenv("HERMES_YOLO_MODE"))
        or is_current_session_yolo_enabled()
        or approval_mode == "off"
    ):
        return {"approved": True, "message": None}

    is_cli = os.getenv("HERMES_INTERACTIVE")
    is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
    is_ask = os.getenv("HERMES_EXEC_ASK")

    # Preserve the existing non-interactive behavior: outside CLI/gateway/ask
    # flows, we do not block on approvals and we skip external guard work.
    if not is_cli and not is_gateway and not is_ask:
        # Cron sessions: respect cron_mode config
        if os.getenv("HERMES_CRON_SESSION"):
            if _get_cron_approval_mode() == "deny":
                # Run detection to get a description for the block message
                is_dangerous, _pk, description = detect_dangerous_command(command)
                if is_dangerous:
                    return {
                        "approved": False,
                        "message": (
                            f"BLOCKED: Command flagged as dangerous ({description}) "
                            "but cron jobs run without a user present to approve it. "
                            "Find an alternative approach that avoids this command. "
                            "To allow dangerous commands in cron jobs, set "
                            "approvals.cron_mode: approve in config.yaml."
                        ),
                    }
        return {"approved": True, "message": None}

    # --- Phase 1: Gather findings from both checks ---

    # Tirith check — wrapper guarantees no raise for expected failures.
    # Only catch ImportError (module not installed).
    tirith_result = {"action": "allow", "findings": [], "summary": ""}
    try:
        from tools.tirith_security import check_command_security
        tirith_result = check_command_security(command)
    except ImportError:
        pass  # tirith module not installed — allow

    # Dangerous command check (detection only, no approval)
    is_dangerous, pattern_key, description = detect_dangerous_command(command)

    # --- Phase 2: Decide ---

    # Collect warnings that need approval
    warnings = []  # list of (pattern_key, description, is_tirith)

    session_key = get_current_session_key()

    # Tirith block/warn → approvable warning with rich findings.
    # Both block and warn go through the approval flow so users can
    # inspect the explanation and approve if they understand the risk.
    # .get() so a result without an "action" key is treated as "no warning"
    # instead of raising KeyError.
    if tirith_result.get("action") in ("block", "warn"):
        findings = tirith_result.get("findings") or []
        rule_id = findings[0].get("rule_id", "unknown") if findings else "unknown"
        tirith_key = f"tirith:{rule_id}"
        tirith_desc = _format_tirith_description(tirith_result)
        if not is_approved(session_key, tirith_key):
            warnings.append((tirith_key, tirith_desc, True))

    if is_dangerous and not is_approved(session_key, pattern_key):
        warnings.append((pattern_key, description, False))

    # Nothing to warn about
    if not warnings:
        return {"approved": True, "message": None}

    # One description for the LLM, the prompt and every result dict.
    combined_desc = "; ".join(desc for _, desc, _ in warnings)

    # --- Phase 2.5: Smart approval (auxiliary LLM risk assessment) ---
    # When approvals.mode=smart, ask the aux LLM before prompting the user.
    # Inspired by OpenAI Codex's Smart Approvals guardian subagent
    # (openai/codex#13860).
    if approval_mode == "smart":
        verdict = _smart_approve(command, combined_desc)
        if verdict == "approve":
            # Auto-approve and grant session-level approval for these patterns
            for key, _, _ in warnings:
                approve_session(session_key, key)
            logger.debug("Smart approval: auto-approved '%s' (%s)",
                         command[:60], combined_desc)
            return {"approved": True, "message": None,
                    "smart_approved": True,
                    "description": combined_desc}
        elif verdict == "deny":
            return {
                "approved": False,
                "message": f"BLOCKED by smart approval: {combined_desc}. "
                           "The command was assessed as genuinely dangerous. Do NOT retry.",
                "smart_denied": True,
            }
        # verdict == "escalate" → fall through to manual prompt

    # --- Phase 3: Approval ---

    primary_key = warnings[0][0]
    all_keys = [key for key, _, _ in warnings]
    has_tirith = any(is_t for _, _, is_t in warnings)

    # Gateway/async approval — block the agent thread until the user
    # responds with /approve or /deny, mirroring the CLI's synchronous
    # input() flow. The agent never sees "approval_required"; it either
    # gets the command output (approved) or a definitive "BLOCKED" message.
    if is_gateway or is_ask:
        with _lock:
            notify_cb = _gateway_notify_cbs.get(session_key)

        if notify_cb is not None:
            # --- Blocking gateway approval (queue-based) ---
            # Each call gets its own _ApprovalEntry so parallel subagents
            # and execute_code threads can block concurrently.
            approval_data = {
                "command": command,
                "pattern_key": primary_key,
                "pattern_keys": all_keys,
                "description": combined_desc,
            }
            entry = _ApprovalEntry(approval_data)
            with _lock:
                _gateway_queues.setdefault(session_key, []).append(entry)

            # Notify plugins that an approval is being requested. Fires before
            # the gateway notify callback so observers (e.g. macOS notifier
            # plugins, audit logs, Slack alerts) get the event in real time.
            _fire_approval_hook(
                "pre_approval_request",
                command=command,
                description=combined_desc,
                pattern_key=primary_key,
                pattern_keys=list(all_keys),
                session_key=session_key,
                surface="gateway",
            )

            # Notify the user (bridges sync agent thread → async gateway)
            try:
                notify_cb(approval_data)
            except Exception as exc:
                logger.warning("Gateway approval notify failed: %s", exc)
                _discard_gateway_entry(session_key, entry)
                return {
                    "approved": False,
                    "message": "BLOCKED: Failed to send approval request to user. Do NOT retry.",
                    "pattern_key": primary_key,
                    "description": combined_desc,
                }

            # Block until the user responds or the timeout elapses
            # (approvals.gateway_timeout, default 300).
            # Poll in short slices so we can fire activity heartbeats to
            # the agent's inactivity tracker. Without this, the blocking
            # event.wait() never touches activity, and the gateway's
            # inactivity watchdog kills the agent while the user is still
            # responding to the approval prompt. Mirrors the
            # _wait_for_process() cadence in tools/environments/base.py.
            timeout = _get_approval_config().get("gateway_timeout", 300)
            try:
                timeout = int(timeout)
            except (ValueError, TypeError):
                timeout = 300

            try:
                from tools.environments.base import touch_activity_if_due
            except Exception:  # pragma: no cover
                touch_activity_if_due = None

            started = time.monotonic()
            deadline = started + max(timeout, 0)
            activity_state = {"last_touch": started, "start": started}
            resolved = False
            try:
                while True:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        break
                    # 1s poll slice — the event is set immediately when the
                    # user responds, so slice length only controls heartbeat
                    # cadence, not user-visible responsiveness.
                    if entry.event.wait(timeout=min(1.0, remaining)):
                        resolved = True
                        break
                    if touch_activity_if_due is not None:
                        touch_activity_if_due(
                            activity_state, "waiting for user approval"
                        )
            finally:
                # Always drop this entry, even if the wait is interrupted by
                # an exception, so no stale request stays queued.
                _discard_gateway_entry(session_key, entry)

            choice = entry.result
            # Normalize outcome for the post hook. Unresolved (timeout) and
            # None both mean the user never responded; report that explicitly
            # so plugins can distinguish timeout from explicit deny.
            outcome = choice if (resolved and choice) else "timeout"
            _fire_approval_hook(
                "post_approval_response",
                command=command,
                description=combined_desc,
                pattern_key=primary_key,
                pattern_keys=list(all_keys),
                session_key=session_key,
                surface="gateway",
                choice=outcome,
            )

            if not resolved or choice is None or choice == "deny":
                reason = "timed out" if not resolved else "denied by user"
                return {
                    "approved": False,
                    "message": f"BLOCKED: Command {reason}. Do NOT retry this command.",
                    "pattern_key": primary_key,
                    "description": combined_desc,
                }

            # User approved — persist based on scope (same logic as CLI).
            # choice == "once": no persistence — command allowed this
            # single time only, matching the CLI's behavior.
            _persist_approval_choice(session_key, warnings, choice)

            return {"approved": True, "message": None,
                    "user_approved": True, "description": combined_desc}

        # Fallback: no gateway callback registered (e.g. cron, batch).
        # Return approval_required for backward compat.
        submit_pending(session_key, {
            "command": command,
            "pattern_key": primary_key,
            "pattern_keys": all_keys,
            "description": combined_desc,
        })
        return {
            "approved": False,
            "pattern_key": primary_key,
            "status": "approval_required",
            "command": command,
            "description": combined_desc,
            "message": (
                f"⚠️ {combined_desc}. Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
            ),
        }

    # CLI interactive: single combined prompt
    # Hide [a]lways when any tirith warning is present
    _fire_approval_hook(
        "pre_approval_request",
        command=command,
        description=combined_desc,
        pattern_key=primary_key,
        pattern_keys=list(all_keys),
        session_key=session_key,
        surface="cli",
    )
    choice = prompt_dangerous_approval(command, combined_desc,
                                       allow_permanent=not has_tirith,
                                       approval_callback=approval_callback)
    _fire_approval_hook(
        "post_approval_response",
        command=command,
        description=combined_desc,
        pattern_key=primary_key,
        pattern_keys=list(all_keys),
        session_key=session_key,
        surface="cli",
        choice=choice,
    )

    if choice == "deny":
        return {
            "approved": False,
            "message": "BLOCKED: User denied. Do NOT retry.",
            "pattern_key": primary_key,
            "description": combined_desc,
        }

    # Persist approval for each warning individually
    _persist_approval_choice(session_key, warnings, choice)

    return {"approved": True, "message": None,
            "user_approved": True, "description": combined_desc}


# Load permanent allowlist from config on module import
load_permanent_allowlist()