/ src / core / path_validator.py
path_validator.py
   1  """
   2  Unified path validation for all Ag3ntum tools.
   3  
   4  Single source of truth for path normalization, validation, and logging.
   5  All Ag3ntum file tools use this validator before performing operations.
   6  
   7  ARCHITECTURE:
   8  =============
   9  
  10  This module works in conjunction with sandbox_path_resolver.py to provide
  11  a complete path handling solution:
  12  
  13  1. SandboxPathResolver (sandbox_path_resolver.py):
  14     - Defines canonical path format (sandbox paths)
  15     - Provides bidirectional translation (sandbox ↔ Docker)
  16     - Context-aware resolution
  17  
  18  2. Ag3ntumPathValidator (this module):
  19     - Security validation (blocklist, allowlist, boundaries)
  20     - Read-only path enforcement
  21     - Access logging
  22  
  23  EXECUTION CONTEXT:
  24  ==================
  25  
  26  This validator runs in the main Python process, which sees the REAL Docker
  27  filesystem paths (e.g., /users/greg/sessions/xxx/workspace), NOT bwrap mount
  28  paths (/workspace). The agent thinks it's working with /workspace, but we
  29  must translate to real paths for Python file operations.
  30  
  31  Bwrap paths (/workspace) are only visible inside subprocesses launched via
  32  Ag3ntumBash. All other Ag3ntum tools (Ag3ntumRead, Ag3ntumWrite, etc.) run
  33  in the main process and need this validator for security.
  34  
  35  PATH TRANSLATION:
  36  =================
  37  
  38  Agent provides: /workspace/file.txt (sandbox path)
  39  Validator returns: /users/greg/sessions/xxx/workspace/file.txt (Docker path)
  40  
  41  For external mounts:
  42  - /workspace/persistent/* → /users/{user}/ag3ntum/persistent/*
  43  - /workspace/external/ro/* → /mounts/ro/*
  44  - /workspace/external/rw/* → /mounts/rw/*
  45  """
  46  import fnmatch
  47  import logging
  48  import re
  49  import unicodedata
  50  from dataclasses import dataclass
  51  from pathlib import Path, PurePosixPath
  52  from typing import Literal, Optional
  53  
  54  from pydantic import BaseModel, Field
  55  
  56  # Import sandbox path resolver for integrated path handling
  57  from src.core.sandbox_path_resolver import (
  58      SandboxPathResolver,
  59      SandboxPathContext,
  60      configure_sandbox_path_resolver,
  61      cleanup_sandbox_path_resolver,
  62      get_sandbox_path_resolver,
  63      has_sandbox_path_resolver,
  64      PathResolutionError,
  65  )
  66  
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# =============================================================================
# Security Constants - Single source of truth for path validation defaults
# =============================================================================

# Default blocklist glob patterns for sensitive files (matched against relative paths).
# PathValidatorConfig.blocklist uses a copy of this list as its default value.
DEFAULT_BLOCKLIST: list[str] = [
    "*.env", ".env.*",  # .env, production.env, .env.local, .env.development, etc.
    "*.key", ".git/**", "__pycache__/**", "*.pyc",
    ".secrets/**", "*.pem", "*.p12", "*.pfx",
    "**/node_modules/**",  # Prevent massive directory traversal
]

# Exemptions from blocklist — safe template/documentation files that should remain accessible.
# Each entry would otherwise be caught by the ".env.*" pattern above.
# PathValidatorConfig.blocklist_exemptions uses a copy of this list as its default value.
DEFAULT_BLOCKLIST_EXEMPTIONS: list[str] = [
    ".env.example",
    ".env.sample",
    ".env.template",
    ".env.defaults",
]

# Default read-only path prefixes (relative to workspace).
# These paths can be read but not written/edited/deleted by the agent.
# PathValidatorConfig.readonly_prefixes uses a copy of this list as its default value.
DEFAULT_READONLY_PREFIXES: list[str] = [
    "skills/",           # Legacy skills location
    ".claude/",          # SDK configuration and skills (SECURITY: prevents skill tampering)
    "external/ro/",      # Read-only external mounts
    "external/user-ro/", # Per-user read-only mounts
]
  98  
  99  
 100  # =============================================================================
 101  # Path Sanitizer - Security hardening for external mount filenames
 102  # =============================================================================
 103  
 104  class PathSanitizer:
 105      """
 106      Sanitize filenames from external mounts for security.
 107  
 108      This class provides defense-in-depth against:
 109      - Path traversal attacks (../)
 110      - Null byte injection
 111      - Control character injection
 112      - Unicode normalization attacks
 113      - Windows reserved device names
 114      - Excessively long filenames
 115  
 116      Used primarily for validating filenames in externally mounted folders
 117      where we can't control the file naming conventions.
 118      """
 119  
 120      # Dangerous filename patterns to reject
 121      DANGEROUS_PATTERNS = [
 122          r"\.\.[\\/]",           # Path traversal (../ or ..\)
 123          r"^\.\.?$",             # Current/parent dir references
 124          r"[\x00-\x1f]",         # Control characters (ASCII 0-31)
 125          r"[<>:\"|?*]",          # Windows reserved characters
 126          r"^(con|prn|aux|nul|com\d|lpt\d)(\..*)?$",  # Windows device names
 127      ]
 128  
 129      # Zero-width and invisible unicode characters that could hide content
 130      INVISIBLE_CHARS = [
 131          "\u200b",  # Zero-width space
 132          "\u200c",  # Zero-width non-joiner
 133          "\u200d",  # Zero-width joiner
 134          "\ufeff",  # Byte order mark
 135          "\u00ad",  # Soft hyphen
 136          "\u2060",  # Word joiner
 137          "\u2061",  # Function application
 138          "\u2062",  # Invisible times
 139          "\u2063",  # Invisible separator
 140          "\u2064",  # Invisible plus
 141      ]
 142  
 143      # Max filename length (common filesystem limit)
 144      MAX_FILENAME_LENGTH = 255
 145  
 146      @classmethod
 147      def sanitize_filename(cls, filename: str, raise_on_error: bool = True) -> str:
 148          """
 149          Sanitize a filename, optionally raising error if dangerous.
 150  
 151          Args:
 152              filename: The filename to sanitize
 153              raise_on_error: If True, raise PathValidationError for dangerous names.
 154                             If False, return sanitized version.
 155  
 156          Returns:
 157              Sanitized filename
 158  
 159          Raises:
 160              PathValidationError: If filename is dangerous and raise_on_error=True
 161          """
 162          if not filename:
 163              if raise_on_error:
 164                  raise PathValidationError(
 165                      "Empty filename",
 166                      path=filename,
 167                      reason="Filename cannot be empty",
 168                  )
 169              return ""
 170  
 171          original = filename
 172  
 173          # Normalize unicode to NFC form (canonical composition)
 174          # This prevents homograph attacks using visually similar characters
 175          try:
 176              filename = unicodedata.normalize("NFC", filename)
 177          except Exception:
 178              pass
 179  
 180          # Remove invisible/zero-width characters
 181          for char in cls.INVISIBLE_CHARS:
 182              filename = filename.replace(char, "")
 183  
 184          # Check for dangerous patterns
 185          for pattern in cls.DANGEROUS_PATTERNS:
 186              if re.search(pattern, filename, re.IGNORECASE):
 187                  if raise_on_error:
 188                      raise PathValidationError(
 189                          f"Dangerous filename pattern detected: {original!r}",
 190                          path=original,
 191                          reason="DANGEROUS_FILENAME",
 192                      )
 193                  # For non-raising mode, remove the dangerous part
 194                  filename = re.sub(pattern, "_", filename, flags=re.IGNORECASE)
 195  
 196          # Check length (after normalization)
 197          if len(filename.encode("utf-8")) > cls.MAX_FILENAME_LENGTH:
 198              if raise_on_error:
 199                  raise PathValidationError(
 200                      f"Filename too long ({len(filename)} chars): {filename[:50]}...",
 201                      path=original,
 202                      reason="FILENAME_TOO_LONG",
 203                  )
 204              # Truncate to max length while preserving extension if possible
 205              if "." in filename:
 206                  name, ext = filename.rsplit(".", 1)
 207                  max_name_len = cls.MAX_FILENAME_LENGTH - len(ext) - 1
 208                  filename = name[:max_name_len] + "." + ext
 209              else:
 210                  filename = filename[: cls.MAX_FILENAME_LENGTH]
 211  
 212          return filename
 213  
 214      @classmethod
 215      def validate_path_components(cls, path: Path) -> None:
 216          """
 217          Validate all components of a path.
 218  
 219          Args:
 220              path: The path to validate
 221  
 222          Raises:
 223              PathValidationError: If any component is dangerous
 224          """
 225          for component in path.parts:
 226              if component not in ("/", ""):
 227                  cls.sanitize_filename(component, raise_on_error=True)
 228  
 229      @classmethod
 230      def has_null_bytes(cls, path: str) -> bool:
 231          """Check if path contains null bytes."""
 232          return "\x00" in path
 233  
 234      @classmethod
 235      def has_path_traversal(cls, path: str) -> bool:
 236          """Check if path contains traversal attempts."""
 237          # Normalize path separators
 238          normalized = path.replace("\\", "/")
 239          parts = normalized.split("/")
 240          return any(part == ".." for part in parts)
 241  
 242  
