/ src / core / sandbox_path_resolver.py
sandbox_path_resolver.py
   1  """
   2  Sandbox Path Resolver - Unified path translation for Ag3ntum.
   3  
   4  This module provides a centralized, context-aware path resolution system that
   5  translates between sandbox paths (what the agent sees inside bubblewrap) and
   6  Docker paths (the real filesystem paths inside the Docker container).
   7  
   8  ARCHITECTURE OVERVIEW:
   9  ======================
  10  
  11  Ground Truth: SANDBOX PATHS
  12  ---------------------------
  13  All paths in Ag3ntum are expressed in "sandbox path" format - the paths as
  14  they appear inside the bubblewrap sandbox:
  15  
  16      /workspace/file.txt           - Main workspace file
  17      /workspace/persistent/img.png - Persistent storage
  18      /workspace/external/ro/name/file.csv    - Read-only external mount
  19      /workspace/external/rw/name/file.txt    - Read-write external mount
  20      /venv/bin/python3             - User's Python virtual environment
  21      /skills/.claude/skills/name/  - Global skills
  22      /user-skills/name/            - User skills (per-user mount for isolation)
  23  
  24  These paths are the CANONICAL representation used throughout the system.
  25  
  26  Docker Paths (Translation Target):
  27  ----------------------------------
  28  When code runs in Docker (not inside bwrap), paths must be translated:
  29  
  30      /workspace/file.txt → /users/{user}/sessions/{sid}/workspace/file.txt
  31      /workspace/persistent/img.png → /users/{user}/ag3ntum/persistent/img.png
  32      /workspace/external/ro/name/file.csv → /mounts/name/file.csv
  33      /venv/bin/python3 → /users/{user}/venv/bin/python3
  34  
  35  EXECUTION CONTEXTS:
  36  ==================
  37  
  38  1. SANDBOX (bubblewrap):
  39     - Bash commands run here via mcp__ag3ntum__Bash
  40     - Paths work as-is, no translation needed
  41     - Environment variable: AG3NTUM_CONTEXT=sandbox
  42  
  43  2. DOCKER (main Python process):
  44     - MCP tools (Read, Write, Edit, etc.) run here
  45     - API endpoints run here
  46     - Paths must be translated from sandbox → Docker format
  47  
  48  USAGE:
  49  ======
  50  
  51      # Get resolver for a session
  52      resolver = get_sandbox_path_resolver(session_id)
  53  
  54      # Convert sandbox path to current context
  55      actual_path = resolver.resolve("./file.txt")
  56  
  57      # Explicit conversions
  58      docker_path = resolver.sandbox_to_docker("/workspace/file.txt")
  59      sandbox_path = resolver.docker_to_sandbox("/users/greg/sessions/xxx/workspace/file.txt")
  60  
  61      # Normalize any path to canonical sandbox format
  62      canonical = resolver.normalize("/workspace/./foo/../bar.txt")  # → /workspace/bar.txt
  63  
  64  SECURITY:
  65  =========
  66  
  67  - All paths are validated to be within allowed mount boundaries
  68  - Symlink resolution is controlled to prevent escape attacks
  69  - Path traversal (../) is blocked or resolved within boundaries
  70  - Unicode normalization prevents homograph attacks
  71  """
  72  
  73  from __future__ import annotations
  74  
  75  import logging
  76  import os
  77  import re
  78  from dataclasses import dataclass, field
  79  from enum import Enum
  80  from pathlib import Path, PurePosixPath
  81  from typing import Optional
  82  
  83  logger = logging.getLogger(__name__)
  84  
  85  
  86  # =============================================================================
  87  # Path Utilities
  88  # =============================================================================
  89  
  90  def _strip_relative_prefix(path: str) -> str:
  91      """
  92      Strip './' prefix from path without affecting hidden directories.
  93  
  94      NOTE: Do NOT use lstrip("./") - it strips ALL '.' and '/' characters,
  95      which corrupts hidden directories like ".tmp" → "tmp".
  96      """
  97      if path.startswith("./"):
  98          return path[2:]
  99      return path
 100  
 101  
 102  # =============================================================================
 103  # Execution Context Detection
 104  # =============================================================================
 105  
 106  class ExecutionContext(Enum):
 107      """
 108      Execution environment where code is running.
 109  
 110      SANDBOX: Inside bubblewrap sandbox (bash commands, skill scripts)
 111      DOCKER: Inside Docker container but outside bwrap (API, MCP tools)
 112      """
 113      SANDBOX = "sandbox"
 114      DOCKER = "docker"
 115  
 116  
 117  # Cache the detected context (it doesn't change during process lifetime)
 118  _cached_context: Optional[ExecutionContext] = None
 119  
 120  
 121  def detect_execution_context() -> ExecutionContext:
 122      """
 123      Detect the current execution context.
 124  
 125      Detection strategy (in order):
 126      1. Check AG3NTUM_CONTEXT environment variable (set by bwrap via --setenv)
 127      2. Check filesystem markers (/workspace as mount vs subdirectory)
 128      3. Default to DOCKER (fail-safe for API/Python processes)
 129  
 130      Returns:
 131          ExecutionContext.SANDBOX if inside bubblewrap
 132          ExecutionContext.DOCKER if in main Docker container
 133      """
 134      global _cached_context
 135  
 136      if _cached_context is not None:
 137          return _cached_context
 138  
 139      # Method 1: Explicit environment variable (most reliable)
 140      # Bubblewrap sets this via: --setenv AG3NTUM_CONTEXT sandbox
 141      context_env = os.environ.get("AG3NTUM_CONTEXT", "").lower()
 142      if context_env == "sandbox":
 143          _cached_context = ExecutionContext.SANDBOX
 144          logger.debug("Execution context: SANDBOX (from AG3NTUM_CONTEXT env)")
 145          return _cached_context
 146  
 147      # Method 2: Check filesystem structure
 148      # In sandbox: /workspace exists and HOME=/workspace
 149      # In Docker: /workspace doesn't exist at root level
 150      workspace_root = Path("/workspace")
 151      home_env = os.environ.get("HOME", "")
 152  
 153      if workspace_root.exists() and workspace_root.is_dir() and home_env == "/workspace":
 154          # Additional verification: in sandbox, /users/*/sessions/* shouldn't be accessible
 155          # (only specific user paths are mounted)
 156          users_sessions = Path("/users")
 157          try:
 158              # In sandbox, listing /users should show limited content
 159              # In Docker, it shows all user directories
 160              if users_sessions.exists():
 161                  subdirs = list(users_sessions.iterdir())
 162                  # Heuristic: if we can see multiple session directories, we're in Docker
 163                  session_count = 0
 164                  for user_dir in subdirs[:5]:  # Check first 5 users max
 165                      sessions_dir = user_dir / "sessions"
 166                      if sessions_dir.exists():
 167                          try:
 168                              session_count += len(list(sessions_dir.iterdir())[:10])
 169                          except PermissionError:
 170                              pass
 171                  if session_count > 5:
 172                      # Can see many sessions - likely Docker, not sandbox
 173                      _cached_context = ExecutionContext.DOCKER
 174                      logger.debug("Execution context: DOCKER (filesystem heuristic)")
 175                      return _cached_context
 176          except (PermissionError, OSError):
 177              pass
 178  
 179          _cached_context = ExecutionContext.SANDBOX
 180          logger.debug("Execution context: SANDBOX (filesystem check)")
 181          return _cached_context
 182  
 183      # Default to Docker (safe default for API and MCP tools)
 184      _cached_context = ExecutionContext.DOCKER
 185      logger.debug("Execution context: DOCKER (default)")
 186      return _cached_context
 187  
 188  
 189  def reset_context_cache() -> None:
 190      """Reset the cached execution context (for testing)."""
 191      global _cached_context
 192      _cached_context = None
 193  
 194  
 195  # =============================================================================
 196  # Mount Configuration
 197  # =============================================================================
 198  
 199  @dataclass
 200  class MountMapping:
 201      """
 202      Bidirectional mapping between sandbox and Docker paths.
 203  
 204      Attributes:
 205          sandbox_path: Path as seen in bubblewrap (e.g., /workspace)
 206          docker_path: Path as seen in Docker (e.g., /users/greg/sessions/xxx/workspace)
 207          mode: Access mode ('ro' for read-only, 'rw' for read-write)
 208          mount_type: Category of mount for logging/debugging
 209      """
 210      sandbox_path: str
 211      docker_path: str
 212      mode: str = "ro"  # 'ro' or 'rw'
 213      mount_type: str = "unknown"
 214  
 215      def matches_sandbox_path(self, path: str) -> bool:
 216          """Check if path starts with this mount's sandbox path."""
 217          # Exact match or path with trailing component
 218          return path == self.sandbox_path or path.startswith(self.sandbox_path + "/")
 219  
 220      def matches_docker_path(self, path: str) -> bool:
 221          """Check if path starts with this mount's docker path."""
 222          return path == self.docker_path or path.startswith(self.docker_path + "/")
 223  
 224      def sandbox_to_docker(self, sandbox_path: str) -> str:
 225          """Convert a sandbox path to docker path using this mount."""
 226          if sandbox_path == self.sandbox_path:
 227              return self.docker_path
 228          suffix = sandbox_path[len(self.sandbox_path):]
 229          return self.docker_path + suffix
 230  
 231      def docker_to_sandbox(self, docker_path: str) -> str:
 232          """Convert a docker path to sandbox path using this mount."""
 233          if docker_path == self.docker_path:
 234              return self.sandbox_path
 235          suffix = docker_path[len(self.docker_path):]
 236          return self.sandbox_path + suffix
 237  
 238  
 239  @dataclass
 240  class SandboxPathContext:
 241      """
 242      Session-specific mount configuration.
 243  
 244      Contains all the mount mappings for a specific session, enabling
 245      bidirectional path translation between sandbox and Docker contexts.
 246      """
 247      session_id: str
 248      username: str
 249  
 250      # Core mounts (always present)
 251      workspace_sandbox: str = "/workspace"
 252      workspace_docker: str = ""  # Set in __post_init__
 253  
 254      venv_sandbox: str = "/venv"
 255      venv_docker: str = ""  # Set in __post_init__
 256  
 257      # Skills mounts
 258      global_skills_sandbox: str = "/skills"
 259      global_skills_docker: str = "/skills"  # Same path in both contexts
 260  
 261      user_skills_sandbox: str = ""  # Set in __post_init__
 262      user_skills_docker: str = ""  # Set in __post_init__
 263  
 264      # Persistent storage
 265      # Agent sees: /workspace/persistent (symlink in sandbox pointing to /persistent)
 266      # Docker sees: /users/{user}/ag3ntum/persistent (actual directory)
 267      persistent_sandbox: str = "/workspace/persistent"
 268      persistent_docker: str = ""  # Set in __post_init__
 269  
 270      # Global mounts (name -> container_path mappings, flattened structure)
 271      # With flattened mounts, all mounts are at /mounts/{name}
 272      global_mounts_ro: dict[str, str] = field(default_factory=dict)
 273      global_mounts_rw: dict[str, str] = field(default_factory=dict)
 274  
 275      # Per-user mounts (name -> container_path mappings)
 276      user_mounts_ro: dict[str, str] = field(default_factory=dict)
 277      user_mounts_rw: dict[str, str] = field(default_factory=dict)
 278  
 279      # All computed mounts (populated in __post_init__)
 280      _mounts: list[MountMapping] = field(default_factory=list, repr=False)
 281  
 282      def __post_init__(self):
 283          """Compute all mount mappings after initialization."""
 284          # Set dynamic paths based on username and session_id
 285          if not self.workspace_docker:
 286              self.workspace_docker = f"/users/{self.username}/sessions/{self.session_id}/workspace"
 287          if not self.venv_docker:
 288              self.venv_docker = f"/users/{self.username}/venv"
 289          if not self.user_skills_sandbox:
 290              self.user_skills_sandbox = "/user-skills"
 291          if not self.user_skills_docker:
 292              self.user_skills_docker = "/user-skills"
 293          # persistent_sandbox is always /workspace/persistent (hardcoded above)
 294          if not self.persistent_docker:
 295              self.persistent_docker = f"/users/{self.username}/ag3ntum/persistent"
 296  
 297          self._build_mounts()
 298  
 299      def _build_mounts(self) -> None:
 300          """Build the list of mount mappings, ordered by specificity (longest first)."""
 301          mounts = []
 302  
 303          # Workspace (most common)
 304          mounts.append(MountMapping(
 305              sandbox_path=self.workspace_sandbox,
 306              docker_path=self.workspace_docker,
 307              mode="rw",
 308              mount_type="workspace",
 309          ))
 310  
 311          # User venv
 312          mounts.append(MountMapping(
 313              sandbox_path=self.venv_sandbox,
 314              docker_path=self.venv_docker,
 315              mode="ro",
 316              mount_type="venv",
 317          ))
 318  
 319          # Global skills (same path in both contexts)
 320          mounts.append(MountMapping(
 321              sandbox_path=self.global_skills_sandbox,
 322              docker_path=self.global_skills_docker,
 323              mode="ro",
 324              mount_type="global_skills",
 325          ))
 326  
 327          # User skills (same path in both contexts)
 328          if self.user_skills_sandbox:
 329              mounts.append(MountMapping(
 330                  sandbox_path=self.user_skills_sandbox,
 331                  docker_path=self.user_skills_docker,
 332                  mode="ro",
 333                  mount_type="user_skills",
 334              ))
 335  
 336          # Persistent storage (same path in both contexts)
 337          if self.persistent_sandbox:
 338              mounts.append(MountMapping(
 339                  sandbox_path=self.persistent_sandbox,
 340                  docker_path=self.persistent_docker,
 341                  mode="rw",
 342                  mount_type="persistent",
 343              ))
 344  
 345          # Global mounts (flattened structure: /mounts/{name})
 346          # Sandbox and docker paths are the same for global mounts
 347          for name, container_path in self.global_mounts_ro.items():
 348              mounts.append(MountMapping(
 349                  sandbox_path=container_path,  # e.g., /mounts/global_var_log
 350                  docker_path=container_path,   # Same path in docker
 351                  mode="ro",
 352                  mount_type="global_mount_ro",
 353              ))
 354  
 355          for name, container_path in self.global_mounts_rw.items():
 356              mounts.append(MountMapping(
 357                  sandbox_path=container_path,
 358                  docker_path=container_path,
 359                  mode="rw",
 360                  mount_type="global_mount_rw",
 361              ))
 362  
 363          # Per-user mounts (flattened structure: /mounts/{name})
 364          for name, container_path in self.user_mounts_ro.items():
 365              # User-ro mounts: sandbox sees /workspace/external/user-ro/{name}
 366              # which is a symlink to /mounts/{name}
 367              mounts.append(MountMapping(
 368                  sandbox_path=f"/workspace/external/user-ro/{name}",
 369                  docker_path=container_path,
 370                  mode="ro",
 371                  mount_type="user_mount_ro",
 372              ))
 373  
 374          for name, container_path in self.user_mounts_rw.items():
 375              mounts.append(MountMapping(
 376                  sandbox_path=f"/workspace/external/user-rw/{name}",
 377                  docker_path=container_path,
 378                  mode="rw",
 379                  mount_type="user_mount_rw",
 380              ))
 381  
 382          # Sort by sandbox_path length (descending) for longest-prefix matching
 383          mounts.sort(key=lambda m: len(m.sandbox_path), reverse=True)
 384  
 385          self._mounts = mounts
 386  
 387      @property
 388      def mounts(self) -> list[MountMapping]:
 389          """Get all mount mappings."""
 390          return self._mounts
 391  
 392      def find_mount_for_sandbox_path(self, sandbox_path: str) -> Optional[MountMapping]:
 393          """Find the mount mapping that matches a sandbox path (longest prefix match)."""
 394          for mount in self._mounts:
 395              if mount.matches_sandbox_path(sandbox_path):
 396                  return mount
 397          return None
 398  
 399      def find_mount_for_docker_path(self, docker_path: str) -> Optional[MountMapping]:
 400          """Find the mount mapping that matches a docker path (longest prefix match)."""
 401          for mount in self._mounts:
 402              if mount.matches_docker_path(docker_path):
 403                  return mount
 404          return None
 405  
 406  
 407  # =============================================================================
 408  # Path Resolver
 409  # =============================================================================
 410  
 411  class PathResolutionError(Exception):
 412      """Raised when path resolution fails."""
 413  
 414      def __init__(self, message: str, path: str, reason: str):
 415          super().__init__(message)
 416          self.path = path
 417          self.reason = reason
 418  
 419  
 420  class SandboxPathResolver:
 421      """
 422      Resolves paths between sandbox and Docker contexts.
 423  
 424      This is the central path resolution component that:
 425      1. Normalizes paths to canonical sandbox format
 426      2. Translates sandbox paths to Docker paths (and vice versa)
 427      3. Handles workspace-relative paths (./foo, foo)
 428      4. Resolves external mount symlinks
 429      5. Validates paths are within allowed boundaries
 430  
 431      Thread Safety:
 432          This class is thread-safe. The context is immutable after creation,
 433          and path resolution is a pure function with no side effects.
 434  
 435      Usage:
 436          resolver = SandboxPathResolver(context)
 437  
 438          # Normalize any path to canonical sandbox format
 439          canonical = resolver.normalize("./file.txt")  # → /workspace/file.txt
 440  
 441          # Convert to Docker path for file operations
 442          docker_path = resolver.sandbox_to_docker("/workspace/file.txt")
 443  
 444          # Auto-detect context and resolve accordingly
 445          actual_path = resolver.resolve("/workspace/file.txt")
 446      """
 447  
 448      def __init__(self, context: SandboxPathContext):
 449          """
 450          Initialize resolver with session-specific context.
 451  
 452          Args:
 453              context: The mount configuration for this session
 454          """
 455          self._context = context
 456          self._execution_context = detect_execution_context()
 457  
 458      @property
 459      def context(self) -> SandboxPathContext:
 460          """Get the path context configuration."""
 461          return self._context
 462  
 463      @property
 464      def execution_context(self) -> ExecutionContext:
 465          """Get the current execution context."""
 466          return self._execution_context
 467  
 468      def normalize(self, path: str) -> str:
 469          """
 470          Normalize any path to canonical sandbox format.
 471  
 472          Handles:
 473          - Relative paths: ./foo, foo → /workspace/foo
 474          - Workspace paths: /workspace/foo → /workspace/foo
 475          - Persistent shortcut: persistent/foo → /workspace/persistent/foo
 476          - External mount shortcuts: external/ro/foo → /workspace/external/ro/foo
 477          - Path normalization: /workspace/./foo/../bar → /workspace/bar
 478  
 479          Args:
 480              path: Input path in any format
 481  
 482          Returns:
 483              Canonical sandbox path (absolute, normalized)
 484  
 485          Raises:
 486              PathResolutionError: If path contains invalid components
 487          """
 488          if not path:
 489              raise PathResolutionError("Empty path", path="", reason="EMPTY_PATH")
 490  
 491          path = path.strip()
 492  
 493          # Handle null bytes (security)
 494          if '\x00' in path:
 495              raise PathResolutionError(
 496                  f"Path contains null bytes: {path!r}",
 497                  path=path,
 498                  reason="NULL_BYTES",
 499              )
 500  
 501          # Parse as POSIX path
 502          p = PurePosixPath(path)
 503          path_str = str(p)
 504  
 505          # Handle relative paths - prepend /workspace
 506          if not p.is_absolute():
 507              clean_path = _strip_relative_prefix(path_str)
 508              path_str = f"/workspace/{clean_path}"
 509              p = PurePosixPath(path_str)
 510  
 511          # Resolve . and .. components
 512          parts = []
 513          for part in p.parts:
 514              if part == "/" or part == ".":
 515                  # Skip root and current directory markers
 516                  continue
 517              elif part == "..":
 518                  if parts:  # Don't go above root
 519                      parts.pop()
 520                  # Note: We don't raise on .. that would escape root
 521                  # The boundary check later will catch invalid escapes
 522              else:
 523                  parts.append(part)
 524  
 525          # Reconstruct as absolute path
 526          normalized = "/" + "/".join(parts) if parts else "/"
 527  
 528          return normalized
 529  
 530      def sandbox_to_docker(self, sandbox_path: str) -> str:
 531          """
 532          Convert a sandbox path to Docker path.
 533  
 534          This is used by MCP tools and API endpoints that run in Docker
 535          but receive paths in sandbox format.
 536  
 537          Args:
 538              sandbox_path: Path in sandbox format (e.g., /workspace/file.txt)
 539  
 540          Returns:
 541              Equivalent path in Docker format
 542  
 543          Raises:
 544              PathResolutionError: If path is not within any allowed mount
 545          """
 546          # Normalize first
 547          normalized = self.normalize(sandbox_path)
 548  
 549          # Handle persistent storage at workspace root
 550          # Agent sees: /workspace/persistent/foo (symlink to /persistent in sandbox)
 551          # Docker sees: /users/{user}/ag3ntum/persistent/foo
 552          if normalized.startswith("/workspace/persistent/") or normalized == "/workspace/persistent":
 553              if normalized == "/workspace/persistent":
 554                  return self._context.persistent_docker
 555              suffix = normalized[len("/workspace/persistent/"):]
 556              return f"{self._context.persistent_docker}/{suffix}"
 557  
 558          # Handle special workspace external paths
 559          if normalized.startswith("/workspace/external/"):
 560              external_part = normalized[len("/workspace/external/"):]
 561  
 562              if external_part.startswith("persistent/") or external_part == "persistent":
 563                  # DEPRECATED: /workspace/external/persistent/* is deprecated
 564                  # Use /workspace/persistent/* instead
 565                  logger.warning(
 566                      f"Deprecated path: {sandbox_path}. Use ./persistent/ instead of ./external/persistent/"
 567                  )
 568                  suffix = external_part[len("persistent"):].lstrip("/")
 569                  if suffix:
 570                      return f"{self._context.persistent_docker}/{suffix}"
 571                  return self._context.persistent_docker
 572  
 573              elif external_part.startswith("ro/") or external_part == "ro":
 574                  # Map to global read-only mount (flattened: /mounts/{name})
 575                  if external_part == "ro":
 576                      raise PathResolutionError(
 577                          "Cannot resolve external/ro without mount name",
 578                          path=sandbox_path,
 579                          reason="INVALID_PATH",
 580                      )
 581                  # Parse mount name from path: ro/{name}/... or ro/{name}
 582                  remaining = external_part[len("ro/"):]
 583                  if "/" in remaining:
 584                      mount_name, suffix = remaining.split("/", 1)
 585                  else:
 586                      mount_name = remaining
 587                      suffix = ""
 588  
 589                  if mount_name in self._context.global_mounts_ro:
 590                      container_path = self._context.global_mounts_ro[mount_name]
 591                      if suffix:
 592                          return f"{container_path}/{suffix}"
 593                      return container_path
 594                  else:
 595                      raise PathResolutionError(
 596                          f"Unknown global-ro mount: {mount_name}",
 597                          path=sandbox_path,
 598                          reason="UNKNOWN_MOUNT",
 599                      )
 600  
 601              elif external_part.startswith("rw/") or external_part == "rw":
 602                  # Map to global read-write mount (flattened: /mounts/{name})
 603                  if external_part == "rw":
 604                      raise PathResolutionError(
 605                          "Cannot resolve external/rw without mount name",
 606                          path=sandbox_path,
 607                          reason="INVALID_PATH",
 608                      )
 609                  # Parse mount name from path: rw/{name}/... or rw/{name}
 610                  remaining = external_part[len("rw/"):]
 611                  if "/" in remaining:
 612                      mount_name, suffix = remaining.split("/", 1)
 613                  else:
 614                      mount_name = remaining
 615                      suffix = ""
 616  
 617                  if mount_name in self._context.global_mounts_rw:
 618                      container_path = self._context.global_mounts_rw[mount_name]
 619                      if suffix:
 620                          return f"{container_path}/{suffix}"
 621                      return container_path
 622                  else:
 623                      raise PathResolutionError(
 624                          f"Unknown global-rw mount: {mount_name}",
 625                          path=sandbox_path,
 626                          reason="UNKNOWN_MOUNT",
 627                      )
 628  
 629              elif external_part.startswith("user-ro/"):
 630                  # Map to per-user RO mount
 631                  remaining = external_part[len("user-ro/"):]
 632                  if "/" in remaining:
 633                      mount_name, suffix = remaining.split("/", 1)
 634                  else:
 635                      mount_name = remaining
 636                      suffix = ""
 637  
 638                  if mount_name in self._context.user_mounts_ro:
 639                      host_path = self._context.user_mounts_ro[mount_name]
 640                      if suffix:
 641                          return f"{host_path}/{suffix}"
 642                      return host_path
 643                  else:
 644                      raise PathResolutionError(
 645                          f"Unknown user-ro mount: {mount_name}",
 646                          path=sandbox_path,
 647                          reason="UNKNOWN_MOUNT",
 648                      )
 649  
 650              elif external_part.startswith("user-rw/"):
 651                  # Map to per-user RW mount
 652                  remaining = external_part[len("user-rw/"):]
 653                  if "/" in remaining:
 654                      mount_name, suffix = remaining.split("/", 1)
 655                  else:
 656                      mount_name = remaining
 657                      suffix = ""
 658  
 659                  if mount_name in self._context.user_mounts_rw:
 660                      host_path = self._context.user_mounts_rw[mount_name]
 661                      if suffix:
 662                          return f"{host_path}/{suffix}"
 663                      return host_path
 664                  else:
 665                      raise PathResolutionError(
 666                          f"Unknown user-rw mount: {mount_name}",
 667                          path=sandbox_path,
 668                          reason="UNKNOWN_MOUNT",
 669                      )
 670  
 671          # Find matching mount
 672          mount = self._context.find_mount_for_sandbox_path(normalized)
 673          if mount:
 674              return mount.sandbox_to_docker(normalized)
 675  
 676          # No mount found - path is outside allowed directories
 677          raise PathResolutionError(
 678              f"Path not within any allowed mount: {sandbox_path}",
 679              path=sandbox_path,
 680              reason="OUTSIDE_MOUNTS",
 681          )
 682  
 683      def docker_to_sandbox(self, docker_path: str) -> str:
 684          """
 685          Convert a Docker path to sandbox path.
 686  
 687          This is used to translate error messages or paths from Docker
 688          processes back to the canonical sandbox format.
 689  
 690          Args:
 691              docker_path: Path in Docker format
 692  
 693          Returns:
 694              Equivalent path in sandbox format
 695  
 696          Raises:
 697              PathResolutionError: If path is not within any allowed mount
 698          """
 699          # Normalize the docker path (resolve . and ..)
 700          p = PurePosixPath(docker_path)
 701          parts = []
 702          for part in p.parts:
 703              if part == "/" or part == ".":
 704                  # Skip root and current directory markers
 705                  continue
 706              elif part == "..":
 707                  if parts:
 708                      parts.pop()
 709              else:
 710                  parts.append(part)
 711          # Reconstruct as absolute path
 712          normalized = "/" + "/".join(parts) if parts else "/"
 713  
 714          # Find matching mount
 715          mount = self._context.find_mount_for_docker_path(normalized)
 716          if mount:
 717              return mount.docker_to_sandbox(normalized)
 718  
 719          # Check for persistent storage path
 720          # Docker: /users/{user}/ag3ntum/persistent/* -> Sandbox: /workspace/persistent/*
 721          if normalized.startswith(self._context.persistent_docker):
 722              suffix = normalized[len(self._context.persistent_docker):].lstrip("/")
 723              if suffix:
 724                  return f"/workspace/persistent/{suffix}"
 725              return "/workspace/persistent"
 726  
 727          raise PathResolutionError(
 728              f"Docker path not within any allowed mount: {docker_path}",
 729              path=docker_path,
 730              reason="OUTSIDE_MOUNTS",
 731          )
 732  
 733      def resolve(self, path: str) -> str:
 734          """
 735          Resolve a sandbox path for the current execution context.
 736  
 737          This is the main entry point for path resolution:
 738          - If running in SANDBOX context: return normalized sandbox path
 739          - If running in DOCKER context: translate to Docker path
 740  
 741          Args:
 742              path: Input path (can be relative or absolute)
 743  
 744          Returns:
 745              Path appropriate for current execution context
 746          """
 747          normalized = self.normalize(path)
 748  
 749          if self._execution_context == ExecutionContext.SANDBOX:
 750              return normalized
 751          else:
 752              return self.sandbox_to_docker(normalized)
 753  
 754      def resolve_to_docker(self, path: str) -> str:
 755          """
 756          Always resolve to Docker path, regardless of context.
 757  
 758          Use this when you explicitly need the Docker path (e.g., for
 759          Python file operations that always run in Docker context).
 760  
 761          Args:
 762              path: Input path (can be relative or absolute)
 763  
 764          Returns:
 765              Docker filesystem path
 766          """
 767          return self.sandbox_to_docker(path)
 768  
 769      def is_path_writable(self, sandbox_path: str) -> bool:
 770          """
 771          Check if a sandbox path is writable.
 772  
 773          Args:
 774              sandbox_path: Path in sandbox format
 775  
 776          Returns:
 777              True if path is within a writable mount
 778          """
 779          try:
 780              normalized = self.normalize(sandbox_path)
 781          except PathResolutionError:
 782              return False
 783  
 784          # Check workspace external paths
 785          if normalized.startswith("/workspace/external/"):
 786              external_part = normalized[len("/workspace/external/"):]
 787  
 788              if external_part.startswith("persistent") or external_part == "persistent":
 789                  return True  # Persistent is writable
 790              elif external_part.startswith("ro/") or external_part == "ro":
 791                  return False  # Read-only mount
 792              elif external_part.startswith("rw/") or external_part == "rw":
 793                  return True  # Read-write mount
 794              elif external_part.startswith("user-ro/"):
 795                  return False  # Per-user read-only mount
 796              elif external_part.startswith("user-rw/"):
 797                  return True  # Per-user read-write mount
 798  
 799          # Find matching mount
 800          mount = self._context.find_mount_for_sandbox_path(normalized)
 801          if mount:
 802              return mount.mode == "rw"
 803  
 804          return False
 805  
 806      def get_mount_type(self, sandbox_path: str) -> Optional[str]:
 807          """
 808          Get the mount type for a sandbox path.
 809  
 810          Args:
 811              sandbox_path: Path in sandbox format
 812  
 813          Returns:
 814              Mount type string (e.g., 'workspace', 'persistent', 'external_ro')
 815              or None if path is not in any mount
 816          """
 817          try:
 818              normalized = self.normalize(sandbox_path)
 819          except PathResolutionError:
 820              return None
 821  
 822          # Check workspace external paths
 823          if normalized.startswith("/workspace/external/"):
 824              external_part = normalized[len("/workspace/external/"):]
 825  
 826              if external_part.startswith("persistent") or external_part == "persistent":
 827                  return "persistent"
 828              elif external_part.startswith("ro/") or external_part == "ro":
 829                  return "external_ro"
 830              elif external_part.startswith("rw/") or external_part == "rw":
 831                  return "external_rw"
 832              elif external_part.startswith("user-ro/"):
 833                  return "user_mount_ro"
 834              elif external_part.startswith("user-rw/"):
 835                  return "user_mount_rw"
 836  
 837          mount = self._context.find_mount_for_sandbox_path(normalized)
 838          if mount:
 839              return mount.mount_type
 840  
 841          return None
 842  
 843      def translate_error_paths(self, error_message: str) -> str:
 844          """
 845          Translate Docker paths in error messages to sandbox paths.
 846  
 847          This makes error messages more user-friendly by showing paths
 848          in the format the agent understands.
 849  
 850          Args:
 851              error_message: Error message that may contain Docker paths
 852  
 853          Returns:
 854              Error message with Docker paths replaced by sandbox paths
 855          """
 856          # Pattern to find Docker workspace paths
 857          # Matches: /users/username/sessions/session_id/workspace/...
 858          workspace_pattern = re.compile(
 859              r'/users/[^/]+/sessions/[^/]+/workspace(/[^\s\'"]*)?'
 860          )
 861  
 862          def replace_workspace(match: re.Match) -> str:
 863              docker_path = match.group(0)
 864              try:
 865                  return self.docker_to_sandbox(docker_path)
 866              except PathResolutionError:
 867                  return docker_path
 868  
 869          result = workspace_pattern.sub(replace_workspace, error_message)
 870  
 871          # Also translate persistent storage paths
 872          persistent_pattern = re.compile(
 873              r'/users/[^/]+/ag3ntum/persistent(/[^\s\'"]*)?'
 874          )
 875  
 876          def replace_persistent(match: re.Match) -> str:
 877              suffix = match.group(1) or ""
 878              return f"/workspace/persistent{suffix}"
 879  
 880          result = persistent_pattern.sub(replace_persistent, result)
 881  
 882          return result
 883  
 884  
 885  # =============================================================================
 886  # Session-Scoped Resolver Management
 887  # =============================================================================
 888  
 889  # Session-scoped resolvers (each session has its own)
 890  _session_resolvers: dict[str, SandboxPathResolver] = {}
 891  
 892  
 893  def get_sandbox_path_resolver(session_id: str) -> SandboxPathResolver:
 894      """
 895      Get the path resolver for a session.
 896  
 897      Args:
 898          session_id: The session ID
 899  
 900      Returns:
 901          The configured SandboxPathResolver for this session
 902  
 903      Raises:
 904          RuntimeError: If resolver not configured for this session
 905      """
 906      if session_id not in _session_resolvers:
 907          raise RuntimeError(
 908              f"SandboxPathResolver not configured for session {session_id}. "
 909              "Call configure_sandbox_path_resolver() first."
 910          )
 911      return _session_resolvers[session_id]
 912  
 913  
 914  def configure_sandbox_path_resolver(
 915      session_id: str,
 916      username: str,
 917      workspace_docker: Optional[str] = None,
 918      global_mounts_ro: Optional[dict[str, str]] = None,
 919      global_mounts_rw: Optional[dict[str, str]] = None,
 920      user_mounts_ro: Optional[dict[str, str]] = None,
 921      user_mounts_rw: Optional[dict[str, str]] = None,
 922  ) -> SandboxPathResolver:
 923      """
 924      Configure and return path resolver for a session.
 925  
 926      This should be called during session creation, before any file
 927      operations are performed.
 928  
 929      Args:
 930          session_id: The session ID
 931          username: The username for this session
 932          workspace_docker: Override the Docker workspace path
 933          global_mounts_ro: Global read-only mounts {name: container_path}
 934          global_mounts_rw: Global read-write mounts {name: container_path}
 935          user_mounts_ro: Per-user read-only mounts {name: container_path}
 936          user_mounts_rw: Per-user read-write mounts {name: container_path}
 937  
 938      Returns:
 939          The configured SandboxPathResolver
 940      """
 941      context = SandboxPathContext(
 942          session_id=session_id,
 943          username=username,
 944          workspace_docker=workspace_docker or f"/users/{username}/sessions/{session_id}/workspace",
 945          global_mounts_ro=global_mounts_ro or {},
 946          global_mounts_rw=global_mounts_rw or {},
 947          user_mounts_ro=user_mounts_ro or {},
 948          user_mounts_rw=user_mounts_rw or {},
 949      )
 950  
 951      resolver = SandboxPathResolver(context)
 952      _session_resolvers[session_id] = resolver
 953  
 954      logger.info(
 955          f"SANDBOX_PATH_RESOLVER: Configured for session {session_id}, "
 956          f"user={username}, context={resolver.execution_context.value}"
 957      )
 958  
 959      return resolver
 960  
 961  
 962  def cleanup_sandbox_path_resolver(session_id: str) -> None:
 963      """
 964      Remove path resolver when session ends.
 965  
 966      Args:
 967          session_id: The session ID to clean up
 968      """
 969      if session_id in _session_resolvers:
 970          del _session_resolvers[session_id]
 971          logger.info(f"SANDBOX_PATH_RESOLVER: Cleaned up resolver for session {session_id}")
 972  
 973  
 974  def has_sandbox_path_resolver(session_id: str) -> bool:
 975      """
 976      Check if a path resolver is configured for a session.
 977  
 978      Args:
 979          session_id: The session ID to check
 980  
 981      Returns:
 982          True if resolver is configured, False otherwise
 983      """
 984      return session_id in _session_resolvers
 985  
 986  
 987  # =============================================================================
 988  # Utility Functions
 989  # =============================================================================
 990  
 991  def create_resolver_for_session(
 992      session_id: str,
 993      username: str,
 994      workspace_path: Path,
 995      user_mounts_ro: Optional[dict[str, Path]] = None,
 996      user_mounts_rw: Optional[dict[str, Path]] = None,
 997  ) -> SandboxPathResolver:
 998      """
 999      Create a resolver with Path objects (convenience function).
1000  
1001      This is a convenience wrapper that accepts Path objects and converts
1002      them to strings for the SandboxPathContext.
1003  
1004      Args:
1005          session_id: The session ID
1006          username: The username
1007          workspace_path: Docker path to workspace (Path object)
1008          user_mounts_ro: Per-user RO mounts {name: Path}
1009          user_mounts_rw: Per-user RW mounts {name: Path}
1010  
1011      Returns:
1012          Configured SandboxPathResolver
1013      """
1014      return configure_sandbox_path_resolver(
1015          session_id=session_id,
1016          username=username,
1017          workspace_docker=str(workspace_path),
1018          user_mounts_ro={k: str(v) for k, v in (user_mounts_ro or {}).items()},
1019          user_mounts_rw={k: str(v) for k, v in (user_mounts_rw or {}).items()},
1020      )