path_validator.py
1 """ 2 Unified path validation for all Ag3ntum tools. 3 4 Single source of truth for path normalization, validation, and logging. 5 All Ag3ntum file tools use this validator before performing operations. 6 7 ARCHITECTURE: 8 ============= 9 10 This module works in conjunction with sandbox_path_resolver.py to provide 11 a complete path handling solution: 12 13 1. SandboxPathResolver (sandbox_path_resolver.py): 14 - Defines canonical path format (sandbox paths) 15 - Provides bidirectional translation (sandbox ↔ Docker) 16 - Context-aware resolution 17 18 2. Ag3ntumPathValidator (this module): 19 - Security validation (blocklist, allowlist, boundaries) 20 - Read-only path enforcement 21 - Access logging 22 23 EXECUTION CONTEXT: 24 ================== 25 26 This validator runs in the main Python process, which sees the REAL Docker 27 filesystem paths (e.g., /users/greg/sessions/xxx/workspace), NOT bwrap mount 28 paths (/workspace). The agent thinks it's working with /workspace, but we 29 must translate to real paths for Python file operations. 30 31 Bwrap paths (/workspace) are only visible inside subprocesses launched via 32 Ag3ntumBash. All other Ag3ntum tools (Ag3ntumRead, Ag3ntumWrite, etc.) run 33 in the main process and need this validator for security. 34 35 PATH TRANSLATION: 36 ================= 37 38 Agent provides: /workspace/file.txt (sandbox path) 39 Validator returns: /users/greg/sessions/xxx/workspace/file.txt (Docker path) 40 41 For external mounts: 42 - /workspace/persistent/* → /users/{user}/ag3ntum/persistent/* 43 - /workspace/external/ro/* → /mounts/ro/* 44 - /workspace/external/rw/* → /mounts/rw/* 45 """ 46 import fnmatch 47 import logging 48 import re 49 import unicodedata 50 from dataclasses import dataclass 51 from pathlib import Path, PurePosixPath 52 from typing import Literal, Optional 53 54 from pydantic import BaseModel, Field 55 56 # Import sandbox path resolver for integrated path handling 57 from src.core.sandbox_path_resolver import ( 58 SandboxPathResolver, 59 SandboxPathContext, 60 configure_sandbox_path_resolver, 61 cleanup_sandbox_path_resolver, 62 get_sandbox_path_resolver, 63 has_sandbox_path_resolver, 64 PathResolutionError, 65 ) 66 67 logger = logging.getLogger(__name__) 68 69 70 # ============================================================================= 71 # Security Constants - Single source of truth for path validation defaults 72 # ============================================================================= 73 74 # Default blocklist patterns for sensitive files (matched against relative paths) 75 DEFAULT_BLOCKLIST: list[str] = [ 76 "*.env", ".env.*", # .env, production.env, .env.local, .env.development, etc. 77 "*.key", ".git/**", "__pycache__/**", "*.pyc", 78 ".secrets/**", "*.pem", "*.p12", "*.pfx", 79 "**/node_modules/**", # Prevent massive directory traversal 80 ] 81 82 # Exemptions from blocklist — safe template/documentation files that should remain accessible 83 DEFAULT_BLOCKLIST_EXEMPTIONS: list[str] = [ 84 ".env.example", 85 ".env.sample", 86 ".env.template", 87 ".env.defaults", 88 ] 89 90 # Default read-only path prefixes (relative to workspace) 91 # These paths can be read but not written/edited/deleted by the agent 92 DEFAULT_READONLY_PREFIXES: list[str] = [ 93 "skills/", # Legacy skills location 94 ".claude/", # SDK configuration and skills (SECURITY: prevents skill tampering) 95 "external/ro/", # Read-only external mounts 96 "external/user-ro/", # Per-user read-only mounts 97 ] 98 99 100 # ============================================================================= 101 # Path Sanitizer - Security hardening for external mount filenames 102 # ============================================================================= 103 104 class PathSanitizer: 105 """ 106 Sanitize filenames from external mounts for security. 107 108 This class provides defense-in-depth against: 109 - Path traversal attacks (../) 110 - Null byte injection 111 - Control character injection 112 - Unicode normalization attacks 113 - Windows reserved device names 114 - Excessively long filenames 115 116 Used primarily for validating filenames in externally mounted folders 117 where we can't control the file naming conventions. 118 """ 119 120 # Dangerous filename patterns to reject 121 DANGEROUS_PATTERNS = [ 122 r"\.\.[\\/]", # Path traversal (../ or ..\) 123 r"^\.\.?$", # Current/parent dir references 124 r"[\x00-\x1f]", # Control characters (ASCII 0-31) 125 r"[<>:\"|?*]", # Windows reserved characters 126 r"^(con|prn|aux|nul|com\d|lpt\d)(\..*)?$", # Windows device names 127 ] 128 129 # Zero-width and invisible unicode characters that could hide content 130 INVISIBLE_CHARS = [ 131 "\u200b", # Zero-width space 132 "\u200c", # Zero-width non-joiner 133 "\u200d", # Zero-width joiner 134 "\ufeff", # Byte order mark 135 "\u00ad", # Soft hyphen 136 "\u2060", # Word joiner 137 "\u2061", # Function application 138 "\u2062", # Invisible times 139 "\u2063", # Invisible separator 140 "\u2064", # Invisible plus 141 ] 142 143 # Max filename length (common filesystem limit) 144 MAX_FILENAME_LENGTH = 255 145 146 @classmethod 147 def sanitize_filename(cls, filename: str, raise_on_error: bool = True) -> str: 148 """ 149 Sanitize a filename, optionally raising error if dangerous. 150 151 Args: 152 filename: The filename to sanitize 153 raise_on_error: If True, raise PathValidationError for dangerous names. 154 If False, return sanitized version. 155 156 Returns: 157 Sanitized filename 158 159 Raises: 160 PathValidationError: If filename is dangerous and raise_on_error=True 161 """ 162 if not filename: 163 if raise_on_error: 164 raise PathValidationError( 165 "Empty filename", 166 path=filename, 167 reason="Filename cannot be empty", 168 ) 169 return "" 170 171 original = filename 172 173 # Normalize unicode to NFC form (canonical composition) 174 # This prevents homograph attacks using visually similar characters 175 try: 176 filename = unicodedata.normalize("NFC", filename) 177 except Exception: 178 pass 179 180 # Remove invisible/zero-width characters 181 for char in cls.INVISIBLE_CHARS: 182 filename = filename.replace(char, "") 183 184 # Check for dangerous patterns 185 for pattern in cls.DANGEROUS_PATTERNS: 186 if re.search(pattern, filename, re.IGNORECASE): 187 if raise_on_error: 188 raise PathValidationError( 189 f"Dangerous filename pattern detected: {original!r}", 190 path=original, 191 reason="DANGEROUS_FILENAME", 192 ) 193 # For non-raising mode, remove the dangerous part 194 filename = re.sub(pattern, "_", filename, flags=re.IGNORECASE) 195 196 # Check length (after normalization) 197 if len(filename.encode("utf-8")) > cls.MAX_FILENAME_LENGTH: 198 if raise_on_error: 199 raise PathValidationError( 200 f"Filename too long ({len(filename)} chars): {filename[:50]}...", 201 path=original, 202 reason="FILENAME_TOO_LONG", 203 ) 204 # Truncate to max length while preserving extension if possible 205 if "." in filename: 206 name, ext = filename.rsplit(".", 1) 207 max_name_len = cls.MAX_FILENAME_LENGTH - len(ext) - 1 208 filename = name[:max_name_len] + "." + ext 209 else: 210 filename = filename[: cls.MAX_FILENAME_LENGTH] 211 212 return filename 213 214 @classmethod 215 def validate_path_components(cls, path: Path) -> None: 216 """ 217 Validate all components of a path. 218 219 Args: 220 path: The path to validate 221 222 Raises: 223 PathValidationError: If any component is dangerous 224 """ 225 for component in path.parts: 226 if component not in ("/", ""): 227 cls.sanitize_filename(component, raise_on_error=True) 228 229 @classmethod 230 def has_null_bytes(cls, path: str) -> bool: 231 """Check if path contains null bytes.""" 232 return "\x00" in path 233 234 @classmethod 235 def has_path_traversal(cls, path: str) -> bool: 236 """Check if path contains traversal attempts.""" 237 # Normalize path separators 238 normalized = path.replace("\\", "/") 239 parts = normalized.split("/") 240 return any(part == ".." for part in parts) 241 242 243 class PathValidatorConfig(BaseModel): 244 """ 245 Configuration for path validation. 246 247 IMPORTANT: This uses REAL Docker filesystem paths, not bwrap mount paths. 248 PathValidator runs in the main Python process, which sees the full Docker 249 filesystem. Bwrap paths (/workspace) are only visible inside subprocesses. 250 """ 251 252 # REAL path to session workspace (e.g., /users/greg/sessions/xxx/workspace) 253 workspace_path: Path = Field( 254 description="Actual filesystem path to session workspace (required)" 255 ) 256 # REAL path to skills directory (legacy, unused - use global/user skills paths) 257 skills_path: Path | None = Field( 258 default=None, description="Deprecated: use global_skills_path/user_skills_path" 259 ) 260 # REAL path to global skills directory (e.g., /skills/.claude/skills) 261 global_skills_path: Path | None = Field( 262 default=None, description="Path to global skills directory (read-only)" 263 ) 264 # REAL path to user skills directory (e.g., /users/username/.claude/skills) 265 user_skills_path: Path | None = Field( 266 default=None, description="Path to user skills directory (read-only)" 267 ) 268 269 # ========================================================================= 270 # EXTERNAL MOUNT PATHS - Host folders mounted via run.sh (flattened structure) 271 # ========================================================================= 272 # These are Docker container paths (not bwrap paths). 273 # With flattened mount structure, all mounts are at /mounts/{name} 274 # Agent sees: /workspace/external/ro/* -> Real path: /mounts/{name} 275 # Agent sees: /workspace/external/rw/* -> Real path: /mounts/{name} 276 # Agent sees: /workspace/persistent/* -> Real path: /users/{username}/ag3ntum/persistent/* 277 278 # Global mounts from external-mounts.yaml global section 279 global_mounts_ro: dict[str, Path] = Field( 280 default_factory=dict, 281 description="Global read-only mounts: {name: container_path}" 282 ) 283 global_mounts_rw: dict[str, Path] = Field( 284 default_factory=dict, 285 description="Global read-write mounts: {name: container_path}" 286 ) 287 persistent_path: Path | None = Field( 288 default=None, 289 description="Path to user's persistent storage (/users/{username}/ag3ntum/persistent)" 290 ) 291 292 # ========================================================================= 293 # PER-USER MOUNT PATHS - User-specific external mounts 294 # ========================================================================= 295 # These are configured via external-mounts.yaml per_user section. 296 # With flattened structure, mounts appear at /mounts/{name} 297 # Agent sees: /workspace/external/user-ro/{name}/* -> Real path: /mounts/{name}/* 298 # Agent sees: /workspace/external/user-rw/{name}/* -> Real path: /mounts/{name}/* 299 300 user_mounts_ro: dict[str, Path] = Field( 301 default_factory=dict, 302 description="Per-user read-only mounts: {name: container_path}" 303 ) 304 user_mounts_rw: dict[str, Path] = Field( 305 default_factory=dict, 306 description="Per-user read-write mounts: {name: container_path}" 307 ) 308 309 # ========================================================================= 310 # DYNAMIC MOUNT PATHS - Session-time user-selected mounts 311 # ========================================================================= 312 # These are configured via API at session creation time. 313 # Agent sees: ./{alias}/* via symlinks at workspace root 314 # Real path: /mounts/{base}/{subpath}/* (flattened structure) 315 # The symlinks are created at workspace/{alias} pointing to /mounts/{base}/{subpath} 316 317 dynamic_mounts_ro: dict[str, Path] = Field( 318 default_factory=dict, 319 description="Dynamic read-only mounts for this session: {alias: container_path}" 320 ) 321 dynamic_mounts_rw: dict[str, Path] = Field( 322 default_factory=dict, 323 description="Dynamic read-write mounts for this session: {alias: container_path}" 324 ) 325 326 # ========================================================================= 327 # ORIGINAL-PATH MOUNTS - Access paths at their original locations 328 # ========================================================================= 329 # These allow accessing paths like /var/log at /var/log (not via workspace). 330 # Docker mounts them at /mounts/paths/{encoded}, and bubblewrap bind-mounts 331 # them to their original locations inside the sandbox. 332 # For file tools in the main Python process, we translate original paths 333 # to Docker paths: /var/log -> /mounts/paths/_var_log 334 335 original_path_mounts_ro: dict[str, Path] = Field( 336 default_factory=dict, 337 description="Original-path read-only mounts: {original_path: docker_path}" 338 ) 339 original_path_mounts_rw: dict[str, Path] = Field( 340 default_factory=dict, 341 description="Original-path read-write mounts: {original_path: docker_path}" 342 ) 343 344 log_all_access: bool = Field( 345 default=True, description="Log all path access attempts" 346 ) 347 blocklist: list[str] = Field( 348 default_factory=lambda: DEFAULT_BLOCKLIST.copy(), 349 description="Glob patterns to block even within workspace", 350 ) 351 blocklist_exemptions: list[str] = Field( 352 default_factory=lambda: DEFAULT_BLOCKLIST_EXEMPTIONS.copy(), 353 description="Filename patterns exempt from blocklist (e.g., .env.example)", 354 ) 355 allowlist: list[str] | None = Field( 356 default=None, description="If set, only these patterns are allowed" 357 ) 358 readonly_prefixes: list[str] = Field( 359 default_factory=lambda: DEFAULT_READONLY_PREFIXES.copy(), 360 description="Path prefixes (relative to workspace) that are read-only", 361 ) 362 363 364 @dataclass 365 class ValidatedPath: 366 """Result of path validation.""" 367 368 original: str 369 normalized: Path 370 is_readonly: bool = False 371 372 373 class PathValidationError(Exception): 374 """Raised when path validation fails.""" 375 376 def __init__(self, message: str, path: str, reason: str): 377 super().__init__(message) 378 self.path = path 379 self.reason = reason 380 381 382 class Ag3ntumPathValidator: 383 """ 384 Centralized path validation for all Ag3ntum tools. 385 386 IMPORTANT: This runs in the main Python process, NOT inside bwrap. 387 It sees the REAL Docker filesystem paths, not bwrap mount paths. 388 389 Responsibilities: 390 1. Normalize paths: ./foo, /workspace/foo, foo -> /users/greg/sessions/xxx/workspace/foo 391 2. Validate paths are within workspace boundary 392 3. Check blocklist/allowlist patterns 393 4. Identify read-only paths (skills) 394 5. Log all access attempts 395 """ 396 397 def __init__(self, config: PathValidatorConfig): 398 """ 399 Initialize with session-specific configuration. 400 401 Args: 402 config: Must include workspace_path (the REAL path in Docker filesystem) 403 """ 404 self.config = config 405 self.workspace = config.workspace_path.resolve() # REAL Docker path 406 self.skills = config.skills_path.resolve() if config.skills_path else None 407 # Additional read-only paths for skills access 408 self.global_skills = config.global_skills_path.resolve() if config.global_skills_path else None 409 self.user_skills = config.user_skills_path.resolve() if config.user_skills_path else None 410 411 # External mount paths (flattened structure: all at /mounts/{name}) 412 # Agent sees: /workspace/external/ro/* -> Real path: /mounts/{name} 413 # Agent sees: /workspace/external/rw/* -> Real path: /mounts/{name} 414 self.global_mounts_ro: dict[str, Path] = { 415 name: path.resolve() for name, path in config.global_mounts_ro.items() 416 } 417 self.global_mounts_rw: dict[str, Path] = { 418 name: path.resolve() for name, path in config.global_mounts_rw.items() 419 } 420 # Agent sees: /workspace/persistent/* -> Real path: /users/{username}/ag3ntum/persistent/* 421 self.persistent = config.persistent_path.resolve() if config.persistent_path else None 422 423 # Per-user mount paths (resolved at session start, flattened structure) 424 # Agent sees: /workspace/external/user-ro/{name}/* -> Real path: /mounts/{name}/* 425 # Agent sees: /workspace/external/user-rw/{name}/* -> Real path: /mounts/{name}/* 426 self.user_mounts_ro: dict[str, Path] = { 427 name: path.resolve() for name, path in config.user_mounts_ro.items() 428 } 429 self.user_mounts_rw: dict[str, Path] = { 430 name: path.resolve() for name, path in config.user_mounts_rw.items() 431 } 432 433 # Dynamic mount paths (configured per-session via API, flattened structure) 434 # Agent sees: ./{alias}/* via symlinks -> Real path: /mounts/{base}/* 435 self.dynamic_mounts_ro: dict[str, Path] = { 436 alias: path.resolve() for alias, path in config.dynamic_mounts_ro.items() 437 } 438 self.dynamic_mounts_rw: dict[str, Path] = { 439 alias: path.resolve() for alias, path in config.dynamic_mounts_rw.items() 440 } 441 442 # Original-path mounts (access paths at original locations) 443 # Agent sees: /var/log/* -> Docker path: /mounts/paths/_var_log/* 444 # The key is the original path, the value is the Docker path 445 self.original_path_mounts_ro: dict[str, Path] = { 446 orig: docker.resolve() for orig, docker in config.original_path_mounts_ro.items() 447 } 448 self.original_path_mounts_rw: dict[str, Path] = { 449 orig: docker.resolve() for orig, docker in config.original_path_mounts_rw.items() 450 } 451 452 # Extract session context from workspace path for cross-user/cross-session blocking 453 # Path format: .../users/{username}/sessions/{session_id}/workspace 454 # Note: /users/ may appear anywhere in path (e.g., /tmp/test/users/... in tests) 455 self._session_username: str | None = None 456 self._session_id: str | None = None 457 workspace_str = str(config.workspace_path) 458 users_idx = workspace_str.find("/users/") 459 if users_idx >= 0: 460 # Extract the portion starting from /users/ 461 users_path = workspace_str[users_idx:] 462 parts = users_path.split("/") 463 # parts[0] = "", parts[1] = "users", parts[2] = username, ... 464 if len(parts) >= 3: 465 self._session_username = parts[2] 466 if len(parts) >= 5 and parts[3] == "sessions": 467 self._session_id = parts[4] 468 469 def docker_to_display_path(self, docker_path: Path) -> str: 470 """ 471 Convert a Docker internal path back to an agent-visible display path. 472 473 Used by LS, Glob, Grep tools to show user-friendly paths instead of 474 raw Docker internal paths (e.g., /mounts/global_var_log/apt/). 475 476 Translation priority: 477 1. Workspace-relative (e.g., ./src/main.py → src/main.py) 478 2. Persistent storage (e.g., /users/.../persistent/x → persistent/x) 479 3. Global RO mounts (e.g., /mounts/name/x → external/ro/name/x) 480 4. Global RW mounts (e.g., /mounts/name/x → external/rw/name/x) 481 5. Per-user RO mounts → external/user-ro/name/x 482 6. Per-user RW mounts → external/user-rw/name/x 483 7. Dynamic RO mounts → dynamic/alias/x 484 8. Dynamic RW mounts → dynamic/alias/x 485 9. Original-path mounts → /original/path/x 486 10. Fallback: return str(docker_path) 487 488 Args: 489 docker_path: Docker filesystem path (may be unresolved/symlinked) 490 491 Returns: 492 Agent-visible display path string 493 """ 494 # 1. Workspace-relative: try WITHOUT resolving first to preserve symlink names 495 try: 496 return str(docker_path.relative_to(self.workspace)) 497 except ValueError: 498 pass 499 500 # For mount paths, resolve to follow symlinks and match mount boundaries 501 resolved = docker_path.resolve() 502 503 # Also try workspace-relative with resolved path (for paths reached via symlinks) 504 try: 505 return str(resolved.relative_to(self.workspace)) 506 except ValueError: 507 pass 508 509 # 2. Persistent storage 510 if self.persistent: 511 try: 512 rel = resolved.relative_to(self.persistent) 513 return f"persistent/{rel}" if str(rel) != "." else "persistent" 514 except ValueError: 515 pass 516 517 # 3-4. Global mounts 518 for name, mount_path in self.global_mounts_ro.items(): 519 try: 520 rel = resolved.relative_to(mount_path) 521 return f"external/ro/{name}/{rel}" if str(rel) != "." else f"external/ro/{name}" 522 except ValueError: 523 pass 524 525 for name, mount_path in self.global_mounts_rw.items(): 526 try: 527 rel = resolved.relative_to(mount_path) 528 return f"external/rw/{name}/{rel}" if str(rel) != "." else f"external/rw/{name}" 529 except ValueError: 530 pass 531 532 # 5-6. Per-user mounts 533 for name, mount_path in self.user_mounts_ro.items(): 534 try: 535 rel = resolved.relative_to(mount_path) 536 return f"external/user-ro/{name}/{rel}" if str(rel) != "." else f"external/user-ro/{name}" 537 except ValueError: 538 pass 539 540 for name, mount_path in self.user_mounts_rw.items(): 541 try: 542 rel = resolved.relative_to(mount_path) 543 return f"external/user-rw/{name}/{rel}" if str(rel) != "." else f"external/user-rw/{name}" 544 except ValueError: 545 pass 546 547 # 7-8. Dynamic mounts 548 for alias, mount_path in self.dynamic_mounts_ro.items(): 549 try: 550 rel = resolved.relative_to(mount_path) 551 return f"dynamic/{alias}/{rel}" if str(rel) != "." else f"dynamic/{alias}" 552 except ValueError: 553 pass 554 555 for alias, mount_path in self.dynamic_mounts_rw.items(): 556 try: 557 rel = resolved.relative_to(mount_path) 558 return f"dynamic/{alias}/{rel}" if str(rel) != "." else f"dynamic/{alias}" 559 except ValueError: 560 pass 561 562 # 9. Original-path mounts (reverse: Docker path → original host path) 563 for orig_path, docker_mount in self.original_path_mounts_ro.items(): 564 try: 565 rel = resolved.relative_to(docker_mount) 566 return f"{orig_path}/{rel}" if str(rel) != "." else orig_path 567 except ValueError: 568 pass 569 570 for orig_path, docker_mount in self.original_path_mounts_rw.items(): 571 try: 572 rel = resolved.relative_to(docker_mount) 573 return f"{orig_path}/{rel}" if str(rel) != "." else orig_path 574 except ValueError: 575 pass 576 577 # 10. Fallback 578 return str(docker_path) 579 580 def validate_path( 581 self, 582 path: str, 583 operation: Literal["read", "write", "edit", "delete", "list", "glob", "grep"], 584 allow_directory: bool = False, 585 ) -> ValidatedPath: 586 """ 587 Validate and normalize a path for the given operation. 588 589 Args: 590 path: User-provided path (relative or /workspace/... style) 591 operation: Type of operation (affects read-only check) 592 allow_directory: Whether directories are valid (for ls, glob) 593 594 Returns: 595 ValidatedPath with normalized path 596 597 Raises: 598 PathValidationError: If path is invalid or blocked 599 """ 600 original = path 601 602 # Step 1: Normalize the path 603 try: 604 normalized = self._normalize_path(path) 605 except Exception as e: 606 self._log_blocked(path, operation, f"Normalization failed: {e}") 607 raise PathValidationError( 608 f"Invalid path: {path}", 609 path=path, 610 reason=f"Path normalization failed: {e}", 611 ) 612 613 # Step 1.5: SECURITY - Block cross-user and cross-session access FIRST 614 # This prevents agents from accessing other users' or other sessions' data 615 # Must run before boundary check to give specific error messages 616 norm_str = str(normalized) 617 618 # Cross-user access blocking 619 if self._session_username and "/users/" in norm_str: 620 path_username = self._extract_path_component(norm_str, "/users/") 621 if path_username and path_username != self._session_username: 622 # Check if this is an allowed exception (e.g., skills) 623 is_allowed = ( 624 (self.global_skills and self._is_within_boundary(normalized, self.global_skills)) or 625 (self.user_skills and self._is_within_boundary(normalized, self.user_skills)) 626 ) 627 if not is_allowed: 628 self._log_blocked(path, operation, f"Cross-user access blocked: {path_username}") 629 raise PathValidationError( 630 f"Access to other users' directories is not allowed: {path}", 631 path=path, 632 reason="CROSS_USER_ACCESS_BLOCKED", 633 ) 634 635 # Cross-session access blocking (same user, different session) 636 if self._session_username and self._session_id: 637 sessions_pattern = f"/users/{self._session_username}/sessions/" 638 if sessions_pattern in norm_str: 639 path_session_id = self._extract_path_component(norm_str, sessions_pattern) 640 if path_session_id and path_session_id != self._session_id: 641 self._log_blocked(path, operation, f"Cross-session access blocked: {path_session_id}") 642 raise PathValidationError( 643 f"Access to other sessions is not allowed: {path}", 644 path=path, 645 reason="CROSS_SESSION_ACCESS_BLOCKED", 646 ) 647 648 # Step 2: Check boundary (workspace, skills, or external mount directories) 649 # Paths can be within: 650 # - Workspace (read-write for most, read-only for some prefixes) 651 # - Global skills directory (read-only) 652 # - User skills directory (read-only) 653 # - External RO mounts (read-only) 654 # - External RW mounts (read-write) 655 # - Persistent storage (read-write) 656 # - Per-user RO mounts (read-only) 657 # - Per-user RW mounts (read-write) 658 in_workspace = False 659 in_global_skills = False 660 in_user_skills = False 661 in_external_ro = False 662 in_external_rw = False 663 in_persistent = False 664 in_user_ro = False 665 in_user_rw = False 666 rel_path = "" 667 668 try: 669 rel_path = str(normalized.relative_to(self.workspace)) 670 in_workspace = True 671 except ValueError: 672 pass 673 674 if not in_workspace and self.global_skills: 675 try: 676 rel_path = str(normalized.relative_to(self.global_skills)) 677 in_global_skills = True 678 except ValueError: 679 pass 680 681 if not in_workspace and not in_global_skills and self.user_skills: 682 try: 683 rel_path = str(normalized.relative_to(self.user_skills)) 684 in_user_skills = True 685 except ValueError: 686 pass 687 688 # Check global external mount boundaries (flattened structure) 689 if not in_workspace and not in_global_skills and not in_user_skills: 690 # Check global RO mounts 691 for mount_name, mount_path in self.global_mounts_ro.items(): 692 try: 693 rel_path = str(normalized.relative_to(mount_path)) 694 in_external_ro = True 695 break 696 except ValueError: 697 pass 698 699 if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro: 700 # Check global RW mounts 701 for mount_name, mount_path in self.global_mounts_rw.items(): 702 try: 703 rel_path = str(normalized.relative_to(mount_path)) 704 in_external_rw = True 705 break 706 except ValueError: 707 pass 708 709 if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro and not in_external_rw: 710 if self.persistent: 711 try: 712 rel_path = str(normalized.relative_to(self.persistent)) 713 in_persistent = True 714 except ValueError: 715 pass 716 717 # Check per-user mount boundaries 718 if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro and not in_external_rw and not in_persistent: 719 # Check per-user RO mounts 720 for mount_name, mount_path in self.user_mounts_ro.items(): 721 try: 722 rel_path = str(normalized.relative_to(mount_path)) 723 in_user_ro = True 724 break 725 except ValueError: 726 pass 727 728 if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro and not in_external_rw and not in_persistent and not in_user_ro: 729 # Check per-user RW mounts 730 for mount_name, mount_path in self.user_mounts_rw.items(): 731 try: 732 rel_path = str(normalized.relative_to(mount_path)) 733 in_user_rw = True 734 break 735 except ValueError: 736 pass 737 738 # Check dynamic mount boundaries (session-time user-selected mounts) 739 in_dynamic_ro = False 740 in_dynamic_rw = False 741 742 if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro and not in_external_rw and not in_persistent and not in_user_ro and not in_user_rw: 743 # Check dynamic RO mounts 744 for alias, mount_path in self.dynamic_mounts_ro.items(): 745 try: 746 rel_path = str(normalized.relative_to(mount_path)) 747 in_dynamic_ro = True 748 break 749 except ValueError: 750 pass 751 752 if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro and not in_external_rw and not in_persistent and not in_user_ro and not in_user_rw and not in_dynamic_ro: 753 # Check dynamic RW mounts 754 for alias, mount_path in self.dynamic_mounts_rw.items(): 755 try: 756 rel_path = str(normalized.relative_to(mount_path)) 757 in_dynamic_rw = True 758 break 759 except ValueError: 760 pass 761 762 # Check original-path mount boundaries 763 # Original-path mounts allow access to paths like /var/log at their original locations 764 in_original_ro = False 765 in_original_rw = False 766 767 not_in_any_yet = not ( 768 in_workspace or in_global_skills or in_user_skills or 769 in_external_ro or in_external_rw or in_persistent or 770 in_user_ro or in_user_rw or in_dynamic_ro or in_dynamic_rw 771 ) 772 if not_in_any_yet: 773 # Check original-path RO mounts 774 for orig_path, docker_path in self.original_path_mounts_ro.items(): 775 try: 776 rel_path = str(normalized.relative_to(docker_path)) 777 in_original_ro = True 778 break 779 except ValueError: 780 pass 781 782 if not_in_any_yet and not in_original_ro: 783 # Check original-path RW mounts 784 for orig_path, docker_path in self.original_path_mounts_rw.items(): 785 try: 786 rel_path = str(normalized.relative_to(docker_path)) 787 in_original_rw = True 788 break 789 except ValueError: 790 pass 791 792 in_any_allowed = ( 793 in_workspace or in_global_skills or in_user_skills or 794 in_external_ro or in_external_rw or in_persistent or 795 in_user_ro or in_user_rw or in_dynamic_ro or in_dynamic_rw or 796 in_original_ro or in_original_rw 797 ) 798 799 if not in_any_allowed: 800 self._log_blocked(path, operation, "Outside allowed directories") 801 raise PathValidationError( 802 f"Path outside allowed directories: {path}", 803 path=path, 804 reason="Path must be within workspace, skills, or external mount directories", 805 ) 806 807 # Step 3: Check for path traversal attempts 808 if ".." in path: 809 # Even if normalized path is valid, log the attempt 810 logger.warning(f"PATH_VALIDATOR: Traversal attempt in path: {path}") 811 812 # Step 4: Check blocklist (workspace and external mount paths) 813 # Security: blocklist applies to all areas to prevent accessing sensitive files 814 should_check_blocklist = ( 815 in_workspace or in_external_ro or in_external_rw or in_persistent or 816 in_user_ro or in_user_rw or in_dynamic_ro or in_dynamic_rw or 817 in_original_ro or in_original_rw 818 ) 819 if should_check_blocklist: 820 # Check exemptions first — safe template files bypass the blocklist 821 filename = normalized.name 822 is_exempt = any( 823 fnmatch.fnmatch(filename, exempt) 824 for exempt in self.config.blocklist_exemptions 825 ) 826 827 if not is_exempt: 828 for pattern in self.config.blocklist: 829 if fnmatch.fnmatch(rel_path, pattern) or fnmatch.fnmatch( 830 filename, pattern 831 ): 832 self._log_blocked( 833 path, operation, f"Matches blocklist pattern: {pattern}" 834 ) 835 raise PathValidationError( 836 f"Path blocked by policy: {path}", 837 path=path, 838 reason=f"BLOCKLIST: Matches pattern: {pattern}", 839 ) 840 841 # Step 5: Check allowlist (if configured, only for workspace paths) 842 if in_workspace and self.config.allowlist is not None: 843 allowed = False 844 for pattern in self.config.allowlist: 845 if fnmatch.fnmatch(rel_path, pattern): 846 allowed = True 847 break 848 if not allowed: 849 self._log_blocked(path, operation, "Not in allowlist") 850 raise PathValidationError( 851 f"Path not in allowlist: {path}", 852 path=path, 853 reason="Path does not match any allowed pattern", 854 ) 855 856 # Step 6: Check if read-only 857 # Read-only areas: 858 # - Skills directories (global and user) are always read-only 859 # - External RO mounts are always read-only 860 # - Per-user RO mounts are always read-only 861 # - Dynamic RO mounts are always read-only 862 # - Original-path RO mounts are always read-only 863 # - Workspace paths may have readonly_prefixes 864 is_readonly = in_global_skills or in_user_skills or in_external_ro or in_user_ro or in_dynamic_ro or in_original_ro 865 866 if in_workspace and not is_readonly: 867 is_readonly = any( 868 rel_path.startswith(ro_prefix.rstrip("/")) 869 for ro_prefix in self.config.readonly_prefixes 870 ) 871 872 if is_readonly and operation in ("write", "edit", "delete"): 873 # Provide helpful error message for external RO mounts 874 if in_external_ro or in_user_ro or in_dynamic_ro or in_original_ro: 875 self._log_blocked(path, operation, "Read-only external mount") 876 raise PathValidationError( 877 f"Cannot {operation} read-only mount: {path}", 878 path=path, 879 reason="Mount is read-only (external mount, per-user ro, dynamic ro, or original-path ro)", 880 ) 881 else: 882 self._log_blocked(path, operation, "Read-only path") 883 raise PathValidationError( 884 f"Cannot {operation} read-only path: {path}", 885 path=path, 886 reason="Path is read-only", 887 ) 888 889 # Log success 890 self._log_allowed(original, normalized, operation) 891 892 return ValidatedPath( 893 original=original, 894 normalized=normalized, 895 is_readonly=is_readonly, 896 ) 897 898 def _normalize_path(self, path: str) -> Path: 899 """ 900 Normalize agent-provided path to REAL Docker filesystem path. 901 902 The agent thinks it's working with bwrap paths: 903 - /workspace/foo.txt -> becomes /users/greg/sessions/xxx/workspace/foo.txt 904 - ./foo.txt -> becomes /users/greg/sessions/xxx/workspace/foo.txt 905 - foo.txt -> becomes /users/greg/sessions/xxx/workspace/foo.txt 906 907 External mount paths are translated as: 908 - /workspace/external/ro/{name}/file -> /mounts/ro/{name}/file 909 - /workspace/external/rw/{name}/file -> /mounts/rw/{name}/file 910 - /workspace/persistent/file -> /users/{username}/ag3ntum/persistent/file 911 - ./external/ro/{name}/file -> same translations 912 - ./persistent/file -> same translation 913 914 This translation is critical because the Python file tools run OUTSIDE bwrap 915 and need the real Docker filesystem paths. 916 """ 917 p = PurePosixPath(path) 918 path_str = str(p) 919 920 # First, normalize relative paths that reference external mounts or persistent 921 # NOTE: Dynamic mounts are now at workspace root as symlinks (e.g., ./logs/ instead of ./dynamic/logs/) 922 # and are resolved automatically via standard workspace path handling below. 923 if not p.is_absolute(): 924 # Check if it's a relative external path like ./external/ro/... or ./persistent/... 925 if path_str.startswith("./external/") or path_str.startswith("external/"): 926 # Convert to absolute bwrap-style path 927 path_str = "/workspace/" + path_str.lstrip("./") 928 p = PurePosixPath(path_str) 929 # Handle persistent paths - with or without trailing slash 930 # PurePosixPath normalizes "./persistent/" to "persistent" (strips ./ and trailing /) 931 # So we need to check for: ./persistent, ./persistent/, persistent, persistent/ 932 elif ( 933 path_str == "./persistent" or path_str == "persistent" or 934 path_str.startswith("./persistent/") or path_str.startswith("persistent/") 935 ): 936 # Convert to absolute bwrap-style path 937 path_str = "/workspace/" + path_str.lstrip("./") 938 p = PurePosixPath(path_str) 939 940 # Handle absolute /persistent/* path (bwrap sandbox internal path) 941 # /persistent/* -> /users/{username}/ag3ntum/persistent/* 942 # This is the path format agents see inside bwrap sandbox 943 if path_str.startswith("/persistent/") or path_str == "/persistent": 944 relative = path_str[len("/persistent/"):] if path_str != "/persistent" else "" 945 return self._resolve_persistent_path(path_str, relative, path) 946 947 # Handle agent paths that reference persistent storage (at workspace root, not under external/) 948 # /workspace/persistent/* -> /users/{username}/ag3ntum/persistent/* 949 if path_str.startswith("/workspace/persistent/") or path_str == "/workspace/persistent": 950 if self.persistent: 951 relative = path_str[len("/workspace/persistent/"):] if path_str != "/workspace/persistent" else "" 952 return self._resolve_persistent_path(path_str, relative, path) 953 # Persistent not configured, treat as workspace path 954 relative_to_workspace = path_str[len("/workspace"):].lstrip("/") 955 return (self.workspace / relative_to_workspace).resolve() 956 957 # Handle agent paths that reference external mounts 958 if path_str.startswith("/workspace/external/"): 959 # Extract the part after /workspace/external/ 960 external_part = path_str[len("/workspace/external/"):] 961 962 # Route to correct external mount (flattened structure: /mounts/{name}) 963 if external_part.startswith("ro/"): 964 # Read-only external mount: /workspace/external/ro/{name}/* -> /mounts/{name}/* 965 relative = external_part[3:] # Remove "ro/" 966 967 # Extract mount name (first path component) 968 if "/" in relative: 969 mount_name, mount_relative = relative.split("/", 1) 970 else: 971 mount_name = relative 972 mount_relative = "" 973 974 # Check global RO mounts first 975 if mount_name in self.global_mounts_ro: 976 mount_path = self.global_mounts_ro[mount_name] 977 if mount_relative: 978 resolved = (mount_path / mount_relative).resolve() 979 else: 980 resolved = mount_path.resolve() 981 # Security: verify resolved path stays within boundary 982 if not self._is_within_boundary(resolved, mount_path): 983 raise PathValidationError( 984 f"Path traversal detected: {path}", 985 path=path, 986 reason="PATH_TRAVERSAL: Resolved path escapes global-ro mount boundary", 987 ) 988 return resolved 989 # Fallback to user mounts for backward compatibility 990 elif mount_name in self.user_mounts_ro: 991 mount_path = self.user_mounts_ro[mount_name] 992 if mount_relative: 993 resolved = (mount_path / mount_relative).resolve() 994 else: 995 resolved = mount_path.resolve() 996 # Security: verify resolved path stays within boundary 997 if not self._is_within_boundary(resolved, mount_path): 998 raise PathValidationError( 999 f"Path traversal detected: {path}", 1000 path=path, 1001 reason="PATH_TRAVERSAL: Resolved path escapes user-ro mount boundary", 1002 ) 1003 return resolved 1004 else: 1005 # Mount not found, treat as workspace path (will likely fail boundary check) 1006 relative_to_workspace = path_str[len("/workspace"):].lstrip("/") 1007 resolved = (self.workspace / relative_to_workspace).resolve() 1008 return resolved 1009 1010 elif external_part.startswith("rw/"): 1011 # Read-write external mount: /workspace/external/rw/{name}/* -> /mounts/{name}/* 1012 relative = external_part[3:] # Remove "rw/" 1013 1014 # Extract mount name (first path component) 1015 if "/" in relative: 1016 mount_name, mount_relative = relative.split("/", 1) 1017 else: 1018 mount_name = relative 1019 mount_relative = "" 1020 1021 # Check global RW mounts first 1022 if mount_name in self.global_mounts_rw: 1023 mount_path = self.global_mounts_rw[mount_name] 1024 if mount_relative: 1025 resolved = (mount_path / mount_relative).resolve() 1026 else: 1027 resolved = mount_path.resolve() 1028 # Security: verify resolved path stays within boundary 1029 if not self._is_within_boundary(resolved, mount_path): 1030 raise PathValidationError( 1031 f"Path traversal detected: {path}", 1032 path=path, 1033 reason="PATH_TRAVERSAL: Resolved path escapes global-rw mount boundary", 1034 ) 1035 return resolved 1036 # Fallback to user mounts for backward compatibility 1037 elif mount_name in self.user_mounts_rw: 1038 mount_path = self.user_mounts_rw[mount_name] 1039 if mount_relative: 1040 resolved = (mount_path / mount_relative).resolve() 1041 else: 1042 resolved = mount_path.resolve() 1043 # Security: verify resolved path stays within boundary 1044 if not self._is_within_boundary(resolved, mount_path): 1045 raise PathValidationError( 1046 f"Path traversal detected: {path}", 1047 path=path, 1048 reason="PATH_TRAVERSAL: Resolved path escapes user-rw mount boundary", 1049 ) 1050 return resolved 1051 else: 1052 # Mount not found, treat as workspace path 1053 relative_to_workspace = path_str[len("/workspace"):].lstrip("/") 1054 resolved = (self.workspace / relative_to_workspace).resolve() 1055 return resolved 1056 1057 elif external_part.startswith("persistent/") or external_part == "persistent": 1058 # DEPRECATED: /workspace/external/persistent/* is deprecated 1059 # Use /workspace/persistent/* instead (persistent is now at workspace root) 1060 logger.warning( 1061 f"Deprecated path: {path}. Use ./persistent/ instead of ./external/persistent/" 1062 ) 1063 if self.persistent: 1064 relative = external_part[11:] if external_part != "persistent" else "" # Remove "persistent/" 1065 return self._resolve_persistent_path(path_str, relative, path) 1066 # Persistent not configured, treat as workspace path 1067 relative_to_workspace = path_str[len("/workspace"):].lstrip("/") 1068 return (self.workspace / relative_to_workspace).resolve() 1069 1070 elif external_part.startswith("user-ro/"): 1071 # Per-user read-only mount: /workspace/external/user-ro/{name}/* -> real path/* 1072 remaining = external_part[8:] # Remove "user-ro/" 1073 # Extract mount name (first path component) 1074 if "/" in remaining: 1075 mount_name, relative = remaining.split("/", 1) 1076 else: 1077 mount_name = remaining 1078 relative = "" 1079 1080 if mount_name in self.user_mounts_ro: 1081 mount_path = self.user_mounts_ro[mount_name] 1082 if relative: 1083 resolved = (mount_path / relative).resolve() 1084 else: 1085 resolved = mount_path.resolve() 1086 # Security: verify resolved path stays within boundary 1087 if not self._is_within_boundary(resolved, mount_path): 1088 raise PathValidationError( 1089 f"Path traversal detected: {path}", 1090 path=path, 1091 reason="PATH_TRAVERSAL: Resolved path escapes user-ro mount boundary", 1092 ) 1093 return resolved 1094 else: 1095 # Mount not configured, treat as workspace path 1096 relative_to_workspace = path_str[len("/workspace"):].lstrip("/") 1097 resolved = (self.workspace / relative_to_workspace).resolve() 1098 return resolved 1099 1100 elif external_part.startswith("user-rw/"): 1101 # Per-user read-write mount: /workspace/external/user-rw/{name}/* -> real path/* 1102 remaining = external_part[8:] # Remove "user-rw/" 1103 # Extract mount name (first path component) 1104 if "/" in remaining: 1105 mount_name, relative = remaining.split("/", 1) 1106 else: 1107 mount_name = remaining 1108 relative = "" 1109 1110 if mount_name in self.user_mounts_rw: 1111 mount_path = self.user_mounts_rw[mount_name] 1112 if relative: 1113 resolved = (mount_path / relative).resolve() 1114 else: 1115 resolved = mount_path.resolve() 1116 # Security: verify resolved path stays within boundary 1117 if not self._is_within_boundary(resolved, mount_path): 1118 raise PathValidationError( 1119 f"Path traversal detected: {path}", 1120 path=path, 1121 reason="PATH_TRAVERSAL: Resolved path escapes user-rw mount boundary", 1122 ) 1123 return resolved 1124 else: 1125 # Mount not configured, treat as workspace path 1126 relative_to_workspace = path_str[len("/workspace"):].lstrip("/") 1127 resolved = (self.workspace / relative_to_workspace).resolve() 1128 return resolved 1129 1130 # Unrecognized external path - fall through to workspace handling 1131 1132 # NOTE: Dynamic mounts are now symlinked at workspace root (e.g., workspace/{alias}) 1133 # instead of workspace/dynamic/{alias}. The symlink resolution in workspace path 1134 # handling below automatically resolves to /mounts/dynamic/{base}/{subpath}. 1135 # Validation then checks if the resolved path is within allowed dynamic_mounts_*. 1136 1137 # Handle standard workspace paths 1138 if path_str.startswith("/workspace"): 1139 # Agent provided bwrap-style path: /workspace/foo -> workspace/foo 1140 relative_to_workspace = path_str[len("/workspace"):].lstrip("/") 1141 resolved = (self.workspace / relative_to_workspace).resolve() 1142 # Security: verify resolved path stays within workspace boundary 1143 if not self._is_within_boundary(resolved, self.workspace): 1144 raise PathValidationError( 1145 f"Path traversal detected: {path}", 1146 path=path, 1147 reason="PATH_TRAVERSAL: Resolved path escapes workspace boundary", 1148 ) 1149 elif not p.is_absolute(): 1150 # Relative path: ./foo or foo -> workspace/foo 1151 resolved = (self.workspace / p).resolve() 1152 else: 1153 # Absolute path NOT starting with /workspace 1154 # Check if this is an original-path mount (e.g., /var/log) 1155 # Original-path mounts allow accessing paths at their original locations 1156 # Translate: /var/log -> /mounts/paths/_var_log 1157 original_mount = self._find_original_path_mount(path_str) 1158 if original_mount: 1159 orig_path, docker_path, is_ro = original_mount 1160 if path_str == orig_path: 1161 resolved = docker_path.resolve() 1162 else: 1163 # Path is under the mount (e.g., /var/log/syslog) 1164 relative = path_str[len(orig_path):].lstrip("/") 1165 resolved = (docker_path / relative).resolve() 1166 # Security: verify resolved path stays within mount boundary 1167 if not self._is_within_boundary(resolved, docker_path): 1168 raise PathValidationError( 1169 f"Path traversal detected: {path}", 1170 path=path, 1171 reason="PATH_TRAVERSAL: Resolved path escapes original-path mount boundary", 1172 ) 1173 else: 1174 # This is an escape attempt (like /etc/passwd) 1175 resolved = Path(p).resolve() 1176 1177 return resolved 1178 1179 def validate_no_symlink_escape( 1180 self, path: Path, boundary: Path, check_intermediate: bool = True 1181 ) -> Path: 1182 """ 1183 Validate that path (including symlinks) doesn't escape boundary. 1184 1185 This prevents TOCTOU attacks where: 1186 1. Attacker creates: /workspace/external/rw/projects/link -> /etc/passwd 1187 2. Validation passes (link exists in allowed area) 1188 3. Read follows symlink to /etc/passwd 1189 1190 Args: 1191 path: The path to validate 1192 boundary: The boundary the resolved path must stay within 1193 check_intermediate: If True, check each intermediate symlink 1194 1195 Returns: 1196 The fully resolved path 1197 1198 Raises: 1199 PathValidationError: If path or any symlink escapes boundary 1200 """ 1201 # Resolve the path fully (follows all symlinks) 1202 try: 1203 resolved = path.resolve(strict=False) 1204 except (OSError, RuntimeError) as e: 1205 raise PathValidationError( 1206 f"Cannot resolve path: {path} - {e}", 1207 path=str(path), 1208 reason="PATH_RESOLUTION_ERROR", 1209 ) 1210 1211 # Check each intermediate component for symlink escape 1212 if check_intermediate and path.exists(): 1213 current = Path("/") 1214 for part in path.parts[1:]: # Skip root 1215 current = current / part 1216 if current.exists() and current.is_symlink(): 1217 try: 1218 link_target = current.resolve() 1219 link_target.relative_to(boundary) 1220 except ValueError: 1221 logger.warning( 1222 f"PATH_VALIDATOR: Symlink escape detected: " 1223 f"{current} -> {link_target} (outside {boundary})" 1224 ) 1225 raise PathValidationError( 1226 f"Symlink escape detected: {current}", 1227 path=str(path), 1228 reason="SYMLINK_ESCAPE", 1229 ) 1230 except OSError: 1231 # Broken symlink or permission error - allow to continue 1232 pass 1233 1234 # Final resolved path must be within boundary 1235 try: 1236 resolved.relative_to(boundary) 1237 except ValueError: 1238 raise PathValidationError( 1239 f"Path resolves outside boundary: {path} -> {resolved}", 1240 path=str(path), 1241 reason="PATH_ESCAPE", 1242 ) 1243 1244 return resolved 1245 1246 def _log_allowed(self, original: str, normalized: Path, operation: str) -> None: 1247 """Log allowed path access.""" 1248 if self.config.log_all_access: 1249 logger.info( 1250 f"PATH_VALIDATOR: ALLOWED {operation.upper()} " 1251 f"'{original}' -> '{normalized}'" 1252 ) 1253 1254 def _extract_path_component(self, path_str: str, pattern: str) -> str | None: 1255 """ 1256 Extract the first path component after a pattern. 1257 1258 Args: 1259 path_str: The path string to search 1260 pattern: The pattern to find (e.g., "/users/") 1261 1262 Returns: 1263 The first component after the pattern, or None if not found 1264 """ 1265 idx = path_str.find(pattern) 1266 if idx < 0: 1267 return None 1268 remaining = path_str[idx + len(pattern):] 1269 return remaining.split("/")[0] if remaining else None 1270 1271 def _resolve_persistent_path(self, path: str, relative: str, original_path: str) -> Path: 1272 """ 1273 Resolve a path within persistent storage with boundary validation. 1274 1275 Args: 1276 path: The full path being resolved (for error messages) 1277 relative: The relative path within persistent storage 1278 original_path: The original user-provided path (for error messages) 1279 1280 Returns: 1281 Resolved Path within persistent storage 1282 1283 Raises: 1284 PathValidationError: If path escapes boundary or persistent not configured 1285 """ 1286 if not self.persistent: 1287 raise PathValidationError( 1288 f"Persistent storage not configured: {path}", 1289 path=original_path, 1290 reason="Persistent storage path not available", 1291 ) 1292 1293 resolved = (self.persistent / relative).resolve() if relative else self.persistent.resolve() 1294 1295 if not self._is_within_boundary(resolved, self.persistent): 1296 raise PathValidationError( 1297 f"Path traversal detected: {original_path}", 1298 path=original_path, 1299 reason="PATH_TRAVERSAL: Resolved path escapes persistent storage boundary", 1300 ) 1301 return resolved 1302 1303 def _find_original_path_mount( 1304 self, path: str 1305 ) -> tuple[str, Path, bool] | None: 1306 """ 1307 Find the original-path mount that contains the given path. 1308 1309 Args: 1310 path: An absolute path (e.g., "/var/log" or "/var/log/syslog") 1311 1312 Returns: 1313 Tuple of (original_path, docker_path, is_readonly) if found, else None 1314 """ 1315 best_match: tuple[str, Path, bool] | None = None 1316 best_len = 0 1317 1318 # Check RO mounts 1319 for orig_path, docker_path in self.original_path_mounts_ro.items(): 1320 if path == orig_path or path.startswith(orig_path + "/"): 1321 if len(orig_path) > best_len: 1322 best_match = (orig_path, docker_path, True) 1323 best_len = len(orig_path) 1324 1325 # Check RW mounts 1326 for orig_path, docker_path in self.original_path_mounts_rw.items(): 1327 if path == orig_path or path.startswith(orig_path + "/"): 1328 if len(orig_path) > best_len: 1329 best_match = (orig_path, docker_path, False) 1330 best_len = len(orig_path) 1331 1332 return best_match 1333 1334 def get_sandbox_root_entries(self) -> list[tuple[str, str, str]]: 1335 """ 1336 Synthesize a virtual directory listing of the sandbox root (/). 1337 1338 Returns entries matching what the agent would see inside bwrap, 1339 based on configured mounts. This avoids exposing Docker container 1340 internals while giving the agent a useful view of available paths. 1341 1342 Returns: 1343 List of (display_path, access_mode, description) tuples. 1344 display_path: The path as the agent should see it (e.g., "/workspace") 1345 access_mode: "rw" or "ro" 1346 description: Human-readable description 1347 """ 1348 entries: list[tuple[str, str, str]] = [] 1349 1350 # Core paths (always present) 1351 entries.append(("/workspace", "rw", "Session workspace (working directory)")) 1352 1353 if self.persistent: 1354 entries.append(("/persistent", "rw", "Persistent storage (cross-session)")) 1355 1356 # User environment 1357 # Venv is always configured in permissions.yaml session_mounts 1358 entries.append(("/venv", "ro", "Python virtual environment")) 1359 1360 if self.global_skills: 1361 entries.append(("/skills", "ro", "Global skills")) 1362 if self.user_skills: 1363 entries.append(("/user-skills", "ro", "User skills")) 1364 1365 # Original-path mounts (e.g., /var/log) 1366 for orig_path in sorted(self.original_path_mounts_ro.keys()): 1367 entries.append((orig_path, "ro", f"Mounted from host (read-only)")) 1368 for orig_path in sorted(self.original_path_mounts_rw.keys()): 1369 entries.append((orig_path, "rw", f"Mounted from host (read-write)")) 1370 1371 return entries 1372 1373 def find_virtual_children(self, parent_path: str) -> list[tuple[str, str, str]] | None: 1374 """ 1375 Find virtual directory children for a path that is a parent of configured mounts. 1376 1377 For example, if /var/log is a configured original-path mount, 1378 calling this with "/var" returns [("log", "ro", "Mounted from host")]. 1379 1380 Args: 1381 parent_path: Absolute path to check (e.g., "/var") 1382 1383 Returns: 1384 List of (child_name, access_mode, description) if any mounts exist 1385 under this path. None if no mounts are found under this path. 1386 """ 1387 parent = parent_path.rstrip("/") + "/" 1388 children: list[tuple[str, str, str]] = [] 1389 seen: set[str] = set() 1390 1391 for orig_path in self.original_path_mounts_ro: 1392 if orig_path.startswith(parent): 1393 # Extract the immediate child component 1394 remainder = orig_path[len(parent):] 1395 child_name = remainder.split("/")[0] 1396 if child_name and child_name not in seen: 1397 seen.add(child_name) 1398 children.append((child_name, "ro", f"Contains mount: {orig_path}")) 1399 1400 for orig_path in self.original_path_mounts_rw: 1401 if orig_path.startswith(parent): 1402 remainder = orig_path[len(parent):] 1403 child_name = remainder.split("/")[0] 1404 if child_name and child_name not in seen: 1405 seen.add(child_name) 1406 children.append((child_name, "rw", f"Contains mount: {orig_path}")) 1407 1408 return children if children else None 1409 1410 def _is_within_boundary(self, path: Path, boundary: Path) -> bool: 1411 """ 1412 Check if a resolved path is within the given boundary. 1413 1414 This prevents path traversal attacks where .. components 1415 could escape the intended directory boundary. 1416 1417 Args: 1418 path: The resolved path to check 1419 boundary: The boundary directory path must stay within 1420 1421 Returns: 1422 True if path is within boundary, False otherwise 1423 """ 1424 try: 1425 # Resolve both paths to handle any symlinks 1426 resolved_path = path.resolve() 1427 resolved_boundary = boundary.resolve() 1428 # Check if path is relative to boundary 1429 resolved_path.relative_to(resolved_boundary) 1430 return True 1431 except ValueError: 1432 return False 1433 1434 def _log_blocked(self, path: str, operation: str, reason: str) -> None: 1435 """Log blocked path access.""" 1436 logger.warning( 1437 f"PATH_VALIDATOR: BLOCKED {operation.upper()} " f"'{path}' - {reason}" 1438 ) 1439 1440 1441 # ============================================================================= 1442 # Session-Scoped Validator Management 1443 # ============================================================================= 1444 1445 # Session-scoped validators (NOT singleton - each session has its own) 1446 _session_validators: dict[str, Ag3ntumPathValidator] = {} 1447 1448 # Session-scoped linux UIDs for file ownership (sandbox user UID per session) 1449 _session_linux_uids: dict[str, int] = {} 1450 1451 1452 def set_session_linux_uid(session_id: str, linux_uid: int) -> None: 1453 """Store the linux_uid (sandbox user UID) for a session.""" 1454 _session_linux_uids[session_id] = linux_uid 1455 logger.debug(f"PATH_VALIDATOR: Set linux_uid={linux_uid} for session {session_id}") 1456 1457 1458 def get_session_linux_uid(session_id: str) -> int | None: 1459 """Get the linux_uid for a session, or None if not set.""" 1460 return _session_linux_uids.get(session_id) 1461 1462 1463 def get_path_validator(session_id: str) -> Ag3ntumPathValidator: 1464 """ 1465 Get the path validator for a session. 1466 1467 Args: 1468 session_id: The session ID 1469 1470 Returns: 1471 The configured Ag3ntumPathValidator for this session 1472 1473 Raises: 1474 RuntimeError: If validator not configured for this session 1475 """ 1476 if session_id not in _session_validators: 1477 raise RuntimeError( 1478 f"PathValidator not configured for session {session_id}. " 1479 "Call configure_path_validator() first." 1480 ) 1481 return _session_validators[session_id] 1482 1483 1484 def configure_path_validator( 1485 session_id: str, 1486 workspace_path: Path, 1487 username: str | None = None, 1488 skills_path: Path | None = None, 1489 global_skills_path: Path | None = None, 1490 user_skills_path: Path | None = None, 1491 global_mounts_ro: dict[str, Path] | None = None, 1492 global_mounts_rw: dict[str, Path] | None = None, 1493 persistent_path: Path | None = None, 1494 user_mounts_ro: dict[str, Path] | None = None, 1495 user_mounts_rw: dict[str, Path] | None = None, 1496 dynamic_mounts_ro: dict[str, Path] | None = None, 1497 dynamic_mounts_rw: dict[str, Path] | None = None, 1498 original_path_mounts_ro: dict[str, Path] | None = None, 1499 original_path_mounts_rw: dict[str, Path] | None = None, 1500 blocklist: list[str] | None = None, 1501 readonly_prefixes: list[str] | None = None, 1502 ) -> Ag3ntumPathValidator: 1503 """ 1504 Configure and return path validator for a session. 1505 1506 This function also configures the SandboxPathResolver for the session, 1507 ensuring both components are available for path handling. 1508 1509 Args: 1510 session_id: The session ID 1511 workspace_path: REAL Docker filesystem path to session workspace 1512 username: Username for this session (extracted from path if not provided) 1513 skills_path: Deprecated, use global_skills_path/user_skills_path 1514 global_skills_path: Path to global skills directory (read-only) 1515 user_skills_path: Path to user skills directory (read-only) 1516 global_mounts_ro: Global read-only mounts {name: container_path} (flattened) 1517 global_mounts_rw: Global read-write mounts {name: container_path} (flattened) 1518 persistent_path: Path to user's persistent storage 1519 user_mounts_ro: Per-user read-only mounts {name: container_path} 1520 user_mounts_rw: Per-user read-write mounts {name: container_path} 1521 dynamic_mounts_ro: Dynamic read-only mounts {alias: container_path} 1522 dynamic_mounts_rw: Dynamic read-write mounts {alias: container_path} 1523 original_path_mounts_ro: Original-path read-only mounts {orig_path: docker_path} 1524 original_path_mounts_rw: Original-path read-write mounts {orig_path: docker_path} 1525 blocklist: Optional list of blocked patterns (defaults to common sensitive files) 1526 readonly_prefixes: Optional list of read-only path prefixes 1527 1528 Returns: 1529 The configured Ag3ntumPathValidator 1530 """ 1531 # Extract username from workspace path if not provided 1532 # Path format: /users/{username}/sessions/{session_id}/workspace 1533 if username is None: 1534 workspace_str = str(workspace_path) 1535 if workspace_str.startswith("/users/"): 1536 parts = workspace_str.split("/") 1537 if len(parts) >= 3: 1538 username = parts[2] 1539 if username is None: 1540 logger.warning( 1541 f"Could not extract username from workspace path: {workspace_path}. " 1542 "SandboxPathResolver will not be configured." 1543 ) 1544 1545 config = PathValidatorConfig( 1546 workspace_path=workspace_path, 1547 skills_path=skills_path, 1548 global_skills_path=global_skills_path, 1549 user_skills_path=user_skills_path, 1550 global_mounts_ro=global_mounts_ro or {}, 1551 global_mounts_rw=global_mounts_rw or {}, 1552 persistent_path=persistent_path, 1553 user_mounts_ro=user_mounts_ro or {}, 1554 user_mounts_rw=user_mounts_rw or {}, 1555 dynamic_mounts_ro=dynamic_mounts_ro or {}, 1556 dynamic_mounts_rw=dynamic_mounts_rw or {}, 1557 original_path_mounts_ro=original_path_mounts_ro or {}, 1558 original_path_mounts_rw=original_path_mounts_rw or {}, 1559 blocklist=blocklist or DEFAULT_BLOCKLIST.copy(), 1560 readonly_prefixes=readonly_prefixes or DEFAULT_READONLY_PREFIXES.copy(), 1561 ) 1562 validator = Ag3ntumPathValidator(config) 1563 _session_validators[session_id] = validator 1564 1565 # Also configure SandboxPathResolver for this session 1566 if username: 1567 try: 1568 configure_sandbox_path_resolver( 1569 session_id=session_id, 1570 username=username, 1571 workspace_docker=str(workspace_path), 1572 global_mounts_ro={k: str(v) for k, v in (global_mounts_ro or {}).items()}, 1573 global_mounts_rw={k: str(v) for k, v in (global_mounts_rw or {}).items()}, 1574 user_mounts_ro={k: str(v) for k, v in (user_mounts_ro or {}).items()}, 1575 user_mounts_rw={k: str(v) for k, v in (user_mounts_rw or {}).items()}, 1576 ) 1577 except Exception as e: 1578 logger.warning(f"Failed to configure SandboxPathResolver: {e}") 1579 1580 # Log mount info if any configured 1581 global_ro_count = len(global_mounts_ro) if global_mounts_ro else 0 1582 global_rw_count = len(global_mounts_rw) if global_mounts_rw else 0 1583 user_ro_count = len(user_mounts_ro) if user_mounts_ro else 0 1584 user_rw_count = len(user_mounts_rw) if user_mounts_rw else 0 1585 orig_ro_count = len(original_path_mounts_ro) if original_path_mounts_ro else 0 1586 orig_rw_count = len(original_path_mounts_rw) if original_path_mounts_rw else 0 1587 1588 logger.info( 1589 f"PATH_VALIDATOR: Configured for session {session_id} " 1590 f"with workspace={workspace_path}, username={username}, " 1591 f"global_mounts={global_ro_count} RO/{global_rw_count} RW, " 1592 f"persistent={persistent_path}, " 1593 f"user_mounts={user_ro_count} RO/{user_rw_count} RW, " 1594 f"original_paths={orig_ro_count} RO/{orig_rw_count} RW" 1595 ) 1596 return validator 1597 1598 1599 def cleanup_path_validator(session_id: str) -> None: 1600 """ 1601 Remove path validator when session ends. 1602 1603 This also cleans up the associated SandboxPathResolver. 1604 1605 Args: 1606 session_id: The session ID to clean up 1607 """ 1608 if session_id in _session_validators: 1609 del _session_validators[session_id] 1610 logger.info(f"PATH_VALIDATOR: Cleaned up validator for session {session_id}") 1611 1612 # Also cleanup session linux_uid 1613 _session_linux_uids.pop(session_id, None) 1614 1615 # Also cleanup SandboxPathResolver 1616 cleanup_sandbox_path_resolver(session_id) 1617 1618 1619 def has_path_validator(session_id: str) -> bool: 1620 """ 1621 Check if a path validator is configured for a session. 1622 1623 Args: 1624 session_id: The session ID to check 1625 1626 Returns: 1627 True if validator is configured, False otherwise 1628 """ 1629 return session_id in _session_validators 1630 1631 1632 # ============================================================================= 1633 # Sandbox Path Resolution Utilities 1634 # ============================================================================= 1635 1636 def get_resolver_for_session(session_id: str) -> Optional[SandboxPathResolver]: 1637 """ 1638 Get the SandboxPathResolver for a session if available. 1639 1640 Args: 1641 session_id: The session ID 1642 1643 Returns: 1644 SandboxPathResolver if configured, None otherwise 1645 """ 1646 if has_sandbox_path_resolver(session_id): 1647 return get_sandbox_path_resolver(session_id) 1648 return None 1649 1650 1651 def sandbox_to_docker_path(session_id: str, sandbox_path: str) -> str: 1652 """ 1653 Convert a sandbox path to Docker path for a session. 1654 1655 This is a convenience function for converting agent-provided paths 1656 (in sandbox format) to Docker filesystem paths. 1657 1658 Args: 1659 session_id: The session ID 1660 sandbox_path: Path in sandbox format (e.g., /workspace/file.txt) 1661 1662 Returns: 1663 Docker filesystem path 1664 1665 Raises: 1666 RuntimeError: If resolver not configured for session 1667 PathResolutionError: If path cannot be resolved 1668 """ 1669 resolver = get_sandbox_path_resolver(session_id) 1670 return resolver.sandbox_to_docker(sandbox_path) 1671 1672 1673 def docker_to_sandbox_path(session_id: str, docker_path: str) -> str: 1674 """ 1675 Convert a Docker path to sandbox path for a session. 1676 1677 This is useful for translating error messages or paths from Docker 1678 processes back to the canonical sandbox format. 1679 1680 Args: 1681 session_id: The session ID 1682 docker_path: Path in Docker format 1683 1684 Returns: 1685 Sandbox path 1686 1687 Raises: 1688 RuntimeError: If resolver not configured for session 1689 PathResolutionError: If path cannot be resolved 1690 """ 1691 resolver = get_sandbox_path_resolver(session_id) 1692 return resolver.docker_to_sandbox(docker_path) 1693 1694 1695 def translate_error_message(session_id: str, error_message: str) -> str: 1696 """ 1697 Translate Docker paths in an error message to sandbox paths. 1698 1699 This makes error messages more user-friendly by showing paths 1700 in the format the agent understands. 1701 1702 Args: 1703 session_id: The session ID 1704 error_message: Error message that may contain Docker paths 1705 1706 Returns: 1707 Error message with Docker paths replaced by sandbox paths 1708 """ 1709 resolver = get_resolver_for_session(session_id) 1710 if resolver: 1711 return resolver.translate_error_paths(error_message) 1712 return error_message 1713 1714 1715 def normalize_sandbox_path(session_id: str, path: str) -> str: 1716 """ 1717 Normalize any path to canonical sandbox format. 1718 1719 Args: 1720 session_id: The session ID 1721 path: Input path (can be relative or absolute) 1722 1723 Returns: 1724 Canonical sandbox path (e.g., /workspace/file.txt) 1725 1726 Raises: 1727 RuntimeError: If resolver not configured for session 1728 PathResolutionError: If path is invalid 1729 """ 1730 resolver = get_sandbox_path_resolver(session_id) 1731 return resolver.normalize(path)