class PathValidatorConfig(BaseModel):
    """
    Configuration for path validation.

    IMPORTANT: This uses REAL Docker filesystem paths, not bwrap mount paths.
    PathValidator runs in the main Python process, which sees the full Docker
    filesystem. Bwrap paths (/workspace) are only visible inside subprocesses.

    Only ``workspace_path`` has no default and must be supplied; every other
    field defaults to None, an empty dict, or a copy of the corresponding
    module-level DEFAULT_* list.
    """

    # REAL path to session workspace (e.g., /users/greg/sessions/xxx/workspace)
    workspace_path: Path = Field(
        description="Actual filesystem path to session workspace (required)"
    )
    # REAL path to skills directory (legacy, unused - use global/user skills paths)
    skills_path: Path | None = Field(
        default=None, description="Deprecated: use global_skills_path/user_skills_path"
    )
    # REAL path to global skills directory (e.g., /skills/.claude/skills)
    global_skills_path: Path | None = Field(
        default=None, description="Path to global skills directory (read-only)"
    )
    # REAL path to user skills directory (e.g., /users/username/.claude/skills)
    user_skills_path: Path | None = Field(
        default=None, description="Path to user skills directory (read-only)"
    )

    # =========================================================================
    # EXTERNAL MOUNT PATHS - Host folders mounted via run.sh (flattened structure)
    # =========================================================================
    # These are Docker container paths (not bwrap paths).
    # With flattened mount structure, all mounts are at /mounts/{name}
    # Agent sees: /workspace/external/ro/* -> Real path: /mounts/{name}
    # Agent sees: /workspace/external/rw/* -> Real path: /mounts/{name}
    # Agent sees: /workspace/persistent/* -> Real path: /users/{username}/ag3ntum/persistent/*
    # NOTE(review): Ag3ntumPathValidator.docker_to_display_path renders these as
    # external/ro/{name}/... and external/rw/{name}/..., i.e. the mount name is
    # part of the agent-visible path — confirm the forward mapping matches.

    # Global mounts from external-mounts.yaml global section
    global_mounts_ro: dict[str, Path] = Field(
        default_factory=dict,
        description="Global read-only mounts: {name: container_path}"
    )
    global_mounts_rw: dict[str, Path] = Field(
        default_factory=dict,
        description="Global read-write mounts: {name: container_path}"
    )
    persistent_path: Path | None = Field(
        default=None,
        description="Path to user's persistent storage (/users/{username}/ag3ntum/persistent)"
    )

    # =========================================================================
    # PER-USER MOUNT PATHS - User-specific external mounts
    # =========================================================================
    # These are configured via external-mounts.yaml per_user section.
    # With flattened structure, mounts appear at /mounts/{name}
    # Agent sees: /workspace/external/user-ro/{name}/* -> Real path: /mounts/{name}/*
    # Agent sees: /workspace/external/user-rw/{name}/* -> Real path: /mounts/{name}/*

    user_mounts_ro: dict[str, Path] = Field(
        default_factory=dict,
        description="Per-user read-only mounts: {name: container_path}"
    )
    user_mounts_rw: dict[str, Path] = Field(
        default_factory=dict,
        description="Per-user read-write mounts: {name: container_path}"
    )

    # =========================================================================
    # DYNAMIC MOUNT PATHS - Session-time user-selected mounts
    # =========================================================================
    # These are configured via API at session creation time.
    # Agent sees: ./{alias}/* via symlinks at workspace root
    # Real path: /mounts/{base}/{subpath}/* (flattened structure)
    # The symlinks are created at workspace/{alias} pointing to /mounts/{base}/{subpath}
    # NOTE(review): Ag3ntumPathValidator.docker_to_display_path labels resolved
    # dynamic-mount paths as dynamic/{alias}/..., not ./{alias}/... — confirm
    # which form is intended to be agent-visible.

    dynamic_mounts_ro: dict[str, Path] = Field(
        default_factory=dict,
        description="Dynamic read-only mounts for this session: {alias: container_path}"
    )
    dynamic_mounts_rw: dict[str, Path] = Field(
        default_factory=dict,
        description="Dynamic read-write mounts for this session: {alias: container_path}"
    )

    # =========================================================================
    # ORIGINAL-PATH MOUNTS - Access paths at their original locations
    # =========================================================================
    # These allow accessing paths like /var/log at /var/log (not via workspace).
    # Docker mounts them at /mounts/paths/{encoded}, and bubblewrap bind-mounts
    # them to their original locations inside the sandbox.
    # For file tools in the main Python process, we translate original paths
    # to Docker paths: /var/log -> /mounts/paths/_var_log
    # Keys are the original paths (strings); values are the Docker-side paths.

    original_path_mounts_ro: dict[str, Path] = Field(
        default_factory=dict,
        description="Original-path read-only mounts: {original_path: docker_path}"
    )
    original_path_mounts_rw: dict[str, Path] = Field(
        default_factory=dict,
        description="Original-path read-write mounts: {original_path: docker_path}"
    )

    # Logging switch; enabled by default.
    log_all_access: bool = Field(
        default=True, description="Log all path access attempts"
    )
    # Each instance gets its own copy of the module-level default list, so
    # mutating one config's list does not affect DEFAULT_BLOCKLIST or other configs.
    blocklist: list[str] = Field(
        default_factory=lambda: DEFAULT_BLOCKLIST.copy(),
        description="Glob patterns to block even within workspace",
    )
    # Copy of DEFAULT_BLOCKLIST_EXEMPTIONS per instance (same reasoning as blocklist).
    blocklist_exemptions: list[str] = Field(
        default_factory=lambda: DEFAULT_BLOCKLIST_EXEMPTIONS.copy(),
        description="Filename patterns exempt from blocklist (e.g., .env.example)",
    )
    # None (the default) means no allowlist is configured.
    allowlist: list[str] | None = Field(
        default=None, description="If set, only these patterns are allowed"
    )
    # Copy of DEFAULT_READONLY_PREFIXES per instance (same reasoning as blocklist).
    readonly_prefixes: list[str] = Field(
        default_factory=lambda: DEFAULT_READONLY_PREFIXES.copy(),
        description="Path prefixes (relative to workspace) that are read-only",
    )
 362  
 363  
