# sandbox_path_resolver.py
"""
Sandbox Path Resolver - Unified path translation for Ag3ntum.

This module provides a centralized, context-aware path resolution system that
translates between sandbox paths (what the agent sees inside bubblewrap) and
Docker paths (the real filesystem paths inside the Docker container).

ARCHITECTURE OVERVIEW:
======================

Ground Truth: SANDBOX PATHS
---------------------------
All paths in Ag3ntum are expressed in "sandbox path" format - the paths as
they appear inside the bubblewrap sandbox:

    /workspace/file.txt                    - Main workspace file
    /workspace/persistent/img.png          - Persistent storage
    /workspace/external/ro/name/file.csv   - Read-only external mount
    /workspace/external/rw/name/file.txt   - Read-write external mount
    /venv/bin/python3                      - User's Python virtual environment
    /skills/.claude/skills/name/           - Global skills
    /user-skills/name/                     - User skills (per-user mount for isolation)

These paths are the CANONICAL representation used throughout the system.

Docker Paths (Translation Target):
----------------------------------
When code runs in Docker (not inside bwrap), paths must be translated:

    /workspace/file.txt                   → /users/{user}/sessions/{sid}/workspace/file.txt
    /workspace/persistent/img.png         → /users/{user}/ag3ntum/persistent/img.png
    /workspace/external/ro/name/file.csv  → /mounts/name/file.csv
    /venv/bin/python3                     → /users/{user}/venv/bin/python3

EXECUTION CONTEXTS:
===================

1. SANDBOX (bubblewrap):
   - Bash commands run here via mcp__ag3ntum__Bash
   - Paths work as-is, no translation needed
   - Environment variable: AG3NTUM_CONTEXT=sandbox

2. DOCKER (main Python process):
   - MCP tools (Read, Write, Edit, etc.) run here
   - API endpoints run here
   - Paths must be translated from sandbox → Docker format

USAGE:
======

    # Get resolver for a session
    resolver = get_sandbox_path_resolver(session_id)

    # Convert sandbox path to current context
    actual_path = resolver.resolve("./file.txt")

    # Explicit conversions
    docker_path = resolver.sandbox_to_docker("/workspace/file.txt")
    sandbox_path = resolver.docker_to_sandbox("/users/greg/sessions/xxx/workspace/file.txt")

    # Normalize any path to canonical sandbox format
    canonical = resolver.normalize("/workspace/./foo/../bar.txt")  # → /workspace/bar.txt

SECURITY:
=========

- Paths are translated only when they fall within a configured mount;
  anything else is rejected with PathResolutionError
- Path traversal (../) is resolved lexically before the mount check, so a
  path that climbs out of every mount is rejected
- Paths containing null bytes are rejected

NOTE(review): the code in this module performs purely lexical path handling.
It does not resolve symlinks and does not apply Unicode normalization; if
those protections are required they must be enforced elsewhere - confirm.
"""

from __future__ import annotations

import logging
import os
import re
from dataclasses import dataclass, field
from enum import Enum
from itertools import islice
from pathlib import Path, PurePosixPath
from typing import Optional

logger = logging.getLogger(__name__)


# =============================================================================
# Path Utilities
# =============================================================================

def _strip_relative_prefix(path: str) -> str:
    """
    Strip a single leading './' from path without affecting hidden directories.

    NOTE: Do NOT use lstrip("./") - it strips ALL '.' and '/' characters,
    which corrupts hidden directories like ".tmp" → "tmp".

    Args:
        path: Path string, possibly starting with './'

    Returns:
        The path without its leading './', or the path unchanged
    """
    if path.startswith("./"):
        return path[2:]
    return path


# =============================================================================
# Execution Context Detection
# =============================================================================

class ExecutionContext(Enum):
    """
    Execution environment where code is running.

    SANDBOX: Inside bubblewrap sandbox (bash commands, skill scripts)
    DOCKER: Inside Docker container but outside bwrap (API, MCP tools)
    """
    SANDBOX = "sandbox"
    DOCKER = "docker"


# Cache the detected context (it doesn't change during process lifetime)
_cached_context: Optional[ExecutionContext] = None


