/ src / core / command_security.py
command_security.py
  1  """
  2  Command Security Filter for Ag3ntumBash.
  3  
  4  Loads security rules from YAML configuration and validates commands
  5  before execution. Provides defense-in-depth beyond bwrap sandboxing.
  6  
  7  Rules are defined in config/security/command-filtering.yaml with:
  8  - pattern: Python regex to match against command
  9  - action: "block" (deny) or "record" (log but allow)
 10  - exploit: Example command for testing
 11  
 12  Security Philosophy:
 13  1. Deny by default for dangerous categories
 14  2. Fail-closed on any error
 15  3. Log all matches for audit trail
 16  4. Allow trusted skill scripts from designated directories
 17  5. Command-name patterns only match against command tokens, not quoted arguments
 18  """
 19  import logging
 20  import re
 21  import shlex
 22  from dataclasses import dataclass, field
 23  from pathlib import Path
 24  from typing import Literal, Optional
 25  
 26  import yaml
 27  
 28  logger = logging.getLogger(__name__)
 29  
 30  # Regex to detect patterns that target command names (start with command-position anchor).
 31  # These patterns should only match against command tokens, not inside quoted arguments.
 32  # Content patterns (paths like /proc/, /etc/shadow) should match anywhere.
 33  _COMMAND_POSITION_PREFIX = re.compile(
 34      r'^\(\?:\^'       # Starts with (?:^  — command-position anchor
 35  )
 36  
 37  # Regex to strip contents of quoted strings for command-name matching.
 38  # Preserves quote delimiters but removes content to prevent false positives
 39  # from natural language words that happen to match command names.
 40  # Example: python3 script.py "show the top 3" → python3 script.py ""
 41  _QUOTED_CONTENT_RE = re.compile(
 42      r"""'(?:[^'\\]|\\.)*'"""   # single-quoted strings
 43      r"""|"(?:[^"\\]|\\.)*\""""  # double-quoted strings
 44      , re.DOTALL
 45  )
 46  
 47  # Regex to split compound commands on shell operators.
 48  # Used by _is_trusted_skill_command to check each subcommand independently.
 49  _COMPOUND_SPLIT_RE = re.compile(r'\s*(?:&&|\|\||[;|])\s*')
 50  
 51  # Default path to security rules
 52  DEFAULT_RULES_PATH = Path(__file__).parent.parent.parent / "config" / "security" / "command-filtering.yaml"
 53  
 54  # Trusted skill script paths - commands executing scripts from these paths bypass security filters
 55  # These paths are verified within the sandboxed environment (bwrap) and are read-only mounted
 56  # See permissions.yaml for mount configuration (global_skills, user_skills, user_venv)
 57  # SECURITY: user_skills is a per-user mount to prevent cross-user access
 58  TRUSTED_SKILL_PATHS = (
 59      "/skills/",       # Global skills directory (read-only, contains .claude/skills/<skill_name>/)
 60      "/user-skills/",  # User skills directory (read-only, per-user mount for isolation)
 61      "/venv/",         # User's Python venv (read-only mount at /venv)
 62  )
 63  
 64  # Interpreters that can execute skill scripts
 65  TRUSTED_INTERPRETERS = ("python", "python3", "bash", "sh")
 66  
 67  
 68  def _strip_quoted_content(command: str) -> str:
 69      """
 70      Strip contents of quoted strings from a command, preserving delimiters.
 71  
 72      This prevents false positives where natural language words inside quoted
 73      arguments match command-name patterns (e.g., "top" in "show the top 3").
 74  
 75      SECURITY NOTE: Content patterns (paths like /proc/, /etc/shadow) are applied
 76      against the FULL command and are NOT affected by this stripping.
 77  
 78      Returns:
 79          Command with quoted string contents replaced by empty quotes.
 80      """
 81      return _QUOTED_CONTENT_RE.sub('""', command)
 82  
 83  
 84  def _check_subcommand_is_trusted_skill(subcmd: str) -> bool:
 85      """
 86      Check if a single subcommand (no &&, ||, ;, |) runs a trusted skill script.
 87  
 88      Args:
 89          subcmd: A single shell command (no compound operators).
 90  
 91      Returns:
 92          True if the command executes a script from a trusted skill path.
 93      """
 94      try:
 95          parts = shlex.split(subcmd.strip())
 96          if not parts:
 97              return False
 98  
 99          base_cmd = Path(parts[0]).name