@dataclass
class ValidatedPath:
    """Result of path validation.

    Declared as the return type of Ag3ntumPathValidator.validate_path().
    """

    # Path string kept alongside the normalized form
    # (presumably the caller-supplied input — confirm against validate_path).
    original: str
    # Normalized filesystem path produced by validation.
    normalized: Path
    # Read-only flag; defaults to False.
    is_readonly: bool = False
 371  
 372  
 373  class PathValidationError(Exception):
 374      """Raised when path validation fails."""
 375  
 376      def __init__(self, message: str, path: str, reason: str):
 377          super().__init__(message)
 378          self.path = path
 379          self.reason = reason
 380  
 381  
 382  class Ag3ntumPathValidator:
 383      """
 384      Centralized path validation for all Ag3ntum tools.
 385  
 386      IMPORTANT: This runs in the main Python process, NOT inside bwrap.
 387      It sees the REAL Docker filesystem paths, not bwrap mount paths.
 388  
 389      Responsibilities:
 390          1. Normalize paths: ./foo, /workspace/foo, foo -> /users/greg/sessions/xxx/workspace/foo
 391          2. Validate paths are within workspace boundary
 392          3. Check blocklist/allowlist patterns
 393          4. Identify read-only paths (skills)
 394          5. Log all access attempts
 395      """
 396  
 397      def __init__(self, config: PathValidatorConfig):
 398          """
 399          Initialize with session-specific configuration.
 400  
 401          Args:
 402              config: Must include workspace_path (the REAL path in Docker filesystem)
 403          """
 404          self.config = config
 405          self.workspace = config.workspace_path.resolve()  # REAL Docker path
 406          self.skills = config.skills_path.resolve() if config.skills_path else None
 407          # Additional read-only paths for skills access
 408          self.global_skills = config.global_skills_path.resolve() if config.global_skills_path else None
 409          self.user_skills = config.user_skills_path.resolve() if config.user_skills_path else None
 410  
 411          # External mount paths (flattened structure: all at /mounts/{name})
 412          # Agent sees: /workspace/external/ro/* -> Real path: /mounts/{name}
 413          # Agent sees: /workspace/external/rw/* -> Real path: /mounts/{name}
 414          self.global_mounts_ro: dict[str, Path] = {
 415              name: path.resolve() for name, path in config.global_mounts_ro.items()
 416          }
 417          self.global_mounts_rw: dict[str, Path] = {
 418              name: path.resolve() for name, path in config.global_mounts_rw.items()
 419          }
 420          # Agent sees: /workspace/persistent/* -> Real path: /users/{username}/ag3ntum/persistent/*
 421          self.persistent = config.persistent_path.resolve() if config.persistent_path else None
 422  
 423          # Per-user mount paths (resolved at session start, flattened structure)
 424          # Agent sees: /workspace/external/user-ro/{name}/* -> Real path: /mounts/{name}/*
 425          # Agent sees: /workspace/external/user-rw/{name}/* -> Real path: /mounts/{name}/*
 426          self.user_mounts_ro: dict[str, Path] = {
 427              name: path.resolve() for name, path in config.user_mounts_ro.items()
 428          }
 429          self.user_mounts_rw: dict[str, Path] = {
 430              name: path.resolve() for name, path in config.user_mounts_rw.items()
 431          }
 432  
 433          # Dynamic mount paths (configured per-session via API, flattened structure)
 434          # Agent sees: ./{alias}/* via symlinks -> Real path: /mounts/{base}/*
 435          self.dynamic_mounts_ro: dict[str, Path] = {
 436              alias: path.resolve() for alias, path in config.dynamic_mounts_ro.items()
 437          }
 438          self.dynamic_mounts_rw: dict[str, Path] = {
 439              alias: path.resolve() for alias, path in config.dynamic_mounts_rw.items()
 440          }
 441  
 442          # Original-path mounts (access paths at original locations)
 443          # Agent sees: /var/log/* -> Docker path: /mounts/paths/_var_log/*
 444          # The key is the original path, the value is the Docker path
 445          self.original_path_mounts_ro: dict[str, Path] = {
 446              orig: docker.resolve() for orig, docker in config.original_path_mounts_ro.items()
 447          }
 448          self.original_path_mounts_rw: dict[str, Path] = {
 449              orig: docker.resolve() for orig, docker in config.original_path_mounts_rw.items()
 450          }
 451  
 452          # Extract session context from workspace path for cross-user/cross-session blocking
 453          # Path format: .../users/{username}/sessions/{session_id}/workspace
 454          # Note: /users/ may appear anywhere in path (e.g., /tmp/test/users/... in tests)
 455          self._session_username: str | None = None
 456          self._session_id: str | None = None
 457          workspace_str = str(config.workspace_path)
 458          users_idx = workspace_str.find("/users/")
 459          if users_idx >= 0:
 460              # Extract the portion starting from /users/
 461              users_path = workspace_str[users_idx:]
 462              parts = users_path.split("/")
 463              # parts[0] = "", parts[1] = "users", parts[2] = username, ...
 464              if len(parts) >= 3:
 465                  self._session_username = parts[2]
 466              if len(parts) >= 5 and parts[3] == "sessions":
 467                  self._session_id = parts[4]
 468  
 469      def docker_to_display_path(self, docker_path: Path) -> str:
 470          """
 471          Convert a Docker internal path back to an agent-visible display path.
 472  
 473          Used by LS, Glob, Grep tools to show user-friendly paths instead of
 474          raw Docker internal paths (e.g., /mounts/global_var_log/apt/).
 475  
 476          Translation priority:
 477              1. Workspace-relative (e.g., ./src/main.py → src/main.py)
 478              2. Persistent storage (e.g., /users/.../persistent/x → persistent/x)
 479              3. Global RO mounts (e.g., /mounts/name/x → external/ro/name/x)
 480              4. Global RW mounts (e.g., /mounts/name/x → external/rw/name/x)
 481              5. Per-user RO mounts → external/user-ro/name/x
 482              6. Per-user RW mounts → external/user-rw/name/x
 483              7. Dynamic RO mounts → dynamic/alias/x
 484              8. Dynamic RW mounts → dynamic/alias/x
 485              9. Original-path mounts → /original/path/x
 486             10. Fallback: return str(docker_path)
 487  
 488          Args:
 489              docker_path: Docker filesystem path (may be unresolved/symlinked)
 490  
 491          Returns:
 492              Agent-visible display path string
 493          """
 494          # 1. Workspace-relative: try WITHOUT resolving first to preserve symlink names
 495          try:
 496              return str(docker_path.relative_to(self.workspace))
 497          except ValueError:
 498              pass
 499  
 500          # For mount paths, resolve to follow symlinks and match mount boundaries
 501          resolved = docker_path.resolve()
 502  
 503          # Also try workspace-relative with resolved path (for paths reached via symlinks)
 504          try:
 505              return str(resolved.relative_to(self.workspace))
 506          except ValueError:
 507              pass
 508  
 509          # 2. Persistent storage
 510          if self.persistent:
 511              try:
 512                  rel = resolved.relative_to(self.persistent)
 513                  return f"persistent/{rel}" if str(rel) != "." else "persistent"
 514              except ValueError:
 515                  pass
 516  
 517          # 3-4. Global mounts
 518          for name, mount_path in self.global_mounts_ro.items():
 519              try:
 520                  rel = resolved.relative_to(mount_path)
 521                  return f"external/ro/{name}/{rel}" if str(rel) != "." else f"external/ro/{name}"
 522              except ValueError:
 523                  pass
 524  
 525          for name, mount_path in self.global_mounts_rw.items():
 526              try:
 527                  rel = resolved.relative_to(mount_path)
 528                  return f"external/rw/{name}/{rel}" if str(rel) != "." else f"external/rw/{name}"
 529              except ValueError:
 530                  pass
 531  
 532          # 5-6. Per-user mounts
 533          for name, mount_path in self.user_mounts_ro.items():
 534              try:
 535                  rel = resolved.relative_to(mount_path)
 536                  return f"external/user-ro/{name}/{rel}" if str(rel) != "." else f"external/user-ro/{name}"
 537              except ValueError:
 538                  pass
 539  
 540          for name, mount_path in self.user_mounts_rw.items():
 541              try:
 542                  rel = resolved.relative_to(mount_path)
 543                  return f"external/user-rw/{name}/{rel}" if str(rel) != "." else f"external/user-rw/{name}"
 544              except ValueError:
 545                  pass
 546  
 547          # 7-8. Dynamic mounts
 548          for alias, mount_path in self.dynamic_mounts_ro.items():
 549              try:
 550                  rel = resolved.relative_to(mount_path)
 551                  return f"dynamic/{alias}/{rel}" if str(rel) != "." else f"dynamic/{alias}"
 552              except ValueError:
 553                  pass
 554  
 555          for alias, mount_path in self.dynamic_mounts_rw.items():
 556              try:
 557                  rel = resolved.relative_to(mount_path)
 558                  return f"dynamic/{alias}/{rel}" if str(rel) != "." else f"dynamic/{alias}"
 559              except ValueError:
 560                  pass
 561  
 562          # 9. Original-path mounts (reverse: Docker path → original host path)
 563          for orig_path, docker_mount in self.original_path_mounts_ro.items():
 564              try:
 565                  rel = resolved.relative_to(docker_mount)
 566                  return f"{orig_path}/{rel}" if str(rel) != "." else orig_path
 567              except ValueError:
 568                  pass
 569  
 570          for orig_path, docker_mount in self.original_path_mounts_rw.items():
 571              try:
 572                  rel = resolved.relative_to(docker_mount)
 573                  return f"{orig_path}/{rel}" if str(rel) != "." else orig_path
 574              except ValueError:
 575                  pass
 576  
 577          # 10. Fallback
 578          return str(docker_path)
 579  
 580      def validate_path(
 581          self,
 582          path: str,
 583          operation: Literal["read", "write", "edit", "delete", "list", "glob", "grep"],
 584          allow_directory: bool = False,
 585      ) -> ValidatedPath:
 586          """
 587          Validate and normalize a path for the given operation.
 588  
 589          Args:
 590              path: User-provided path (relative or /workspace/... style)
 591              operation: Type of operation (affects read-only check)
 592              allow_directory: Whether directories are valid (for ls, glob)
 593  
 594          Returns:
 595              ValidatedPath with normalized path
 596  
 597          Raises:
 598              PathValidationError: If path is invalid or blocked
 599          """
 600          original = path
 601  
 602          # Step 1: Normalize the path
 603          try:
 604              normalized = self._normalize_path(path)
 605          except Exception as e:
 606              self._log_blocked(path, operation, f"Normalization failed: {e}")
 607              raise PathValidationError(
 608                  f"Invalid path: {path}",
 609                  path=path,
 610                  reason=f"Path normalization failed: {e}",
 611              )
 612  
 613          # Step 1.5: SECURITY - Block cross-user and cross-session access FIRST
 614          # This prevents agents from accessing other users' or other sessions' data
 615          # Must run before boundary check to give specific error messages
 616          norm_str = str(normalized)
 617  
 618          # Cross-user access blocking
 619          if self._session_username and "/users/" in norm_str:
 620              path_username = self._extract_path_component(norm_str, "/users/")
 621              if path_username and path_username != self._session_username:
 622                  # Check if this is an allowed exception (e.g., skills)
 623                  is_allowed = (
 624                      (self.global_skills and self._is_within_boundary(normalized, self.global_skills)) or
 625                      (self.user_skills and self._is_within_boundary(normalized, self.user_skills))
 626                  )
 627                  if not is_allowed:
 628                      self._log_blocked(path, operation, f"Cross-user access blocked: {path_username}")
 629                      raise PathValidationError(
 630                          f"Access to other users' directories is not allowed: {path}",
 631                          path=path,
 632                          reason="CROSS_USER_ACCESS_BLOCKED",
 633                      )
 634  
 635          # Cross-session access blocking (same user, different session)
 636          if self._session_username and self._session_id:
 637              sessions_pattern = f"/users/{self._session_username}/sessions/"
 638              if sessions_pattern in norm_str:
 639                  path_session_id = self._extract_path_component(norm_str, sessions_pattern)
 640                  if path_session_id and path_session_id != self._session_id:
 641                      self._log_blocked(path, operation, f"Cross-session access blocked: {path_session_id}")
 642                      raise PathValidationError(
 643                          f"Access to other sessions is not allowed: {path}",
 644                          path=path,
 645                          reason="CROSS_SESSION_ACCESS_BLOCKED",
 646                      )
 647  
 648          # Step 2: Check boundary (workspace, skills, or external mount directories)
 649          # Paths can be within:
 650          # - Workspace (read-write for most, read-only for some prefixes)
 651          # - Global skills directory (read-only)
 652          # - User skills directory (read-only)
 653          # - External RO mounts (read-only)
 654          # - External RW mounts (read-write)
 655          # - Persistent storage (read-write)
 656          # - Per-user RO mounts (read-only)
 657          # - Per-user RW mounts (read-write)
 658          in_workspace = False
 659          in_global_skills = False
 660          in_user_skills = False
 661          in_external_ro = False
 662          in_external_rw = False
 663          in_persistent = False
 664          in_user_ro = False
 665          in_user_rw = False
 666          rel_path = ""
 667  
 668          try:
 669              rel_path = str(normalized.relative_to(self.workspace))
 670              in_workspace = True
 671          except ValueError:
 672              pass
 673  
 674          if not in_workspace and self.global_skills:
 675              try:
 676                  rel_path = str(normalized.relative_to(self.global_skills))
 677                  in_global_skills = True
 678              except ValueError:
 679                  pass
 680  
 681          if not in_workspace and not in_global_skills and self.user_skills:
 682              try:
 683                  rel_path = str(normalized.relative_to(self.user_skills))
 684                  in_user_skills = True
 685              except ValueError:
 686                  pass
 687  
 688          # Check global external mount boundaries (flattened structure)
 689          if not in_workspace and not in_global_skills and not in_user_skills:
 690              # Check global RO mounts
 691              for mount_name, mount_path in self.global_mounts_ro.items():
 692                  try:
 693                      rel_path = str(normalized.relative_to(mount_path))
 694                      in_external_ro = True
 695                      break
 696                  except ValueError:
 697                      pass
 698  
 699          if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro:
 700              # Check global RW mounts
 701              for mount_name, mount_path in self.global_mounts_rw.items():
 702                  try:
 703                      rel_path = str(normalized.relative_to(mount_path))
 704                      in_external_rw = True
 705                      break
 706                  except ValueError:
 707                      pass
 708  
 709          if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro and not in_external_rw:
 710              if self.persistent:
 711                  try:
 712                      rel_path = str(normalized.relative_to(self.persistent))
 713                      in_persistent = True
 714                  except ValueError:
 715                      pass
 716  
 717          # Check per-user mount boundaries
 718          if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro and not in_external_rw and not in_persistent:
 719              # Check per-user RO mounts
 720              for mount_name, mount_path in self.user_mounts_ro.items():
 721                  try:
 722                      rel_path = str(normalized.relative_to(mount_path))
 723                      in_user_ro = True
 724                      break
 725                  except ValueError:
 726                      pass
 727  
 728          if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro and not in_external_rw and not in_persistent and not in_user_ro:
 729              # Check per-user RW mounts
 730              for mount_name, mount_path in self.user_mounts_rw.items():
 731                  try:
 732                      rel_path = str(normalized.relative_to(mount_path))
 733                      in_user_rw = True
 734                      break
 735                  except ValueError:
 736                      pass
 737  
 738          # Check dynamic mount boundaries (session-time user-selected mounts)
 739          in_dynamic_ro = False
 740          in_dynamic_rw = False
 741  
 742          if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro and not in_external_rw and not in_persistent and not in_user_ro and not in_user_rw:
 743              # Check dynamic RO mounts
 744              for alias, mount_path in self.dynamic_mounts_ro.items():
 745                  try:
 746                      rel_path = str(normalized.relative_to(mount_path))
 747                      in_dynamic_ro = True
 748                      break
 749                  except ValueError:
 750                      pass
 751  
 752          if not in_workspace and not in_global_skills and not in_user_skills and not in_external_ro and not in_external_rw and not in_persistent and not in_user_ro and not in_user_rw and not in_dynamic_ro:
 753              # Check dynamic RW mounts
 754              for alias, mount_path in self.dynamic_mounts_rw.items():
 755                  try:
 756                      rel_path = str(normalized.relative_to(mount_path))
 757                      in_dynamic_rw = True
 758                      break
 759                  except ValueError:
 760                      pass
 761  
 762          # Check original-path mount boundaries
 763          # Original-path mounts allow access to paths like /var/log at their original locations
 764          in_original_ro = False
 765          in_original_rw = False
 766  
 767          not_in_any_yet = not (
 768              in_workspace or in_global_skills or in_user_skills or
 769              in_external_ro or in_external_rw or in_persistent or
 770              in_user_ro or in_user_rw or in_dynamic_ro or in_dynamic_rw
 771          )
 772          if not_in_any_yet:
 773              # Check original-path RO mounts
 774              for orig_path, docker_path in self.original_path_mounts_ro.items():
 775                  try:
 776                      rel_path = str(normalized.relative_to(docker_path))
 777                      in_original_ro = True
 778                      break
 779                  except ValueError:
 780                      pass
 781  
 782          if not_in_any_yet and not in_original_ro:
 783              # Check original-path RW mounts
 784              for orig_path, docker_path in self.original_path_mounts_rw.items():
 785                  try:
 786                      rel_path = str(normalized.relative_to(docker_path))
 787                      in_original_rw = True
 788                      break
 789                  except ValueError:
 790                      pass
 791  
 792          in_any_allowed = (
 793              in_workspace or in_global_skills or in_user_skills or
 794              in_external_ro or in_external_rw or in_persistent or
 795              in_user_ro or in_user_rw or in_dynamic_ro or in_dynamic_rw or
 796              in_original_ro or in_original_rw
 797          )
 798  
 799          if not in_any_allowed:
 800              self._log_blocked(path, operation, "Outside allowed directories")
 801              raise PathValidationError(
 802                  f"Path outside allowed directories: {path}",
 803                  path=path,
 804                  reason="Path must be within workspace, skills, or external mount directories",
 805              )
 806  
 807          # Step 3: Check for path traversal attempts
 808          if ".." in path:
 809              # Even if normalized path is valid, log the attempt
 810              logger.warning(f"PATH_VALIDATOR: Traversal attempt in path: {path}")
 811  
 812          # Step 4: Check blocklist (workspace and external mount paths)
 813          # Security: blocklist applies to all areas to prevent accessing sensitive files
 814          should_check_blocklist = (
 815              in_workspace or in_external_ro or in_external_rw or in_persistent or
 816              in_user_ro or in_user_rw or in_dynamic_ro or in_dynamic_rw or
 817              in_original_ro or in_original_rw
 818          )
 819          if should_check_blocklist:
 820              # Check exemptions first — safe template files bypass the blocklist
 821              filename = normalized.name
 822              is_exempt = any(
 823                  fnmatch.fnmatch(filename, exempt)
 824                  for exempt in self.config.blocklist_exemptions
 825              )
 826  
 827              if not is_exempt:
 828                  for pattern in self.config.blocklist:
 829                      if fnmatch.fnmatch(rel_path, pattern) or fnmatch.fnmatch(
 830                          filename, pattern
 831                      ):
 832                          self._log_blocked(
 833                              path, operation, f"Matches blocklist pattern: {pattern}"
 834                          )
 835                          raise PathValidationError(
 836                              f"Path blocked by policy: {path}",
 837                              path=path,
 838                              reason=f"BLOCKLIST: Matches pattern: {pattern}",
 839                          )
 840  
 841          # Step 5: Check allowlist (if configured, only for workspace paths)
 842          if in_workspace and self.config.allowlist is not None:
 843              allowed = False
 844              for pattern in self.config.allowlist:
 845                  if fnmatch.fnmatch(rel_path, pattern):
 846                      allowed = True
 847                      break
 848              if not allowed:
 849                  self._log_blocked(path, operation, "Not in allowlist")
 850                  raise PathValidationError(
 851                      f"Path not in allowlist: {path}",
 852                      path=path,
 853                      reason="Path does not match any allowed pattern",
 854                  )
 855  
 856          # Step 6: Check if read-only
 857          # Read-only areas:
 858          # - Skills directories (global and user) are always read-only
 859          # - External RO mounts are always read-only
 860          # - Per-user RO mounts are always read-only
 861          # - Dynamic RO mounts are always read-only
 862          # - Original-path RO mounts are always read-only
 863          # - Workspace paths may have readonly_prefixes
 864          is_readonly = in_global_skills or in_user_skills or in_external_ro or in_user_ro or in_dynamic_ro or in_original_ro
 865  
 866          if in_workspace and not is_readonly:
 867              is_readonly = any(
 868                  rel_path.startswith(ro_prefix.rstrip("/"))
 869                  for ro_prefix in self.config.readonly_prefixes
 870              )
 871  
 872          if is_readonly and operation in ("write", "edit", "delete"):
 873              # Provide helpful error message for external RO mounts
 874              if in_external_ro or in_user_ro or in_dynamic_ro or in_original_ro:
 875                  self._log_blocked(path, operation, "Read-only external mount")
 876                  raise PathValidationError(
 877                      f"Cannot {operation} read-only mount: {path}",
 878                      path=path,
 879                      reason="Mount is read-only (external mount, per-user ro, dynamic ro, or original-path ro)",
 880                  )
 881              else:
 882                  self._log_blocked(path, operation, "Read-only path")
 883                  raise PathValidationError(
 884                      f"Cannot {operation} read-only path: {path}",
 885                      path=path,
 886                      reason="Path is read-only",
 887                  )
 888  
 889          # Log success
 890          self._log_allowed(original, normalized, operation)
 891  
 892          return ValidatedPath(
 893              original=original,
 894              normalized=normalized,
 895              is_readonly=is_readonly,
 896          )
 897  
 898      def _normalize_path(self, path: str) -> Path:
 899          """
 900          Normalize agent-provided path to REAL Docker filesystem path.
 901  
 902          The agent thinks it's working with bwrap paths:
 903          - /workspace/foo.txt -> becomes /users/greg/sessions/xxx/workspace/foo.txt
 904          - ./foo.txt -> becomes /users/greg/sessions/xxx/workspace/foo.txt
 905          - foo.txt -> becomes /users/greg/sessions/xxx/workspace/foo.txt
 906  
 907          External mount paths are translated as:
 908          - /workspace/external/ro/{name}/file -> /mounts/ro/{name}/file
 909          - /workspace/external/rw/{name}/file -> /mounts/rw/{name}/file
 910          - /workspace/persistent/file -> /users/{username}/ag3ntum/persistent/file
 911          - ./external/ro/{name}/file -> same translations
 912          - ./persistent/file -> same translation
 913  
 914          This translation is critical because the Python file tools run OUTSIDE bwrap
 915          and need the real Docker filesystem paths.
 916          """
 917          p = PurePosixPath(path)
 918          path_str = str(p)
 919  
 920          # First, normalize relative paths that reference external mounts or persistent
 921          # NOTE: Dynamic mounts are now at workspace root as symlinks (e.g., ./logs/ instead of ./dynamic/logs/)
 922          # and are resolved automatically via standard workspace path handling below.
 923          if not p.is_absolute():
 924              # Check if it's a relative external path like ./external/ro/... or ./persistent/...
 925              if path_str.startswith("./external/") or path_str.startswith("external/"):
 926                  # Convert to absolute bwrap-style path
 927                  path_str = "/workspace/" + path_str.lstrip("./")
 928                  p = PurePosixPath(path_str)
 929              # Handle persistent paths - with or without trailing slash
 930              # PurePosixPath normalizes "./persistent/" to "persistent" (strips ./ and trailing /)
 931              # So we need to check for: ./persistent, ./persistent/, persistent, persistent/
 932              elif (
 933                  path_str == "./persistent" or path_str == "persistent" or
 934                  path_str.startswith("./persistent/") or path_str.startswith("persistent/")
 935              ):
 936                  # Convert to absolute bwrap-style path
 937                  path_str = "/workspace/" + path_str.lstrip("./")
 938                  p = PurePosixPath(path_str)
 939  
 940          # Handle absolute /persistent/* path (bwrap sandbox internal path)
 941          # /persistent/* -> /users/{username}/ag3ntum/persistent/*
 942          # This is the path format agents see inside bwrap sandbox
 943          if path_str.startswith("/persistent/") or path_str == "/persistent":
 944              relative = path_str[len("/persistent/"):] if path_str != "/persistent" else ""
 945              return self._resolve_persistent_path(path_str, relative, path)
 946  
 947          # Handle agent paths that reference persistent storage (at workspace root, not under external/)
 948          # /workspace/persistent/* -> /users/{username}/ag3ntum/persistent/*
 949          if path_str.startswith("/workspace/persistent/") or path_str == "/workspace/persistent":
 950              if self.persistent:
 951                  relative = path_str[len("/workspace/persistent/"):] if path_str != "/workspace/persistent" else ""
 952                  return self._resolve_persistent_path(path_str, relative, path)
 953              # Persistent not configured, treat as workspace path
 954              relative_to_workspace = path_str[len("/workspace"):].lstrip("/")
 955              return (self.workspace / relative_to_workspace).resolve()
 956  
 957          # Handle agent paths that reference external mounts
 958          if path_str.startswith("/workspace/external/"):
 959              # Extract the part after /workspace/external/
 960              external_part = path_str[len("/workspace/external/"):]
 961  
 962              # Route to correct external mount (flattened structure: /mounts/{name})
 963              if external_part.startswith("ro/"):
 964                  # Read-only external mount: /workspace/external/ro/{name}/* -> /mounts/{name}/*
 965                  relative = external_part[3:]  # Remove "ro/"
 966  
 967                  # Extract mount name (first path component)
 968                  if "/" in relative:
 969                      mount_name, mount_relative = relative.split("/", 1)
 970                  else:
 971                      mount_name = relative
 972                      mount_relative = ""
 973  
 974                  # Check global RO mounts first
 975                  if mount_name in self.global_mounts_ro:
 976                      mount_path = self.global_mounts_ro[mount_name]
 977                      if mount_relative:
 978                          resolved = (mount_path / mount_relative).resolve()
 979                      else:
 980                          resolved = mount_path.resolve()
 981                      # Security: verify resolved path stays within boundary
 982                      if not self._is_within_boundary(resolved, mount_path):
 983                          raise PathValidationError(
 984                              f"Path traversal detected: {path}",
 985                              path=path,
 986                              reason="PATH_TRAVERSAL: Resolved path escapes global-ro mount boundary",
 987                          )
 988                      return resolved
 989                  # Fallback to user mounts for backward compatibility
 990                  elif mount_name in self.user_mounts_ro:
 991                      mount_path = self.user_mounts_ro[mount_name]
 992                      if mount_relative:
 993                          resolved = (mount_path / mount_relative).resolve()
 994                      else:
 995                          resolved = mount_path.resolve()
 996                      # Security: verify resolved path stays within boundary
 997                      if not self._is_within_boundary(resolved, mount_path):
 998                          raise PathValidationError(
 999                              f"Path traversal detected: {path}",
1000                              path=path,
1001                              reason="PATH_TRAVERSAL: Resolved path escapes user-ro mount boundary",
1002                          )
1003                      return resolved
1004                  else:
1005                      # Mount not found, treat as workspace path (will likely fail boundary check)
1006                      relative_to_workspace = path_str[len("/workspace"):].lstrip("/")
1007                      resolved = (self.workspace / relative_to_workspace).resolve()
1008                      return resolved
1009  
1010              elif external_part.startswith("rw/"):
1011                  # Read-write external mount: /workspace/external/rw/{name}/* -> /mounts/{name}/*
1012                  relative = external_part[3:]  # Remove "rw/"
1013  
1014                  # Extract mount name (first path component)
1015                  if "/" in relative:
1016                      mount_name, mount_relative = relative.split("/", 1)
1017                  else:
1018                      mount_name = relative
1019                      mount_relative = ""
1020  
1021                  # Check global RW mounts first
1022                  if mount_name in self.global_mounts_rw:
1023                      mount_path = self.global_mounts_rw[mount_name]
1024                      if mount_relative:
1025                          resolved = (mount_path / mount_relative).resolve()
1026                      else:
1027                          resolved = mount_path.resolve()
1028                      # Security: verify resolved path stays within boundary
1029                      if not self._is_within_boundary(resolved, mount_path):
1030                          raise PathValidationError(
1031                              f"Path traversal detected: {path}",
1032                              path=path,
1033                              reason="PATH_TRAVERSAL: Resolved path escapes global-rw mount boundary",
1034                          )
1035                      return resolved
1036                  # Fallback to user mounts for backward compatibility
1037                  elif mount_name in self.user_mounts_rw:
1038                      mount_path = self.user_mounts_rw[mount_name]
1039                      if mount_relative:
1040                          resolved = (mount_path / mount_relative).resolve()
1041                      else:
1042                          resolved = mount_path.resolve()
1043                      # Security: verify resolved path stays within boundary
1044                      if not self._is_within_boundary(resolved, mount_path):
1045                          raise PathValidationError(
1046                              f"Path traversal detected: {path}",
1047                              path=path,
1048                              reason="PATH_TRAVERSAL: Resolved path escapes user-rw mount boundary",
1049                          )
1050                      return resolved
1051                  else:
1052                      # Mount not found, treat as workspace path
1053                      relative_to_workspace = path_str[len("/workspace"):].lstrip("/")
1054                      resolved = (self.workspace / relative_to_workspace).resolve()
1055                      return resolved
1056  
1057              elif external_part.startswith("persistent/") or external_part == "persistent":
1058                  # DEPRECATED: /workspace/external/persistent/* is deprecated
1059                  # Use /workspace/persistent/* instead (persistent is now at workspace root)
1060                  logger.warning(
1061                      f"Deprecated path: {path}. Use ./persistent/ instead of ./external/persistent/"
1062                  )
1063                  if self.persistent:
1064                      relative = external_part[11:] if external_part != "persistent" else ""  # Remove "persistent/"
1065                      return self._resolve_persistent_path(path_str, relative, path)
1066                  # Persistent not configured, treat as workspace path
1067                  relative_to_workspace = path_str[len("/workspace"):].lstrip("/")
1068                  return (self.workspace / relative_to_workspace).resolve()
1069  
1070              elif external_part.startswith("user-ro/"):
1071                  # Per-user read-only mount: /workspace/external/user-ro/{name}/* -> real path/*
1072                  remaining = external_part[8:]  # Remove "user-ro/"
1073                  # Extract mount name (first path component)
1074                  if "/" in remaining:
1075                      mount_name, relative = remaining.split("/", 1)
1076                  else:
1077                      mount_name = remaining
1078                      relative = ""
1079  
1080                  if mount_name in self.user_mounts_ro:
1081                      mount_path = self.user_mounts_ro[mount_name]
1082                      if relative:
1083                          resolved = (mount_path / relative).resolve()
1084                      else:
1085                          resolved = mount_path.resolve()
1086                      # Security: verify resolved path stays within boundary
1087                      if not self._is_within_boundary(resolved, mount_path):
1088                          raise PathValidationError(
1089                              f"Path traversal detected: {path}",
1090                              path=path,
1091                              reason="PATH_TRAVERSAL: Resolved path escapes user-ro mount boundary",
1092                          )
1093                      return resolved
1094                  else:
1095                      # Mount not configured, treat as workspace path
1096                      relative_to_workspace = path_str[len("/workspace"):].lstrip("/")
1097                      resolved = (self.workspace / relative_to_workspace).resolve()
1098                      return resolved
1099  
1100              elif external_part.startswith("user-rw/"):
1101                  # Per-user read-write mount: /workspace/external/user-rw/{name}/* -> real path/*
1102                  remaining = external_part[8:]  # Remove "user-rw/"
1103                  # Extract mount name (first path component)
1104                  if "/" in remaining:
1105                      mount_name, relative = remaining.split("/", 1)
1106                  else:
1107                      mount_name = remaining
1108                      relative = ""
1109  
1110                  if mount_name in self.user_mounts_rw:
1111                      mount_path = self.user_mounts_rw[mount_name]
1112                      if relative:
1113                          resolved = (mount_path / relative).resolve()
1114                      else:
1115                          resolved = mount_path.resolve()
1116                      # Security: verify resolved path stays within boundary
1117                      if not self._is_within_boundary(resolved, mount_path):
1118                          raise PathValidationError(
1119                              f"Path traversal detected: {path}",
1120                              path=path,
1121                              reason="PATH_TRAVERSAL: Resolved path escapes user-rw mount boundary",
1122                          )
1123                      return resolved
1124                  else:
1125                      # Mount not configured, treat as workspace path
1126                      relative_to_workspace = path_str[len("/workspace"):].lstrip("/")
1127                      resolved = (self.workspace / relative_to_workspace).resolve()
1128                      return resolved
1129  
1130              # Unrecognized external path - fall through to workspace handling
1131  
1132          # NOTE: Dynamic mounts are now symlinked at workspace root (e.g., workspace/{alias})
1133          # instead of workspace/dynamic/{alias}. The symlink resolution in workspace path
1134          # handling below automatically resolves to /mounts/dynamic/{base}/{subpath}.
1135          # Validation then checks if the resolved path is within allowed dynamic_mounts_*.
1136  
1137          # Handle standard workspace paths
1138          if path_str.startswith("/workspace"):
1139              # Agent provided bwrap-style path: /workspace/foo -> workspace/foo
1140              relative_to_workspace = path_str[len("/workspace"):].lstrip("/")
1141              resolved = (self.workspace / relative_to_workspace).resolve()
1142              # Security: verify resolved path stays within workspace boundary
1143              if not self._is_within_boundary(resolved, self.workspace):
1144                  raise PathValidationError(
1145                      f"Path traversal detected: {path}",
1146                      path=path,
1147                      reason="PATH_TRAVERSAL: Resolved path escapes workspace boundary",
1148                  )
1149          elif not p.is_absolute():
1150              # Relative path: ./foo or foo -> workspace/foo
1151              resolved = (self.workspace / p).resolve()
1152          else:
1153              # Absolute path NOT starting with /workspace
1154              # Check if this is an original-path mount (e.g., /var/log)
1155              # Original-path mounts allow accessing paths at their original locations
1156              # Translate: /var/log -> /mounts/paths/_var_log
1157              original_mount = self._find_original_path_mount(path_str)
1158              if original_mount:
1159                  orig_path, docker_path, is_ro = original_mount
1160                  if path_str == orig_path:
1161                      resolved = docker_path.resolve()
1162                  else:
1163                      # Path is under the mount (e.g., /var/log/syslog)
1164                      relative = path_str[len(orig_path):].lstrip("/")
1165                      resolved = (docker_path / relative).resolve()
1166                  # Security: verify resolved path stays within mount boundary
1167                  if not self._is_within_boundary(resolved, docker_path):
1168                      raise PathValidationError(
1169                          f"Path traversal detected: {path}",
1170                          path=path,
1171                          reason="PATH_TRAVERSAL: Resolved path escapes original-path mount boundary",
1172                      )
1173              else:
1174                  # This is an escape attempt (like /etc/passwd)
1175                  resolved = Path(p).resolve()
1176  
1177          return resolved
1178  
1179      def validate_no_symlink_escape(
1180          self, path: Path, boundary: Path, check_intermediate: bool = True
1181      ) -> Path:
1182          """
1183          Validate that path (including symlinks) doesn't escape boundary.
1184  
1185          This prevents TOCTOU attacks where:
1186          1. Attacker creates: /workspace/external/rw/projects/link -> /etc/passwd
1187          2. Validation passes (link exists in allowed area)
1188          3. Read follows symlink to /etc/passwd
1189  
1190          Args:
1191              path: The path to validate
1192              boundary: The boundary the resolved path must stay within
1193              check_intermediate: If True, check each intermediate symlink
1194  
1195          Returns:
1196              The fully resolved path
1197  
1198          Raises:
1199              PathValidationError: If path or any symlink escapes boundary
1200          """
1201          # Resolve the path fully (follows all symlinks)
1202          try:
1203              resolved = path.resolve(strict=False)
1204          except (OSError, RuntimeError) as e:
1205              raise PathValidationError(
1206                  f"Cannot resolve path: {path} - {e}",
1207                  path=str(path),
1208                  reason="PATH_RESOLUTION_ERROR",
1209              )
1210  
1211          # Check each intermediate component for symlink escape
1212          if check_intermediate and path.exists():
1213              current = Path("/")
1214              for part in path.parts[1:]:  # Skip root
1215                  current = current / part
1216                  if current.exists() and current.is_symlink():
1217                      try:
1218                          link_target = current.resolve()
1219                          link_target.relative_to(boundary)
1220                      except ValueError:
1221                          logger.warning(
1222                              f"PATH_VALIDATOR: Symlink escape detected: "
1223                              f"{current} -> {link_target} (outside {boundary})"
1224                          )
1225                          raise PathValidationError(
1226                              f"Symlink escape detected: {current}",
1227                              path=str(path),
1228                              reason="SYMLINK_ESCAPE",
1229                          )
1230                      except OSError:
1231                          # Broken symlink or permission error - allow to continue
1232                          pass
1233  
1234          # Final resolved path must be within boundary
1235          try:
1236              resolved.relative_to(boundary)
1237          except ValueError:
1238              raise PathValidationError(
1239                  f"Path resolves outside boundary: {path} -> {resolved}",
1240                  path=str(path),
1241                  reason="PATH_ESCAPE",
1242              )
1243  
1244          return resolved
1245  
1246      def _log_allowed(self, original: str, normalized: Path, operation: str) -> None:
1247          """Log allowed path access."""
1248          if self.config.log_all_access:
1249              logger.info(
1250                  f"PATH_VALIDATOR: ALLOWED {operation.upper()} "
1251                  f"'{original}' -> '{normalized}'"
1252              )
1253  
1254      def _extract_path_component(self, path_str: str, pattern: str) -> str | None:
1255          """
1256          Extract the first path component after a pattern.
1257  
1258          Args:
1259              path_str: The path string to search
1260              pattern: The pattern to find (e.g., "/users/")
1261  
1262          Returns:
1263              The first component after the pattern, or None if not found
1264          """
1265          idx = path_str.find(pattern)
1266          if idx < 0:
1267              return None
1268          remaining = path_str[idx + len(pattern):]
1269          return remaining.split("/")[0] if remaining else None
1270  
1271      def _resolve_persistent_path(self, path: str, relative: str, original_path: str) -> Path:
1272          """
1273          Resolve a path within persistent storage with boundary validation.
1274  
1275          Args:
1276              path: The full path being resolved (for error messages)
1277              relative: The relative path within persistent storage
1278              original_path: The original user-provided path (for error messages)
1279  
1280          Returns:
1281              Resolved Path within persistent storage
1282  
1283          Raises:
1284              PathValidationError: If path escapes boundary or persistent not configured
1285          """
1286          if not self.persistent:
1287              raise PathValidationError(
1288                  f"Persistent storage not configured: {path}",
1289                  path=original_path,
1290                  reason="Persistent storage path not available",
1291              )
1292  
1293          resolved = (self.persistent / relative).resolve() if relative else self.persistent.resolve()
1294  
1295          if not self._is_within_boundary(resolved, self.persistent):
1296              raise PathValidationError(
1297                  f"Path traversal detected: {original_path}",
1298                  path=original_path,
1299                  reason="PATH_TRAVERSAL: Resolved path escapes persistent storage boundary",
1300              )
1301          return resolved
1302  
1303      def _find_original_path_mount(
1304          self, path: str
1305      ) -> tuple[str, Path, bool] | None:
1306          """
1307          Find the original-path mount that contains the given path.
1308  
1309          Args:
1310              path: An absolute path (e.g., "/var/log" or "/var/log/syslog")
1311  
1312          Returns:
1313              Tuple of (original_path, docker_path, is_readonly) if found, else None
1314          """
1315          best_match: tuple[str, Path, bool] | None = None
1316          best_len = 0
1317  
1318          # Check RO mounts
1319          for orig_path, docker_path in self.original_path_mounts_ro.items():
1320              if path == orig_path or path.startswith(orig_path + "/"):
1321                  if len(orig_path) > best_len:
1322                      best_match = (orig_path, docker_path, True)
1323                      best_len = len(orig_path)
1324  
1325          # Check RW mounts
1326          for orig_path, docker_path in self.original_path_mounts_rw.items():
1327              if path == orig_path or path.startswith(orig_path + "/"):
1328                  if len(orig_path) > best_len:
1329                      best_match = (orig_path, docker_path, False)
1330                      best_len = len(orig_path)
1331  
1332          return best_match
1333  
1334      def get_sandbox_root_entries(self) -> list[tuple[str, str, str]]:
1335          """
1336          Synthesize a virtual directory listing of the sandbox root (/).
1337  
1338          Returns entries matching what the agent would see inside bwrap,
1339          based on configured mounts. This avoids exposing Docker container
1340          internals while giving the agent a useful view of available paths.
1341  
1342          Returns:
1343              List of (display_path, access_mode, description) tuples.
1344              display_path: The path as the agent should see it (e.g., "/workspace")
1345              access_mode: "rw" or "ro"
1346              description: Human-readable description
1347          """
1348          entries: list[tuple[str, str, str]] = []
1349  
1350          # Core paths (always present)
1351          entries.append(("/workspace", "rw", "Session workspace (working directory)"))
1352  
1353          if self.persistent:
1354              entries.append(("/persistent", "rw", "Persistent storage (cross-session)"))
1355  
1356          # User environment
1357          # Venv is always configured in permissions.yaml session_mounts
1358          entries.append(("/venv", "ro", "Python virtual environment"))
1359  
1360          if self.global_skills:
1361              entries.append(("/skills", "ro", "Global skills"))
1362          if self.user_skills:
1363              entries.append(("/user-skills", "ro", "User skills"))
1364  
1365          # Original-path mounts (e.g., /var/log)
1366          for orig_path in sorted(self.original_path_mounts_ro.keys()):
1367              entries.append((orig_path, "ro", f"Mounted from host (read-only)"))
1368          for orig_path in sorted(self.original_path_mounts_rw.keys()):
1369              entries.append((orig_path, "rw", f"Mounted from host (read-write)"))
1370  
1371          return entries
1372  
1373      def find_virtual_children(self, parent_path: str) -> list[tuple[str, str, str]] | None:
1374          """
1375          Find virtual directory children for a path that is a parent of configured mounts.
1376  
1377          For example, if /var/log is a configured original-path mount,
1378          calling this with "/var" returns [("log", "ro", "Mounted from host")].
1379  
1380          Args:
1381              parent_path: Absolute path to check (e.g., "/var")
1382  
1383          Returns:
1384              List of (child_name, access_mode, description) if any mounts exist
1385              under this path. None if no mounts are found under this path.
1386          """
1387          parent = parent_path.rstrip("/") + "/"
1388          children: list[tuple[str, str, str]] = []
1389          seen: set[str] = set()
1390  
1391          for orig_path in self.original_path_mounts_ro:
1392              if orig_path.startswith(parent):
1393                  # Extract the immediate child component
1394                  remainder = orig_path[len(parent):]
1395                  child_name = remainder.split("/")[0]
1396                  if child_name and child_name not in seen:
1397                      seen.add(child_name)
1398                      children.append((child_name, "ro", f"Contains mount: {orig_path}"))
1399  
1400          for orig_path in self.original_path_mounts_rw:
1401              if orig_path.startswith(parent):
1402                  remainder = orig_path[len(parent):]
1403                  child_name = remainder.split("/")[0]
1404                  if child_name and child_name not in seen:
1405                      seen.add(child_name)
1406                      children.append((child_name, "rw", f"Contains mount: {orig_path}"))
1407  
1408          return children if children else None
1409  
1410      def _is_within_boundary(self, path: Path, boundary: Path) -> bool:
1411          """
1412          Check if a resolved path is within the given boundary.
1413  
1414          This prevents path traversal attacks where .. components
1415          could escape the intended directory boundary.
1416  
1417          Args:
1418              path: The resolved path to check
1419              boundary: The boundary directory path must stay within
1420  
1421          Returns:
1422              True if path is within boundary, False otherwise
1423          """
1424          try:
1425              # Resolve both paths to handle any symlinks
1426              resolved_path = path.resolve()
1427              resolved_boundary = boundary.resolve()
1428              # Check if path is relative to boundary
1429              resolved_path.relative_to(resolved_boundary)
1430              return True
1431          except ValueError:
1432              return False
1433  
1434      def _log_blocked(self, path: str, operation: str, reason: str) -> None:
1435          """Log blocked path access."""
1436          logger.warning(
1437              f"PATH_VALIDATOR: BLOCKED {operation.upper()} " f"'{path}' - {reason}"
1438          )
1439  
1440  
1441  # =============================================================================
1442  # Session-Scoped Validator Management
1443  # =============================================================================
1444  
# Session-scoped validators (NOT singleton - each session has its own).
# Keyed by session ID; entries are added by configure_path_validator() and
# removed by cleanup_path_validator().
_session_validators: dict[str, Ag3ntumPathValidator] = {}