def detect_execution_context() -> ExecutionContext:
    """
    Detect the current execution context.

    Detection strategy (in order):
    1. AG3NTUM_CONTEXT environment variable equal to "sandbox" (case-insensitive)
    2. Filesystem heuristic: /workspace is a directory and HOME=/workspace,
       unless more than 5 session directories are visible under /users
    3. Default to DOCKER (fail-safe for API/Python processes)

    The result is cached at module level; call reset_context_cache() to
    force re-detection.

    Returns:
        ExecutionContext.SANDBOX if inside bubblewrap
        ExecutionContext.DOCKER if in main Docker container
    """
    global _cached_context

    if _cached_context is not None:
        return _cached_context

    # Method 1: Explicit environment variable (most reliable)
    # Bubblewrap sets this via: --setenv AG3NTUM_CONTEXT sandbox
    if os.environ.get("AG3NTUM_CONTEXT", "").lower() == "sandbox":
        _cached_context = ExecutionContext.SANDBOX
        logger.debug("Execution context: SANDBOX (from AG3NTUM_CONTEXT env)")
        return _cached_context

    # Method 2: Check filesystem structure
    # In sandbox: /workspace exists and HOME=/workspace
    # In Docker: /workspace doesn't exist at root level
    if Path("/workspace").is_dir() and os.environ.get("HOME", "") == "/workspace":
        # Additional verification: the sandbox only mounts specific user
        # paths, so seeing many sessions under /users indicates Docker.
        users_root = Path("/users")
        try:
            if users_root.exists():
                session_count = 0
                # Sample lazily: at most 5 users, at most 10 sessions each,
                # without listing whole directories first.
                for user_dir in islice(users_root.iterdir(), 5):
                    sessions_dir = user_dir / "sessions"
                    if sessions_dir.exists():
                        try:
                            session_count += sum(
                                1 for _ in islice(sessions_dir.iterdir(), 10)
                            )
                        except PermissionError:
                            # Best effort: an unreadable sessions directory
                            # simply contributes nothing to the count.
                            pass
                if session_count > 5:
                    # Can see many sessions - likely Docker, not sandbox
                    _cached_context = ExecutionContext.DOCKER
                    logger.debug("Execution context: DOCKER (filesystem heuristic)")
                    return _cached_context
        except (PermissionError, OSError):
            # Probing failed; fall through to the SANDBOX conclusion below.
            pass

        _cached_context = ExecutionContext.SANDBOX
        logger.debug("Execution context: SANDBOX (filesystem check)")
        return _cached_context

    # Default to Docker (safe default for API and MCP tools)
    _cached_context = ExecutionContext.DOCKER
    logger.debug("Execution context: DOCKER (default)")
    return _cached_context


def reset_context_cache() -> None:
    """Reset the cached execution context (for testing)."""
    global _cached_context
    _cached_context = None


# =============================================================================
# Mount Configuration
# =============================================================================

@dataclass
class MountMapping:
    """
    Bidirectional mapping between sandbox and Docker paths.

    Attributes:
        sandbox_path: Path as seen in bubblewrap (e.g., /workspace)
        docker_path: Path as seen in Docker (e.g., /users/greg/sessions/xxx/workspace)
        mode: Access mode ('ro' for read-only, 'rw' for read-write)
        mount_type: Category of mount for logging/debugging
    """
    sandbox_path: str
    docker_path: str
    mode: str = "ro"  # 'ro' or 'rw'
    mount_type: str = "unknown"

    def matches_sandbox_path(self, path: str) -> bool:
        """Return True if path is this mount's sandbox root or lies beneath it."""
        # Require a '/' boundary so "/workspace2" does not match "/workspace".
        return path == self.sandbox_path or path.startswith(self.sandbox_path + "/")

    def matches_docker_path(self, path: str) -> bool:
        """Return True if path is this mount's docker root or lies beneath it."""
        return path == self.docker_path or path.startswith(self.docker_path + "/")

    def sandbox_to_docker(self, sandbox_path: str) -> str:
        """
        Convert a sandbox path to docker path using this mount.

        The caller is expected to have checked matches_sandbox_path() first;
        the sandbox root is replaced by simple prefix substitution.
        """
        if sandbox_path == self.sandbox_path:
            return self.docker_path
        return self.docker_path + sandbox_path[len(self.sandbox_path):]

    def docker_to_sandbox(self, docker_path: str) -> str:
        """
        Convert a docker path to sandbox path using this mount.

        The caller is expected to have checked matches_docker_path() first;
        the docker root is replaced by simple prefix substitution.
        """
        if docker_path == self.docker_path:
            return self.sandbox_path
        return self.sandbox_path + docker_path[len(self.docker_path):]


