command_security.py
1 """ 2 Command Security Filter for Ag3ntumBash. 3 4 Loads security rules from YAML configuration and validates commands 5 before execution. Provides defense-in-depth beyond bwrap sandboxing. 6 7 Rules are defined in config/security/command-filtering.yaml with: 8 - pattern: Python regex to match against command 9 - action: "block" (deny) or "record" (log but allow) 10 - exploit: Example command for testing 11 12 Security Philosophy: 13 1. Deny by default for dangerous categories 14 2. Fail-closed on any error 15 3. Log all matches for audit trail 16 4. Allow trusted skill scripts from designated directories 17 5. Command-name patterns only match against command tokens, not quoted arguments 18 """ 19 import logging 20 import re 21 import shlex 22 from dataclasses import dataclass, field 23 from pathlib import Path 24 from typing import Literal, Optional 25 26 import yaml 27 28 logger = logging.getLogger(__name__) 29 30 # Regex to detect patterns that target command names (start with command-position anchor). 31 # These patterns should only match against command tokens, not inside quoted arguments. 32 # Content patterns (paths like /proc/, /etc/shadow) should match anywhere. 33 _COMMAND_POSITION_PREFIX = re.compile( 34 r'^\(\?:\^' # Starts with (?:^ — command-position anchor 35 ) 36 37 # Regex to strip contents of quoted strings for command-name matching. 38 # Preserves quote delimiters but removes content to prevent false positives 39 # from natural language words that happen to match command names. 40 # Example: python3 script.py "show the top 3" → python3 script.py "" 41 _QUOTED_CONTENT_RE = re.compile( 42 r"""'(?:[^'\\]|\\.)*'""" # single-quoted strings 43 r"""|"(?:[^"\\]|\\.)*\"""" # double-quoted strings 44 , re.DOTALL 45 ) 46 47 # Regex to split compound commands on shell operators. 48 # Used by _is_trusted_skill_command to check each subcommand independently. 49 _COMPOUND_SPLIT_RE = re.compile(r'\s*(?:&&|\|\||[;|])\s*') 50 51 # Default path to security rules 52 DEFAULT_RULES_PATH = Path(__file__).parent.parent.parent / "config" / "security" / "command-filtering.yaml" 53 54 # Trusted skill script paths - commands executing scripts from these paths bypass security filters 55 # These paths are verified within the sandboxed environment (bwrap) and are read-only mounted 56 # See permissions.yaml for mount configuration (global_skills, user_skills, user_venv) 57 # SECURITY: user_skills is a per-user mount to prevent cross-user access 58 TRUSTED_SKILL_PATHS = ( 59 "/skills/", # Global skills directory (read-only, contains .claude/skills/<skill_name>/) 60 "/user-skills/", # User skills directory (read-only, per-user mount for isolation) 61 "/venv/", # User's Python venv (read-only mount at /venv) 62 ) 63 64 # Interpreters that can execute skill scripts 65 TRUSTED_INTERPRETERS = ("python", "python3", "bash", "sh") 66 67 68 def _strip_quoted_content(command: str) -> str: 69 """ 70 Strip contents of quoted strings from a command, preserving delimiters. 71 72 This prevents false positives where natural language words inside quoted 73 arguments match command-name patterns (e.g., "top" in "show the top 3"). 74 75 SECURITY NOTE: Content patterns (paths like /proc/, /etc/shadow) are applied 76 against the FULL command and are NOT affected by this stripping. 77 78 Returns: 79 Command with quoted string contents replaced by empty quotes. 80 """ 81 return _QUOTED_CONTENT_RE.sub('""', command) 82 83 84 def _check_subcommand_is_trusted_skill(subcmd: str) -> bool: 85 """ 86 Check if a single subcommand (no &&, ||, ;, |) runs a trusted skill script. 87 88 Args: 89 subcmd: A single shell command (no compound operators). 90 91 Returns: 92 True if the command executes a script from a trusted skill path. 93 """ 94 try: 95 parts = shlex.split(subcmd.strip()) 96 if not parts: 97 return False 98 99 base_cmd = Path(parts[0]).name 100 101 if base_cmd not in TRUSTED_INTERPRETERS: 102 return False 103 104 for arg in parts[1:]: 105 if arg.startswith("-"): 106 continue 107 108 resolved_arg = str(Path(arg).resolve()) 109 110 for skill_path in TRUSTED_SKILL_PATHS: 111 if resolved_arg.startswith(skill_path): 112 if resolved_arg.endswith((".py", ".sh", ".bash")): 113 logger.info( 114 f"CommandSecurityFilter: TRUSTED SKILL - " 115 f"Allowing execution of skill script: {resolved_arg}" 116 ) 117 return True 118 119 # First non-flag argument found but not a skill - stop looking 120 break 121 122 return False 123 124 except ValueError: 125 return False 126 except Exception: 127 return False 128 129 130 def _is_trusted_skill_command(command: str) -> bool: 131 """ 132 Check if a command is executing a trusted skill script. 133 134 Skill scripts are Python/bash files located in designated skill directories. 135 These scripts are mounted read-only in the sandbox and are trusted to execute 136 even if their arguments might match security filter patterns. 137 138 Handles compound commands: ``cd . && python3 /skills/my_skill/run.py "arg"`` 139 is correctly identified as a trusted skill command by checking each subcommand. 140 141 Args: 142 command: The full command string to check. 143 144 Returns: 145 True if the command executes a script from a trusted skill path. 146 """ 147 try: 148 # Split compound commands on &&, ||, ;, | 149 subcommands = _COMPOUND_SPLIT_RE.split(command) 150 151 for subcmd in subcommands: 152 subcmd = subcmd.strip() 153 if not subcmd: 154 continue 155 if _check_subcommand_is_trusted_skill(subcmd): 156 return True 157 158 return False 159 160 except Exception as e: 161 logger.warning(f"CommandSecurityFilter: Error in skill check: {e}") 162 return False 163 164 165 @dataclass 166 class SecurityRule: 167 """Single security rule for command filtering.""" 168 pattern: str 169 action: Literal["block", "record"] 170 exploit: str 171 category: str 172 compiled_pattern: re.Pattern = field(init=False, repr=False) 173 is_command_pattern: bool = field(init=False, repr=False) 174 175 def __post_init__(self) -> None: 176 """Compile the regex pattern and classify as command-name vs content pattern.""" 177 try: 178 self.compiled_pattern = re.compile(self.pattern, re.IGNORECASE) 179 except re.error as e: 180 logger.error(f"Invalid regex in rule: {self.pattern} - {e}") 181 # Create a pattern that never matches as fallback 182 self.compiled_pattern = re.compile(r"^\b$") 183 184 # Auto-classify: patterns starting with (?:^ are "command-position" patterns 185 # that target command names. These are matched against a version of the command 186 # with quoted string contents stripped to prevent false positives. 187 # Content patterns (e.g., /proc/1/, /etc/shadow) match the full raw command. 188 self.is_command_pattern = bool(_COMMAND_POSITION_PREFIX.match(self.pattern)) 189 190 191 @dataclass 192 class SecurityCheckResult: 193 """Result of security check on a command.""" 194 allowed: bool 195 matched_rule: Optional[SecurityRule] = None 196 message: str = "" 197 198 @property 199 def should_block(self) -> bool: 200 """Return True if command should be blocked.""" 201 return not self.allowed 202 203 @property 204 def action(self) -> str: 205 """Return the action that was/will be taken.""" 206 if self.matched_rule: 207 return self.matched_rule.action 208 return "allow" 209 210 211 class CommandSecurityFilter: 212 """ 213 Command security filter that validates commands against security rules. 214 215 Loads rules from YAML configuration and provides methods to check 216 commands before execution. 217 218 Usage: 219 filter = CommandSecurityFilter() 220 result = filter.check_command("kill -9 147") 221 if result.should_block: 222 logger.warning(f"Blocked: {result.message}") 223 return error_response(result.message) 224 """ 225 226 def __init__( 227 self, 228 rules_path: Optional[Path] = None, 229 fail_closed: bool = True, 230 ) -> None: 231 """ 232 Initialize the command security filter. 233 234 Args: 235 rules_path: Path to YAML rules file. Defaults to config/security/command-filtering.yaml 236 fail_closed: If True, block commands when rules fail to load (security-first). 237 If False, allow commands when rules fail to load (availability-first). 238 """ 239 self._rules_path = rules_path or DEFAULT_RULES_PATH 240 self._fail_closed = fail_closed 241 self._rules: list[SecurityRule] = [] 242 self._rules_loaded = False 243 self._load_error: Optional[str] = None 244 245 self._load_rules() 246 247 def _load_rules(self) -> None: 248 """Load security rules from YAML configuration.""" 249 try: 250 if not self._rules_path.exists(): 251 self._load_error = f"Rules file not found: {self._rules_path}" 252 logger.error(f"CommandSecurityFilter: {self._load_error}") 253 return 254 255 with self._rules_path.open("r", encoding="utf-8") as f: 256 config = yaml.safe_load(f) 257 258 if not config: 259 self._load_error = "Empty rules configuration" 260 logger.error(f"CommandSecurityFilter: {self._load_error}") 261 return 262 263 # Parse rules from all categories 264 rules: list[SecurityRule] = [] 265 for category, category_rules in config.items(): 266 # Skip metadata keys 267 if category in ("version",): 268 continue 269 270 if not isinstance(category_rules, list): 271 continue 272 273 for rule_data in category_rules: 274 if not isinstance(rule_data, dict): 275 continue 276 277 pattern = rule_data.get("pattern") 278 action = rule_data.get("action", "block") 279 exploit = rule_data.get("exploit", "") 280 281 if not pattern: 282 continue 283 284 if action not in ("block", "record"): 285 action = "block" # Default to secure 286 287 rule = SecurityRule( 288 pattern=pattern, 289 action=action, 290 exploit=exploit, 291 category=category, 292 ) 293 rules.append(rule) 294 295 self._rules = rules 296 self._rules_loaded = True 297 logger.info( 298 f"CommandSecurityFilter: Loaded {len(rules)} rules " 299 f"from {self._rules_path}" 300 ) 301 302 except yaml.YAMLError as e: 303 self._load_error = f"YAML parse error: {e}" 304 logger.error(f"CommandSecurityFilter: {self._load_error}") 305 except Exception as e: 306 self._load_error = f"Failed to load rules: {e}" 307 logger.exception(f"CommandSecurityFilter: {self._load_error}") 308 309 def reload_rules(self) -> bool: 310 """ 311 Reload rules from configuration file. 312 313 Returns: 314 True if rules loaded successfully, False otherwise. 315 """ 316 self._rules = [] 317 self._rules_loaded = False 318 self._load_error = None 319 self._load_rules() 320 return self._rules_loaded 321 322 @property 323 def rules_loaded(self) -> bool: 324 """Return True if rules were loaded successfully.""" 325 return self._rules_loaded 326 327 @property 328 def rule_count(self) -> int: 329 """Return number of loaded rules.""" 330 return len(self._rules) 331 332 def check_command(self, command: str) -> SecurityCheckResult: 333 """ 334 Check a command against security rules. 335 336 Args: 337 command: The command string to check. 338 339 Returns: 340 SecurityCheckResult with allowed status and matched rule if any. 341 """ 342 # Handle load failures 343 if not self._rules_loaded: 344 if self._fail_closed: 345 return SecurityCheckResult( 346 allowed=False, 347 message=f"Security rules not loaded: {self._load_error}. " 348 "Commands blocked for security (fail-closed mode)." 349 ) 350 else: 351 logger.warning( 352 f"CommandSecurityFilter: Rules not loaded, allowing command " 353 f"(fail-open mode): {command[:50]}..." 354 ) 355 return SecurityCheckResult( 356 allowed=True, 357 message="Rules not loaded, allowing (fail-open mode)" 358 ) 359 360 # SECURITY EXCEPTION: Allow trusted skill scripts 361 # Skill scripts are located in read-only mounted directories and are trusted. 362 # This check runs BEFORE pattern matching to prevent false positives from 363 # skill arguments (e.g., prompts containing words like "at" or "kill"). 364 # Now handles compound commands like: cd . && python3 /skills/script.py "arg" 365 if _is_trusted_skill_command(command): 366 return SecurityCheckResult( 367 allowed=True, 368 message="Trusted skill script execution allowed", 369 ) 370 371 # Two-tier matching: 372 # 1. "Command patterns" (targeting command names like kill, top, sudo) are 373 # matched against a STRIPPED version of the command where quoted string 374 # contents are removed. This prevents false positives from natural language 375 # words in arguments (e.g., "top" in "show the top 3 folders"). 376 # 2. "Content patterns" (targeting paths like /proc/, /etc/shadow, or specific 377 # strings like release_agent) are matched against the FULL raw command. 378 # These indicate malicious intent regardless of quoting context. 379 stripped_command = _strip_quoted_content(command) 380 381 for rule in self._rules: 382 try: 383 # Choose which version of the command to match against 384 match_target = stripped_command if rule.is_command_pattern else command 385 386 if rule.compiled_pattern.search(match_target): 387 # Found a match 388 if rule.action == "block": 389 message = ( 390 f"Command blocked by security rule [{rule.category}]: " 391 f"pattern='{rule.pattern[:50]}...'" 392 ) 393 logger.warning( 394 f"CommandSecurityFilter: BLOCKED - " 395 f"category={rule.category}, command={command[:100]}..." 396 ) 397 return SecurityCheckResult( 398 allowed=False, 399 matched_rule=rule, 400 message=message, 401 ) 402 else: # record 403 logger.info( 404 f"CommandSecurityFilter: RECORDED - " 405 f"category={rule.category}, command={command[:100]}..." 406 ) 407 return SecurityCheckResult( 408 allowed=True, 409 matched_rule=rule, 410 message=f"Command recorded for audit [{rule.category}]", 411 ) 412 except Exception as e: 413 logger.error( 414 f"CommandSecurityFilter: Error checking rule {rule.pattern}: {e}" 415 ) 416 if self._fail_closed: 417 return SecurityCheckResult( 418 allowed=False, 419 message=f"Security check error: {e}. Blocking for safety." 420 ) 421 422 # No rules matched - command is allowed 423 return SecurityCheckResult( 424 allowed=True, 425 message="No security rules matched", 426 ) 427 428 def get_rules_by_category(self, category: str) -> list[SecurityRule]: 429 """Get all rules in a specific category.""" 430 return [r for r in self._rules if r.category == category] 431 432 def get_categories(self) -> list[str]: 433 """Get list of all rule categories.""" 434 return list(set(r.category for r in self._rules)) 435 436 def get_exploits_for_testing(self) -> list[tuple[str, SecurityRule]]: 437 """ 438 Get all exploit examples for security testing. 439 440 Returns: 441 List of (exploit_command, rule) tuples. 442 """ 443 return [(r.exploit, r) for r in self._rules if r.exploit] 444 445 def get_block_rules(self) -> list[SecurityRule]: 446 """Get all rules that block commands.""" 447 return [r for r in self._rules if r.action == "block"] 448 449 def get_record_rules(self) -> list[SecurityRule]: 450 """Get all rules that only record commands.""" 451 return [r for r in self._rules if r.action == "record"] 452 453 454 # Module-level singleton for easy access 455 _default_filter: Optional[CommandSecurityFilter] = None 456 457 458 def get_command_security_filter() -> CommandSecurityFilter: 459 """ 460 Get the default command security filter singleton. 461 462 Returns: 463 CommandSecurityFilter instance. 464 """ 465 global _default_filter 466 if _default_filter is None: 467 _default_filter = CommandSecurityFilter() 468 return _default_filter 469 470 471 def check_command_security(command: str) -> SecurityCheckResult: 472 """ 473 Convenience function to check a command using the default filter. 474 475 Args: 476 command: Command string to check. 477 478 Returns: 479 SecurityCheckResult with allowed status. 480 """ 481 return get_command_security_filter().check_command(command)