# Session-scoped linux UIDs for file ownership (sandbox user UID per session).
# Keyed by session ID; written by set_session_linux_uid() and dropped by
# cleanup_path_validator().
_session_linux_uids: dict[str, int] = {}
1450  
1451  
def set_session_linux_uid(session_id: str, linux_uid: int) -> None:
    """
    Record the sandbox user's Linux UID for a session.

    Any UID previously stored for the same session is overwritten.

    Args:
        session_id: The session ID
        linux_uid: UID of the sandbox user for this session
    """
    _session_linux_uids.update({session_id: linux_uid})
    logger.debug(f"PATH_VALIDATOR: Set linux_uid={linux_uid} for session {session_id}")
1456  
1457  
def get_session_linux_uid(session_id: str) -> int | None:
    """
    Look up the Linux UID recorded for a session.

    Args:
        session_id: The session ID

    Returns:
        The stored UID, or None if no UID has been set for this session
    """
    try:
        return _session_linux_uids[session_id]
    except KeyError:
        return None
1461  
1462  
def get_path_validator(session_id: str) -> Ag3ntumPathValidator:
    """
    Return the validator registered for a session.

    Args:
        session_id: The session ID

    Returns:
        The Ag3ntumPathValidator stored for this session

    Raises:
        RuntimeError: If no validator has been configured for this session
    """
    validator = _session_validators.get(session_id)
    if validator is None:
        raise RuntimeError(
            f"PathValidator not configured for session {session_id}. "
            "Call configure_path_validator() first."
        )
    return validator