@dataclass
class SandboxPathContext:
    """
    Session-specific mount configuration.

    Contains all the mount mappings for a specific session, enabling
    bidirectional path translation between sandbox and Docker contexts.
    Docker-side defaults that depend on the user or session are filled in
    by __post_init__ when left empty.
    """
    session_id: str
    username: str

    # Core mounts (always present)
    workspace_sandbox: str = "/workspace"
    workspace_docker: str = ""  # Set in __post_init__

    venv_sandbox: str = "/venv"
    venv_docker: str = ""  # Set in __post_init__

    # Skills mounts
    global_skills_sandbox: str = "/skills"
    global_skills_docker: str = "/skills"  # Same path in both contexts

    user_skills_sandbox: str = ""  # Set in __post_init__
    user_skills_docker: str = ""  # Set in __post_init__

    # Persistent storage
    # Agent sees: /workspace/persistent (symlink in sandbox pointing to /persistent)
    # Docker sees: /users/{user}/ag3ntum/persistent (actual directory)
    persistent_sandbox: str = "/workspace/persistent"
    persistent_docker: str = ""  # Set in __post_init__

    # Global mounts (name -> container_path mappings, flattened structure)
    # With flattened mounts, all mounts are at /mounts/{name}
    global_mounts_ro: dict[str, str] = field(default_factory=dict)
    global_mounts_rw: dict[str, str] = field(default_factory=dict)

    # Per-user mounts (name -> container_path mappings)
    user_mounts_ro: dict[str, str] = field(default_factory=dict)
    user_mounts_rw: dict[str, str] = field(default_factory=dict)

    # All computed mounts (populated in __post_init__)
    _mounts: list[MountMapping] = field(default_factory=list, repr=False)

    def __post_init__(self) -> None:
        """Fill in user/session dependent defaults, then build the mount list."""
        if not self.workspace_docker:
            self.workspace_docker = (
                f"/users/{self.username}/sessions/{self.session_id}/workspace"
            )
        if not self.venv_docker:
            self.venv_docker = f"/users/{self.username}/venv"
        if not self.user_skills_sandbox:
            self.user_skills_sandbox = "/user-skills"
        if not self.user_skills_docker:
            self.user_skills_docker = "/user-skills"
        # persistent_sandbox keeps its class-level default unless overridden.
        if not self.persistent_docker:
            self.persistent_docker = f"/users/{self.username}/ag3ntum/persistent"

        self._build_mounts()

    def _build_mounts(self) -> None:
        """Build the mount list, ordered by sandbox path length (longest first)."""
        mounts: list[MountMapping] = []

        def add(sandbox_path: str, docker_path: str, mode: str, mount_type: str) -> None:
            mounts.append(MountMapping(
                sandbox_path=sandbox_path,
                docker_path=docker_path,
                mode=mode,
                mount_type=mount_type,
            ))

        # Workspace (most common)
        add(self.workspace_sandbox, self.workspace_docker, "rw", "workspace")

        # User venv
        add(self.venv_sandbox, self.venv_docker, "ro", "venv")

        # Global skills (same path in both contexts by default)
        add(self.global_skills_sandbox, self.global_skills_docker, "ro", "global_skills")

        # User skills (same path in both contexts by default)
        if self.user_skills_sandbox:
            add(self.user_skills_sandbox, self.user_skills_docker, "ro", "user_skills")

        # Persistent storage: /workspace/persistent in the sandbox maps to the
        # per-user persistent directory in Docker (the paths differ).
        if self.persistent_sandbox:
            add(self.persistent_sandbox, self.persistent_docker, "rw", "persistent")

        # Global mounts (flattened structure: /mounts/{name}).
        # They are registered under their container path on both sides; only
        # the container paths are needed here, not the mount names.
        for container_path in self.global_mounts_ro.values():
            add(container_path, container_path, "ro", "global_mount_ro")

        for container_path in self.global_mounts_rw.values():
            add(container_path, container_path, "rw", "global_mount_rw")

        # Per-user mounts: the sandbox sees /workspace/external/user-{ro,rw}/{name},
        # which corresponds to the configured container path in Docker.
        for name, container_path in self.user_mounts_ro.items():
            add(f"/workspace/external/user-ro/{name}", container_path, "ro", "user_mount_ro")

        for name, container_path in self.user_mounts_rw.items():
            add(f"/workspace/external/user-rw/{name}", container_path, "rw", "user_mount_rw")

        # Sort by sandbox_path length (descending) so that the first match in
        # find_mount_for_sandbox_path() is the longest sandbox prefix.
        mounts.sort(key=lambda m: len(m.sandbox_path), reverse=True)

        self._mounts = mounts

    @property
    def mounts(self) -> list[MountMapping]:
        """Get all mount mappings (longest sandbox path first)."""
        return self._mounts

    def find_mount_for_sandbox_path(self, sandbox_path: str) -> Optional[MountMapping]:
        """
        Find the mount mapping that matches a sandbox path (longest prefix match).

        Returns:
            The matching MountMapping, or None if no mount contains the path
        """
        # _mounts is sorted by sandbox path length, so the first hit is the
        # longest matching prefix.
        for mount in self._mounts:
            if mount.matches_sandbox_path(sandbox_path):
                return mount
        return None

    def find_mount_for_docker_path(self, docker_path: str) -> Optional[MountMapping]:
        """
        Find the mount mapping that matches a docker path (longest prefix match).

        Returns:
            The matching MountMapping, or None if no mount contains the path
        """
        # _mounts is ordered by *sandbox* path length, which says nothing about
        # docker path length, so the longest docker prefix is selected
        # explicitly. max() keeps the earliest candidate on ties.
        candidates = [m for m in self._mounts if m.matches_docker_path(docker_path)]
        if not candidates:
            return None
        return max(candidates, key=lambda m: len(m.docker_path))