100  
101          if base_cmd not in TRUSTED_INTERPRETERS:
102              return False
103  
104          for arg in parts[1:]:
105              if arg.startswith("-"):
106                  continue
107  
108              resolved_arg = str(Path(arg).resolve())
109  
110              for skill_path in TRUSTED_SKILL_PATHS:
111                  if resolved_arg.startswith(skill_path):
112                      if resolved_arg.endswith((".py", ".sh", ".bash")):
113                          logger.info(
114                              f"CommandSecurityFilter: TRUSTED SKILL - "
115                              f"Allowing execution of skill script: {resolved_arg}"
116                          )
117                          return True
118  
119              # First non-flag argument found but not a skill - stop looking
120              break
121  
122          return False
123  
124      except ValueError:
125          return False
126      except Exception:
127          return False
128  
129  
130  def _is_trusted_skill_command(command: str) -> bool:
131      """
132      Check if a command is executing a trusted skill script.
133  
134      Skill scripts are Python/bash files located in designated skill directories.
135      These scripts are mounted read-only in the sandbox and are trusted to execute
136      even if their arguments might match security filter patterns.
137  
138      Handles compound commands: ``cd . && python3 /skills/my_skill/run.py "arg"``
139      is correctly identified as a trusted skill command by checking each subcommand.
140  
141      Args:
142          command: The full command string to check.
143  
144      Returns:
145          True if the command executes a script from a trusted skill path.
146      """
147      try:
148          # Split compound commands on &&, ||, ;, |
149          subcommands = _COMPOUND_SPLIT_RE.split(command)
150  
151          for subcmd in subcommands:
152              subcmd = subcmd.strip()
153              if not subcmd:
154                  continue
155              if _check_subcommand_is_trusted_skill(subcmd):
156                  return True
157  
158          return False
159  
160      except Exception as e:
161          logger.warning(f"CommandSecurityFilter: Error in skill check: {e}")
162          return False
163  
164  
165  @dataclass
166  class SecurityRule:
167      """Single security rule for command filtering."""
168      pattern: str
169      action: Literal["block", "record"]
170      exploit: str
171      category: str
172      compiled_pattern: re.Pattern = field(init=False, repr=False)
173      is_command_pattern: bool = field(init=False, repr=False)
174  
175      def __post_init__(self) -> None:
176          """Compile the regex pattern and classify as command-name vs content pattern."""
177          try:
178              self.compiled_pattern = re.compile(self.pattern, re.IGNORECASE)
179          except re.error as e:
180              logger.error(f"Invalid regex in rule: {self.pattern} - {e}")
181              # Create a pattern that never matches as fallback
182              self.compiled_pattern = re.compile(r"^\b$")
183  
184          # Auto-classify: patterns starting with (?:^ are "command-position" patterns
185          # that target command names. These are matched against a version of the command
186          # with quoted string contents stripped to prevent false positives.
187          # Content patterns (e.g., /proc/1/, /etc/shadow) match the full raw command.
188          self.is_command_pattern = bool(_COMMAND_POSITION_PREFIX.match(self.pattern))
189  
190  
191  @dataclass
192  class SecurityCheckResult:
193      """Result of security check on a command."""
194      allowed: bool
195      matched_rule: Optional[SecurityRule] = None
196      message: str = ""
197      
198      @property
199      def should_block(self) -> bool:
200          """Return True if command should be blocked."""
201          return not self.allowed
202      
203      @property
204      def action(self) -> str:
205          """Return the action that was/will be taken."""
206          if self.matched_rule:
207              return self.matched_rule.action
208          return "allow"
209  
210  
211  class CommandSecurityFilter:
212      """
213      Command security filter that validates commands against security rules.
214      
215      Loads rules from YAML configuration and provides methods to check
216      commands before execution.
217      
218      Usage:
219          filter = CommandSecurityFilter()
220          result = filter.check_command("kill -9 147")
221          if result.should_block:
222              logger.warning(f"Blocked: {result.message}")
223              return error_response(result.message)
224      """
225      
226      def __init__(
227          self,
228          rules_path: Optional[Path] = None,
229          fail_closed: bool = True,
230      ) -> None:
231          """
232          Initialize the command security filter.
233          
234          Args:
235              rules_path: Path to YAML rules file. Defaults to config/security/command-filtering.yaml
236              fail_closed: If True, block commands when rules fail to load (security-first).
237                          If False, allow commands when rules fail to load (availability-first).
238          """
239          self._rules_path = rules_path or DEFAULT_RULES_PATH
240          self._fail_closed = fail_closed
241          self._rules: list[SecurityRule] = []
242          self._rules_loaded = False
243          self._load_error: Optional[str] = None
244          
245          self._load_rules()
246      
247      def _load_rules(self) -> None:
248          """Load security rules from YAML configuration."""
249          try:
250              if not self._rules_path.exists():
251                  self._load_error = f"Rules file not found: {self._rules_path}"
252                  logger.error(f"CommandSecurityFilter: {self._load_error}")
253                  return
254              
255              with self._rules_path.open("r", encoding="utf-8") as f:
256                  config = yaml.safe_load(f)
257              
258              if not config:
259                  self._load_error = "Empty rules configuration"
260                  logger.error(f"CommandSecurityFilter: {self._load_error}")
261                  return
262              
263              # Parse rules from all categories
264              rules: list[SecurityRule] = []
265              for category, category_rules in config.items():
266                  # Skip metadata keys
267                  if category in ("version",):
268                      continue
269                  
270                  if not isinstance(category_rules, list):
271                      continue
272                  
273                  for rule_data in category_rules:
274                      if not isinstance(rule_data, dict):
275                          continue
276                      
277                      pattern = rule_data.get("pattern")
278                      action = rule_data.get("action", "block")
279                      exploit = rule_data.get("exploit", "")
280                      
281                      if not pattern:
282                          continue
283                      
284                      if action not in ("block", "record"):
285                          action = "block"  # Default to secure
286                      
287                      rule = SecurityRule(
288                          pattern=pattern,
289                          action=action,
290                          exploit=exploit,
291                          category=category,
292                      )
293                      rules.append(rule)
294              
295              self._rules = rules
296              self._rules_loaded = True
297              logger.info(
298                  f"CommandSecurityFilter: Loaded {len(rules)} rules "
299                  f"from {self._rules_path}"
300              )
301              
302          except yaml.YAMLError as e:
303              self._load_error = f"YAML parse error: {e}"
304              logger.error(f"CommandSecurityFilter: {self._load_error}")
305          except Exception as e:
306              self._load_error = f"Failed to load rules: {e}"
307              logger.exception(f"CommandSecurityFilter: {self._load_error}")
308      
309      def reload_rules(self) -> bool:
310          """
311          Reload rules from configuration file.
312          
313          Returns:
314              True if rules loaded successfully, False otherwise.
315          """
316          self._rules = []
317          self._rules_loaded = False
318          self._load_error = None
319          self._load_rules()
320          return self._rules_loaded
321      
322      @property
323      def rules_loaded(self) -> bool:
324          """Return True if rules were loaded successfully."""
325          return self._rules_loaded
326      
327      @property
328      def rule_count(self) -> int:
329          """Return number of loaded rules."""
330          return len(self._rules)
331      
332      def check_command(self, command: str) -> SecurityCheckResult:
333          """
334          Check a command against security rules.
335  
336          Args:
337              command: The command string to check.
338  
339          Returns:
340              SecurityCheckResult with allowed status and matched rule if any.
341          """
342          # Handle load failures
343          if not self._rules_loaded:
344              if self._fail_closed:
345                  return SecurityCheckResult(
346                      allowed=False,
347                      message=f"Security rules not loaded: {self._load_error}. "
348                              "Commands blocked for security (fail-closed mode)."
349                  )
350              else:
351                  logger.warning(
352                      f"CommandSecurityFilter: Rules not loaded, allowing command "
353                      f"(fail-open mode): {command[:50]}..."
354                  )
355                  return SecurityCheckResult(
356                      allowed=True,
357                      message="Rules not loaded, allowing (fail-open mode)"
358                  )
359  
360          # SECURITY EXCEPTION: Allow trusted skill scripts
361          # Skill scripts are located in read-only mounted directories and are trusted.
362          # This check runs BEFORE pattern matching to prevent false positives from
363          # skill arguments (e.g., prompts containing words like "at" or "kill").
364          # Now handles compound commands like: cd . && python3 /skills/script.py "arg"
365          if _is_trusted_skill_command(command):
366              return SecurityCheckResult(
367                  allowed=True,
368                  message="Trusted skill script execution allowed",
369              )
370  
371          # Two-tier matching:
372          # 1. "Command patterns" (targeting command names like kill, top, sudo) are
373          #    matched against a STRIPPED version of the command where quoted string
374          #    contents are removed. This prevents false positives from natural language
375          #    words in arguments (e.g., "top" in "show the top 3 folders").
376          # 2. "Content patterns" (targeting paths like /proc/, /etc/shadow, or specific
377          #    strings like release_agent) are matched against the FULL raw command.
378          #    These indicate malicious intent regardless of quoting context.
379          stripped_command = _strip_quoted_content(command)
380  
381          for rule in self._rules:
382              try:
383                  # Choose which version of the command to match against
384                  match_target = stripped_command if rule.is_command_pattern else command
385  
386                  if rule.compiled_pattern.search(match_target):
387                      # Found a match
388                      if rule.action == "block":
389                          message = (
390                              f"Command blocked by security rule [{rule.category}]: "
391                              f"pattern='{rule.pattern[:50]}...'"
392                          )
393                          logger.warning(
394                              f"CommandSecurityFilter: BLOCKED - "
395                              f"category={rule.category}, command={command[:100]}..."
396                          )
397                          return SecurityCheckResult(
398                              allowed=False,
399                              matched_rule=rule,
400                              message=message,
401                          )
402                      else:  # record
403                          logger.info(
404                              f"CommandSecurityFilter: RECORDED - "
405                              f"category={rule.category}, command={command[:100]}..."
406                          )
407                          return SecurityCheckResult(
408                              allowed=True,
409                              matched_rule=rule,
410                              message=f"Command recorded for audit [{rule.category}]",
411                          )
412              except Exception as e:
413                  logger.error(
414                      f"CommandSecurityFilter: Error checking rule {rule.pattern}: {e}"
415                  )
416                  if self._fail_closed:
417                      return SecurityCheckResult(
418                          allowed=False,
419                          message=f"Security check error: {e}. Blocking for safety."
420                      )
421          
422          # No rules matched - command is allowed
423          return SecurityCheckResult(
424              allowed=True,
425              message="No security rules matched",
426          )
427      
428      def get_rules_by_category(self, category: str) -> list[SecurityRule]:
429          """Get all rules in a specific category."""
430          return [r for r in self._rules if r.category == category]
431      
432      def get_categories(self) -> list[str]:
433          """Get list of all rule categories."""
434          return list(set(r.category for r in self._rules))
435      
436      def get_exploits_for_testing(self) -> list[tuple[str, SecurityRule]]:
437          """
438          Get all exploit examples for security testing.
439          
440          Returns:
441              List of (exploit_command, rule) tuples.
442          """
443          return [(r.exploit, r) for r in self._rules if r.exploit]
444      
445      def get_block_rules(self) -> list[SecurityRule]:
446          """Get all rules that block commands."""
447          return [r for r in self._rules if r.action == "block"]
448      
449      def get_record_rules(self) -> list[SecurityRule]:
450          """Get all rules that only record commands."""
451          return [r for r in self._rules if r.action == "record"]
452  
453  
454  # Module-level singleton for easy access
455  _default_filter: Optional[CommandSecurityFilter] = None
456  
457  
458  def get_command_security_filter() -> CommandSecurityFilter:
459      """
460      Get the default command security filter singleton.
461      
462      Returns:
463          CommandSecurityFilter instance.
464      """
465      global _default_filter
466      if _default_filter is None:
467          _default_filter = CommandSecurityFilter()
468      return _default_filter
469  
470  
471  def check_command_security(command: str) -> SecurityCheckResult:
472      """
473      Convenience function to check a command using the default filter.
474      
475      Args:
476          command: Command string to check.
477          
478      Returns:
479          SecurityCheckResult with allowed status.
480      """
481      return get_command_security_filter().check_command(command)