1482  
1483  
def configure_path_validator(
    session_id: str,
    workspace_path: Path,
    username: str | None = None,
    skills_path: Path | None = None,
    global_skills_path: Path | None = None,
    user_skills_path: Path | None = None,
    global_mounts_ro: dict[str, Path] | None = None,
    global_mounts_rw: dict[str, Path] | None = None,
    persistent_path: Path | None = None,
    user_mounts_ro: dict[str, Path] | None = None,
    user_mounts_rw: dict[str, Path] | None = None,
    dynamic_mounts_ro: dict[str, Path] | None = None,
    dynamic_mounts_rw: dict[str, Path] | None = None,
    original_path_mounts_ro: dict[str, Path] | None = None,
    original_path_mounts_rw: dict[str, Path] | None = None,
    blocklist: list[str] | None = None,
    readonly_prefixes: list[str] | None = None,
) -> Ag3ntumPathValidator:
    """
    Build, register and return the path validator for a session.

    The validator is stored in the module-level session registry, replacing
    any validator already registered under the same session ID. When a
    username is known, the SandboxPathResolver for the session is configured
    as well; a failure there is logged as a warning and does not prevent the
    validator from being returned.

    Args:
        session_id: The session ID
        workspace_path: REAL Docker filesystem path to session workspace
        username: Username for this session (extracted from path if not provided)
        skills_path: Deprecated, use global_skills_path/user_skills_path
        global_skills_path: Path to global skills directory (read-only)
        user_skills_path: Path to user skills directory (read-only)
        global_mounts_ro: Global read-only mounts {name: container_path} (flattened)
        global_mounts_rw: Global read-write mounts {name: container_path} (flattened)
        persistent_path: Path to user's persistent storage
        user_mounts_ro: Per-user read-only mounts {name: container_path}
        user_mounts_rw: Per-user read-write mounts {name: container_path}
        dynamic_mounts_ro: Dynamic read-only mounts {alias: container_path}
        dynamic_mounts_rw: Dynamic read-write mounts {alias: container_path}
        original_path_mounts_ro: Original-path read-only mounts {orig_path: docker_path}
        original_path_mounts_rw: Original-path read-write mounts {orig_path: docker_path}
        blocklist: Optional list of blocked patterns; None or an empty list
            selects a copy of DEFAULT_BLOCKLIST
        readonly_prefixes: Optional list of read-only path prefixes; None or an
            empty list selects a copy of DEFAULT_READONLY_PREFIXES

    Returns:
        The configured Ag3ntumPathValidator
    """
    # Fall back to the workspace path when no username was supplied.
    # Path format: /users/{username}/sessions/{session_id}/workspace
    if username is None:
        workspace_str = str(workspace_path)
        if workspace_str.startswith("/users/"):
            # The "/users/" prefix guarantees the split yields at least
            # ["", "users", <name>], so index 2 always exists.
            username = workspace_str.split("/")[2]
        else:
            logger.warning(
                f"Could not extract username from workspace path: {workspace_path}. "
                "SandboxPathResolver will not be configured."
            )

    # Treat missing (or empty) mount tables uniformly as empty dicts.
    global_ro = global_mounts_ro or {}
    global_rw = global_mounts_rw or {}
    user_ro = user_mounts_ro or {}
    user_rw = user_mounts_rw or {}
    original_ro = original_path_mounts_ro or {}
    original_rw = original_path_mounts_rw or {}

    validator = Ag3ntumPathValidator(
        PathValidatorConfig(
            workspace_path=workspace_path,
            skills_path=skills_path,
            global_skills_path=global_skills_path,
            user_skills_path=user_skills_path,
            global_mounts_ro=global_ro,
            global_mounts_rw=global_rw,
            persistent_path=persistent_path,
            user_mounts_ro=user_ro,
            user_mounts_rw=user_rw,
            dynamic_mounts_ro=dynamic_mounts_ro or {},
            dynamic_mounts_rw=dynamic_mounts_rw or {},
            original_path_mounts_ro=original_ro,
            original_path_mounts_rw=original_rw,
            blocklist=blocklist or DEFAULT_BLOCKLIST.copy(),
            readonly_prefixes=readonly_prefixes or DEFAULT_READONLY_PREFIXES.copy(),
        )
    )
    _session_validators[session_id] = validator

    # Also configure SandboxPathResolver for this session. It receives the
    # mount targets as plain strings. This step is best-effort: any failure
    # is downgraded to a warning.
    if username:
        def as_strings(mounts: dict[str, Path]) -> dict[str, str]:
            return {name: str(target) for name, target in mounts.items()}

        try:
            configure_sandbox_path_resolver(
                session_id=session_id,
                username=username,
                workspace_docker=str(workspace_path),
                global_mounts_ro=as_strings(global_ro),
                global_mounts_rw=as_strings(global_rw),
                user_mounts_ro=as_strings(user_ro),
                user_mounts_rw=as_strings(user_rw),
            )
        except Exception as exc:
            logger.warning(f"Failed to configure SandboxPathResolver: {exc}")

    # Summarize the configured mounts in a single log line.
    logger.info(
        f"PATH_VALIDATOR: Configured for session {session_id} "
        f"with workspace={workspace_path}, username={username}, "
        f"global_mounts={len(global_ro)} RO/{len(global_rw)} RW, "
        f"persistent={persistent_path}, "
        f"user_mounts={len(user_ro)} RO/{len(user_rw)} RW, "
        f"original_paths={len(original_ro)} RO/{len(original_rw)} RW"
    )
    return validator