# =============================================================================
# Path Resolver
# =============================================================================

# Sandbox-side locations that receive dedicated handling in the resolver.
_PERSISTENT_SANDBOX_ROOT = "/workspace/persistent"
_EXTERNAL_SANDBOX_PREFIX = "/workspace/external/"

# Mount types (as reported by get_mount_type) that accept writes.
_WRITABLE_EXTERNAL_TYPES = frozenset({"persistent", "external_rw", "user_mount_rw"})

# Docker-side paths that translate_error_paths() rewrites. Compiled once at
# import time instead of on every call.
_WORKSPACE_DOCKER_PATH_RE = re.compile(
    r'/users/[^/]+/sessions/[^/]+/workspace(/[^\s\'"]*)?'
)
_PERSISTENT_DOCKER_PATH_RE = re.compile(
    r'/users/[^/]+/ag3ntum/persistent(/[^\s\'"]*)?'
)


def _collapse_dot_segments(path: PurePosixPath) -> str:
    """
    Lexically resolve '.' and '..' and return an absolute path string.

    '..' at the root is dropped rather than treated as an error; callers
    rely on the later mount lookup to reject paths outside every mount.
    """
    kept: list[str] = []
    for part in path.parts:
        # Skip the root marker. Comparing against the anchor also covers the
        # POSIX "//" root, which PurePosixPath preserves as a distinct part.
        if part == path.anchor or part == ".":
            continue
        if part == "..":
            if kept:
                kept.pop()
            continue
        kept.append(part)
    return "/" + "/".join(kept)


def _classify_external_path(normalized: str) -> Optional[str]:
    """
    Classify a normalized /workspace/external/... path by its first component.

    Returns:
        'persistent', 'external_ro', 'external_rw', 'user_mount_ro' or
        'user_mount_rw', or None when the path is not a recognised external
        location (it is then handled by the regular mount lookup).
    """
    if not normalized.startswith(_EXTERNAL_SANDBOX_PREFIX):
        return None
    head, _, tail = normalized[len(_EXTERNAL_SANDBOX_PREFIX):].partition("/")
    # Compare whole components so e.g. "persistentfoo" is not mistaken for
    # the "persistent" shortcut.
    if head == "persistent":
        return "persistent"
    if head == "ro":
        return "external_ro"
    if head == "rw":
        return "external_rw"
    # Per-user mounts are only recognised when a mount name follows.
    if head == "user-ro" and tail:
        return "user_mount_ro"
    if head == "user-rw" and tail:
        return "user_mount_rw"
    return None


