# mount_path_encoder.py
"""
Mount path encoder for original-path mounts.

Provides encoding/decoding functions for paths that need to be accessible
at their original locations (e.g., /var/log) within the sandbox.

Path Encoding:
    /var/log        -> _var_log
    /data/output    -> _data_output
    /home/user/docs -> _home_user_docs

Docker Mount Path:
    /var/log -> /mounts/paths/_var_log

Note:
    The encoding is lossy for paths that already contain underscores
    ("/a_b" and "/a/b" both encode to "_a_b"). ``validate_original_path``
    rejects such paths and ``check_encoding_collision`` detects clashes
    between two concrete paths.
"""
from __future__ import annotations

import re
from pathlib import Path

# Reserved paths that cannot be mounted (system-critical)
RESERVED_PATHS = frozenset({
    "/",
    "/bin",
    "/sbin",
    "/lib",
    "/lib64",
    "/usr",
    "/etc",
    "/proc",
    "/sys",
    "/dev",
    "/tmp",
    "/workspace",   # Agent workspace
    "/mounts",      # Mount namespace
    "/persistent",  # Persistent storage
    "/skills",      # Skills directory
    "/venv",        # Python venv
    "/root",        # Root home
})

# Paths that are always blocked (even as subpaths)
BLOCKED_PATTERNS = (
    r"^/proc(/|$)",
    r"^/sys(/|$)",
    r"^/dev(/|$)",
)

# Container directory under which every encoded mount is placed.
_DOCKER_MOUNT_PREFIX = "/mounts/paths/"


def _normalize(path: str) -> str:
    """
    Return the canonical string form of *path* used throughout this module.

    An empty input is passed through as "" (instead of resolving to the
    current working directory). Anything else goes through
    ``Path.resolve()``, which makes the path absolute, drops trailing
    slashes, collapses "." / ".." and follows symlinks that exist on the
    machine running this code.
    """
    return str(Path(path).resolve()) if path else ""


def encode_path(path: str) -> str:
    """
    Encode a path for use as a mount directory name.

    The path is normalized and every slash is replaced with an underscore.

    Args:
        path: The original path (e.g., "/var/log")

    Returns:
        The encoded name (e.g., "_var_log"); "" for an empty path.

    Note:
        Normalization resolves symlinks, so the result reflects the resolved
        location: on a host where a component of the path is a symlink the
        encoded name differs from the literal input.

    Examples:
        >>> encode_path("/var/log")
        '_var_log'
        >>> encode_path("/data/output")
        '_data_output'
    """
    return _normalize(path).replace("/", "_")


def decode_path(encoded: str) -> str:
    """
    Decode an encoded path back to the original.

    Replaces underscores with slashes. The result always starts with a
    slash: a leading underscore becomes the leading slash, otherwise one is
    prepended.

    Args:
        encoded: The encoded name (e.g., "_var_log")

    Returns:
        The original path (e.g., "/var/log"). Every underscore is turned
        into a slash, so a path that originally contained underscores does
        not round-trip.

    Examples:
        >>> decode_path("_var_log")
        '/var/log'
        >>> decode_path("_data_output")
        '/data/output'
    """
    slashed = encoded.replace("_", "/")
    return slashed if encoded.startswith("_") else "/" + slashed


def to_docker_path(original_path: str) -> str:
    """
    Convert an original path to its Docker mount path.

    Args:
        original_path: The original filesystem path (e.g., "/var/log")

    Returns:
        The Docker container path (e.g., "/mounts/paths/_var_log")

    Examples:
        >>> to_docker_path("/var/log")
        '/mounts/paths/_var_log'
    """
    return f"{_DOCKER_MOUNT_PREFIX}{encode_path(original_path)}"


def from_docker_path(docker_path: str) -> str | None:
    """
    Extract the original path from a Docker mount path.

    Only the mount root is considered; any subpath below the encoded
    directory name is ignored.

    Args:
        docker_path: The Docker container path (e.g., "/mounts/paths/_var_log")

    Returns:
        The original path (e.g., "/var/log"), or None if not a valid mount
        path (wrong prefix, or no mount name after the prefix).

    Examples:
        >>> from_docker_path("/mounts/paths/_var_log/app.log")
        '/var/log'
        >>> from_docker_path("/mounts/paths/") is None
        True
    """
    if not docker_path.startswith(_DOCKER_MOUNT_PREFIX):
        return None

    # Keep only the first component after the prefix (the mount root).
    mount_name = docker_path[len(_DOCKER_MOUNT_PREFIX):].split("/", 1)[0]
    if not mount_name:
        # The bare prefix (or "<prefix>/...") names no mount; decoding the
        # empty string would wrongly report "/" as the original path.
        return None

    return decode_path(mount_name)


def is_reserved_path(path: str) -> bool:
    """
    Check if a path is in the reserved blocklist.

    Reserved paths are system-critical and cannot be mounted. A path is
    reserved when its normalized form is exactly one of ``RESERVED_PATHS``
    or matches one of ``BLOCKED_PATTERNS`` (which also cover subpaths).

    Args:
        path: The path to check

    Returns:
        True if the path is reserved (blocked)
    """
    normalized = _normalize(path)

    # Exact matches first; the patterns additionally cover subpaths.
    if normalized in RESERVED_PATHS:
        return True

    return any(re.match(pattern, normalized) for pattern in BLOCKED_PATTERNS)


def is_path_under_reserved(path: str) -> bool:
    """
    Check if a path is under any reserved path.

    Only strict descendants count: a path equal to a reserved path is not
    "under" it. The "/" entry yields the prefix "//", so it does not by
    itself make every path count as reserved.

    Args:
        path: The path to check

    Returns:
        True if the path is under a reserved directory
    """
    normalized = _normalize(path)
    return any(normalized.startswith(reserved + "/") for reserved in RESERVED_PATHS)


def validate_original_path(path: str) -> tuple[bool, str | None]:
    """
    Validate that a path can be used as an original-path mount.

    Checks, in order: non-empty, absolute, not reserved, at least two
    components deep, and that encoding then decoding reproduces the input
    exactly.

    Args:
        path: The path to validate

    Returns:
        Tuple of (is_valid, error_message); error_message is None when the
        path is valid.
    """
    if not path:
        return False, "Path is empty"

    if not path.startswith("/"):
        return False, "Path must be absolute"

    if is_reserved_path(path):
        return False, f"Path '{path}' is reserved and cannot be mounted"

    # Breadth validation: reject overly broad mounts
    depth = sum(1 for part in path.split("/") if part)
    if depth < 2:
        return False, (
            f"Path '{path}' is too broad (depth {depth}, minimum 2). "
            f"Mount a more specific path like '{path}/subdir'"
        )

    # Round-trip check. It fails when the path contains an underscore
    # (decoding turns it into a slash) and also when the path is not already
    # in normalized form (trailing slash, "." / ".." segments, or a
    # component that resolves through a symlink).
    if decode_path(encode_path(path)) != path:
        return False, f"Path '{path}' cannot be safely encoded (collision)"

    return True, None


def check_encoding_collision(path1: str, path2: str) -> bool:
    """
    Check if two paths would encode to the same value.

    Args:
        path1: First path
        path2: Second path

    Returns:
        True if the two strings differ but encode to the same name.
        Identical strings are never reported as a collision.
    """
    return path1 != path2 and encode_path(path1) == encode_path(path2)