1597  
1598  
def cleanup_path_validator(session_id: str) -> None:
    """
    Drop all per-session path-handling state when a session ends.

    Removes the session's validator (logging only when one was actually
    registered), its stored linux_uid, and the associated SandboxPathResolver.
    The call does nothing for a missing validator or linux_uid entry.

    Args:
        session_id: The session ID to clean up
    """
    try:
        del _session_validators[session_id]
    except KeyError:
        # No validator registered for this session - nothing to report.
        pass
    else:
        logger.info(f"PATH_VALIDATOR: Cleaned up validator for session {session_id}")

    # Forget the session's linux_uid, if one was recorded.
    _session_linux_uids.pop(session_id, None)

    # Also cleanup SandboxPathResolver
    cleanup_sandbox_path_resolver(session_id)
1617  
1618  
def has_path_validator(session_id: str) -> bool:
    """
    Report whether a validator is currently registered for a session.

    Args:
        session_id: The session ID to check

    Returns:
        True if a validator is registered for the session, False otherwise
    """
    is_registered = session_id in _session_validators
    return is_registered
1630  
1631  
1632  # =============================================================================
1633  # Sandbox Path Resolution Utilities
1634  # =============================================================================
1635  
def get_resolver_for_session(session_id: str) -> Optional[SandboxPathResolver]:
    """
    Fetch the session's SandboxPathResolver without raising when it is absent.

    Args:
        session_id: The session ID

    Returns:
        The resolver when has_sandbox_path_resolver() reports one for the
        session, None otherwise
    """
    if not has_sandbox_path_resolver(session_id):
        return None
    return get_sandbox_path_resolver(session_id)