class PathResolutionError(Exception):
    """
    Raised when path resolution fails.

    Attributes:
        path: The offending path as supplied by the caller
        reason: Short machine-readable code (e.g. EMPTY_PATH, NULL_BYTES,
            INVALID_PATH, UNKNOWN_MOUNT, OUTSIDE_MOUNTS)
    """

    def __init__(self, message: str, path: str, reason: str):
        super().__init__(message)
        self.path = path
        self.reason = reason


class SandboxPathResolver:
    """
    Resolves paths between sandbox and Docker contexts.

    This is the central path resolution component that:
    1. Normalizes paths to canonical sandbox format
    2. Translates sandbox paths to Docker paths (and vice versa)
    3. Handles workspace-relative paths (./foo, foo)
    4. Maps /workspace/external/... locations to their configured mounts
    5. Rejects paths that are not within any configured mount

    All handling is lexical; the filesystem is never consulted.

    Thread Safety:
        The resolver keeps no mutable state of its own, so it can be shared
        between threads provided the context is not mutated after creation.

    Usage:
        resolver = SandboxPathResolver(context)

        # Normalize any path to canonical sandbox format
        canonical = resolver.normalize("./file.txt")  # → /workspace/file.txt

        # Convert to Docker path for file operations
        docker_path = resolver.sandbox_to_docker("/workspace/file.txt")

        # Auto-detect context and resolve accordingly
        actual_path = resolver.resolve("/workspace/file.txt")
    """

    def __init__(self, context: SandboxPathContext):
        """
        Initialize resolver with session-specific context.

        Args:
            context: The mount configuration for this session
        """
        self._context = context
        self._execution_context = detect_execution_context()

    @property
    def context(self) -> SandboxPathContext:
        """Get the path context configuration."""
        return self._context

    @property
    def execution_context(self) -> ExecutionContext:
        """Get the execution context detected when the resolver was created."""
        return self._execution_context

    def normalize(self, path: str) -> str:
        """
        Normalize any path to canonical sandbox format.

        Handles:
        - Relative paths: ./foo, foo → /workspace/foo
        - Workspace paths: /workspace/foo → /workspace/foo
        - Persistent shortcut: persistent/foo → /workspace/persistent/foo
        - External mount shortcuts: external/ro/foo → /workspace/external/ro/foo
        - Path normalization: /workspace/./foo/../bar → /workspace/bar

        Surrounding whitespace is stripped first. No mount check is done
        here; a result such as /etc/passwd is returned as-is and rejected
        later by sandbox_to_docker().

        Args:
            path: Input path in any format

        Returns:
            Canonical sandbox path (absolute, normalized)

        Raises:
            PathResolutionError: If path is empty (or only whitespace) or
                contains null bytes
        """
        # Strip before the emptiness check so whitespace-only input is
        # reported as empty instead of silently becoming /workspace.
        path = path.strip() if path else ""
        if not path:
            raise PathResolutionError("Empty path", path="", reason="EMPTY_PATH")

        # Handle null bytes (security)
        if '\x00' in path:
            raise PathResolutionError(
                f"Path contains null bytes: {path!r}",
                path=path,
                reason="NULL_BYTES",
            )

        parsed = PurePosixPath(path)

        # Relative paths are anchored at the workspace root. PurePosixPath has
        # already dropped any leading "./" while keeping hidden names (".tmp").
        if not parsed.is_absolute():
            parsed = PurePosixPath("/workspace") / parsed

        return _collapse_dot_segments(parsed)

    def _resolve_external_mount(
        self,
        kind: str,
        remainder: str,
        sandbox_path: str,
    ) -> str:
        """
        Translate '{mount_name}[/suffix]' below /workspace/external/{kind}/.

        Args:
            kind: One of 'ro', 'rw', 'user-ro', 'user-rw'
            remainder: Non-empty path text following '{kind}/'
            sandbox_path: Caller-supplied path, used for error reporting

        Raises:
            PathResolutionError: If the mount name is not configured
        """
        label, table = {
            "ro": ("global-ro", self._context.global_mounts_ro),
            "rw": ("global-rw", self._context.global_mounts_rw),
            "user-ro": ("user-ro", self._context.user_mounts_ro),
            "user-rw": ("user-rw", self._context.user_mounts_rw),
        }[kind]

        mount_name, _, suffix = remainder.partition("/")
        if mount_name not in table:
            raise PathResolutionError(
                f"Unknown {label} mount: {mount_name}",
                path=sandbox_path,
                reason="UNKNOWN_MOUNT",
            )
        base = table[mount_name]
        return f"{base}/{suffix}" if suffix else base

    def sandbox_to_docker(self, sandbox_path: str) -> str:
        """
        Convert a sandbox path to Docker path.

        This is used by MCP tools and API endpoints that run in Docker
        but receive paths in sandbox format.

        Args:
            sandbox_path: Path in sandbox format (e.g., /workspace/file.txt)

        Returns:
            Equivalent path in Docker format

        Raises:
            PathResolutionError: If path is invalid, names an unknown
                external mount, or is not within any allowed mount
        """
        normalized = self.normalize(sandbox_path)
        persistent_docker = self._context.persistent_docker

        # Persistent storage at workspace root
        # Agent sees: /workspace/persistent/foo (symlink to /persistent in sandbox)
        # Docker sees: /users/{user}/ag3ntum/persistent/foo
        if normalized == _PERSISTENT_SANDBOX_ROOT:
            return persistent_docker
        if normalized.startswith(_PERSISTENT_SANDBOX_ROOT + "/"):
            suffix = normalized[len(_PERSISTENT_SANDBOX_ROOT) + 1:]
            return f"{persistent_docker}/{suffix}"

        # Special workspace external paths
        if normalized.startswith(_EXTERNAL_SANDBOX_PREFIX):
            external_part = normalized[len(_EXTERNAL_SANDBOX_PREFIX):]
            # normalized has no empty components, so `tail` is either empty
            # (nothing follows `head`) or a clean relative remainder.
            head, _, tail = external_part.partition("/")

            if head == "persistent":
                # DEPRECATED: /workspace/external/persistent/* is deprecated
                # Use /workspace/persistent/* instead
                logger.warning(
                    "Deprecated path: %s. "
                    "Use ./persistent/ instead of ./external/persistent/",
                    sandbox_path,
                )
                return f"{persistent_docker}/{tail}" if tail else persistent_docker

            if head in ("ro", "rw") and not tail:
                # Global mounts need a mount name after ro/ or rw/.
                raise PathResolutionError(
                    f"Cannot resolve external/{head} without mount name",
                    path=sandbox_path,
                    reason="INVALID_PATH",
                )

            if head in ("ro", "rw", "user-ro", "user-rw") and tail:
                return self._resolve_external_mount(head, tail, sandbox_path)
            # Anything else (including a bare user-ro / user-rw) is treated
            # as an ordinary workspace path below.

        mount = self._context.find_mount_for_sandbox_path(normalized)
        if mount:
            return mount.sandbox_to_docker(normalized)

        # No mount found - path is outside allowed directories
        raise PathResolutionError(
            f"Path not within any allowed mount: {sandbox_path}",
            path=sandbox_path,
            reason="OUTSIDE_MOUNTS",
        )

    def docker_to_sandbox(self, docker_path: str) -> str:
        """
        Convert a Docker path to sandbox path.

        This is used to translate error messages or paths from Docker
        processes back to the canonical sandbox format.

        Args:
            docker_path: Path in Docker format

        Returns:
            Equivalent path in sandbox format

        Raises:
            PathResolutionError: If path is not within any allowed mount
        """
        # Normalize the docker path (resolve . and ..)
        normalized = _collapse_dot_segments(PurePosixPath(docker_path))

        mount = self._context.find_mount_for_docker_path(normalized)
        if mount:
            return mount.docker_to_sandbox(normalized)

        # Fallback for persistent storage when it is not registered as a mount
        # Docker: /users/{user}/ag3ntum/persistent/* -> Sandbox: /workspace/persistent/*
        # A '/' boundary is required so that a sibling directory such as
        # ".../persistent_other" is not mistaken for persistent storage.
        persistent_docker = self._context.persistent_docker
        if persistent_docker:
            if normalized == persistent_docker:
                return _PERSISTENT_SANDBOX_ROOT
            if normalized.startswith(persistent_docker + "/"):
                suffix = normalized[len(persistent_docker) + 1:]
                return f"{_PERSISTENT_SANDBOX_ROOT}/{suffix}"

        raise PathResolutionError(
            f"Docker path not within any allowed mount: {docker_path}",
            path=docker_path,
            reason="OUTSIDE_MOUNTS",
        )

    def resolve(self, path: str) -> str:
        """
        Resolve a sandbox path for the current execution context.

        This is the main entry point for path resolution:
        - If running in SANDBOX context: return normalized sandbox path
        - If running in DOCKER context: translate to Docker path

        Args:
            path: Input path (can be relative or absolute)

        Returns:
            Path appropriate for current execution context

        Raises:
            PathResolutionError: If the path is invalid, or (DOCKER context
                only) not within any allowed mount
        """
        normalized = self.normalize(path)
        if self._execution_context == ExecutionContext.SANDBOX:
            return normalized
        return self.sandbox_to_docker(normalized)

    def resolve_to_docker(self, path: str) -> str:
        """
        Always resolve to Docker path, regardless of context.

        Use this when you explicitly need the Docker path (e.g., for
        Python file operations that always run in Docker context).

        Args:
            path: Input path (can be relative or absolute)

        Returns:
            Docker filesystem path

        Raises:
            PathResolutionError: If the path cannot be translated
        """
        return self.sandbox_to_docker(path)

    def is_path_writable(self, sandbox_path: str) -> bool:
        """
        Check if a sandbox path is writable.

        The answer is derived from the path's location only; for
        /workspace/external/{ro,rw,user-ro,user-rw}/ paths it does not check
        that the named mount is actually configured.

        Args:
            sandbox_path: Path in sandbox format

        Returns:
            True if path is within a writable mount, False otherwise
            (including when the path cannot be normalized)
        """
        try:
            normalized = self.normalize(sandbox_path)
        except PathResolutionError:
            return False

        external_type = _classify_external_path(normalized)
        if external_type is not None:
            return external_type in _WRITABLE_EXTERNAL_TYPES

        mount = self._context.find_mount_for_sandbox_path(normalized)
        if mount:
            return mount.mode == "rw"

        return False

    def get_mount_type(self, sandbox_path: str) -> Optional[str]:
        """
        Get the mount type for a sandbox path.

        Args:
            sandbox_path: Path in sandbox format

        Returns:
            Mount type string (e.g., 'workspace', 'persistent', 'external_ro')
            or None if path is not in any mount or cannot be normalized
        """
        try:
            normalized = self.normalize(sandbox_path)
        except PathResolutionError:
            return None

        external_type = _classify_external_path(normalized)
        if external_type is not None:
            return external_type

        mount = self._context.find_mount_for_sandbox_path(normalized)
        if mount:
            return mount.mount_type

        return None

    def translate_error_paths(self, error_message: str) -> str:
        """
        Translate Docker paths in error messages to sandbox paths.

        This makes error messages more user-friendly by showing paths
        in the format the agent understands.

        Workspace paths are translated through docker_to_sandbox() and left
        untouched when they do not belong to this session's mounts.
        Persistent-storage paths are rewritten by pattern alone.

        Args:
            error_message: Error message that may contain Docker paths

        Returns:
            Error message with Docker paths replaced by sandbox paths
        """
        def workspace_to_sandbox(match: re.Match) -> str:
            docker_path = match.group(0)
            try:
                return self.docker_to_sandbox(docker_path)
            except PathResolutionError:
                # Not one of this session's mounts: keep the text unchanged.
                return docker_path

        def persistent_to_sandbox(match: re.Match) -> str:
            return f"/workspace/persistent{match.group(1) or ''}"

        translated = _WORKSPACE_DOCKER_PATH_RE.sub(workspace_to_sandbox, error_message)
        return _PERSISTENT_DOCKER_PATH_RE.sub(persistent_to_sandbox, translated)


# =============================================================================
# Session-Scoped Resolver Management
# =============================================================================

# Session-scoped resolvers (each session has its own)
_session_resolvers: dict[str, SandboxPathResolver] = {}


def get_sandbox_path_resolver(session_id: str) -> SandboxPathResolver:
    """
    Get the path resolver for a session.

    Args:
        session_id: The session ID

    Returns:
        The configured SandboxPathResolver for this session

    Raises:
        RuntimeError: If resolver not configured for this session
    """
    try:
        return _session_resolvers[session_id]
    except KeyError:
        raise RuntimeError(
            f"SandboxPathResolver not configured for session {session_id}. "
            "Call configure_sandbox_path_resolver() first."
        ) from None


def configure_sandbox_path_resolver(
    session_id: str,
    username: str,
    workspace_docker: Optional[str] = None,
    global_mounts_ro: Optional[dict[str, str]] = None,
    global_mounts_rw: Optional[dict[str, str]] = None,
    user_mounts_ro: Optional[dict[str, str]] = None,
    user_mounts_rw: Optional[dict[str, str]] = None,
) -> SandboxPathResolver:
    """
    Configure and return path resolver for a session.

    This should be called during session creation, before any file
    operations are performed. An existing resolver registered for the same
    session ID is replaced.

    Args:
        session_id: The session ID
        username: The username for this session
        workspace_docker: Override the Docker workspace path
        global_mounts_ro: Global read-only mounts {name: container_path}
        global_mounts_rw: Global read-write mounts {name: container_path}
        user_mounts_ro: Per-user read-only mounts {name: container_path}
        user_mounts_rw: Per-user read-write mounts {name: container_path}

    Returns:
        The configured SandboxPathResolver
    """
    context = SandboxPathContext(
        session_id=session_id,
        username=username,
        workspace_docker=(
            workspace_docker or f"/users/{username}/sessions/{session_id}/workspace"
        ),
        global_mounts_ro=global_mounts_ro or {},
        global_mounts_rw=global_mounts_rw or {},
        user_mounts_ro=user_mounts_ro or {},
        user_mounts_rw=user_mounts_rw or {},
    )

    resolver = SandboxPathResolver(context)
    _session_resolvers[session_id] = resolver

    logger.info(
        "SANDBOX_PATH_RESOLVER: Configured for session %s, user=%s, context=%s",
        session_id,
        username,
        resolver.execution_context.value,
    )

    return resolver