1649  
1650  
def sandbox_to_docker_path(session_id: str, sandbox_path: str) -> str:
    """
    Translate a sandbox path into a Docker filesystem path for a session.

    Convenience wrapper that looks up the session's resolver and delegates
    to its sandbox_to_docker() method, for converting agent-provided paths
    (in sandbox format) to Docker filesystem paths.

    Args:
        session_id: The session ID
        sandbox_path: Path in sandbox format (e.g., /workspace/file.txt)

    Returns:
        Docker filesystem path

    Raises:
        RuntimeError: If resolver not configured for session
        PathResolutionError: If path cannot be resolved
    """
    return get_sandbox_path_resolver(session_id).sandbox_to_docker(sandbox_path)
1671  
1672  
def docker_to_sandbox_path(session_id: str, docker_path: str) -> str:
    """
    Translate a Docker filesystem path into a sandbox path for a session.

    Convenience wrapper that looks up the session's resolver and delegates
    to its docker_to_sandbox() method. Useful for translating error messages
    or paths from Docker processes back to the canonical sandbox format.

    Args:
        session_id: The session ID
        docker_path: Path in Docker format

    Returns:
        Sandbox path

    Raises:
        RuntimeError: If resolver not configured for session
        PathResolutionError: If path cannot be resolved
    """
    return get_sandbox_path_resolver(session_id).docker_to_sandbox(docker_path)
1693  
1694  
def translate_error_message(session_id: str, error_message: str) -> str:
    """
    Rewrite Docker paths inside an error message as sandbox paths.

    This makes error messages more user-friendly by showing paths in the
    format the agent understands. When no resolver is available for the
    session, the message is returned unchanged.

    Args:
        session_id: The session ID
        error_message: Error message that may contain Docker paths

    Returns:
        Error message with Docker paths replaced by sandbox paths, or the
        original message if the session has no resolver
    """
    resolver = get_resolver_for_session(session_id)
    if not resolver:
        return error_message
    return resolver.translate_error_paths(error_message)
1713  
1714  
def normalize_sandbox_path(session_id: str, path: str) -> str:
    """
    Bring any path into canonical sandbox format for a session.

    Convenience wrapper that looks up the session's resolver and delegates
    to its normalize() method.

    Args:
        session_id: The session ID
        path: Input path (can be relative or absolute)

    Returns:
        Canonical sandbox path (e.g., /workspace/file.txt)

    Raises:
        RuntimeError: If resolver not configured for session
        PathResolutionError: If path is invalid
    """
    return get_sandbox_path_resolver(session_id).normalize(path)