def cleanup_sandbox_path_resolver(session_id: str) -> None:
    """
    Remove path resolver when session ends.

    Does nothing if no resolver is registered for the session.

    Args:
        session_id: The session ID to clean up
    """
    if session_id in _session_resolvers:
        del _session_resolvers[session_id]
        logger.info(
            "SANDBOX_PATH_RESOLVER: Cleaned up resolver for session %s", session_id
        )


def has_sandbox_path_resolver(session_id: str) -> bool:
    """
    Check if a path resolver is configured for a session.

    Args:
        session_id: The session ID to check

    Returns:
        True if resolver is configured, False otherwise
    """
    return session_id in _session_resolvers


# =============================================================================
# Utility Functions
# =============================================================================

def create_resolver_for_session(
    session_id: str,
    username: str,
    workspace_path: Path,
    user_mounts_ro: Optional[dict[str, Path]] = None,
    user_mounts_rw: Optional[dict[str, Path]] = None,
) -> SandboxPathResolver:
    """
    Create a resolver with Path objects (convenience function).

    This is a convenience wrapper that accepts Path objects, converts
    them to strings and delegates to configure_sandbox_path_resolver(),
    which also registers the resolver for the session.

    Args:
        session_id: The session ID
        username: The username
        workspace_path: Docker path to workspace (Path object)
        user_mounts_ro: Per-user RO mounts {name: Path}
        user_mounts_rw: Per-user RW mounts {name: Path}

    Returns:
        Configured SandboxPathResolver
    """
    return configure_sandbox_path_resolver(
        session_id=session_id,
        username=username,
        workspace_docker=str(workspace_path),
        user_mounts_ro={k: str(v) for k, v in (user_mounts_ro or {}).items()},
        user_mounts_rw={k: str(v) for k, v in (user_mounts_rw or {}).items()},
    )