/ src / core / mount_path_encoder.py
mount_path_encoder.py
  1  """
  2  Mount path encoder for original-path mounts.
  3  
  4  Provides encoding/decoding functions for paths that need to be accessible
  5  at their original locations (e.g., /var/log) within the sandbox.
  6  
  7  Path Encoding:
  8      /var/log -> _var_log
  9      /data/output -> _data_output
 10      /home/user/docs -> _home_user_docs
 11  
 12  Docker Mount Path:
 13      /var/log -> /mounts/paths/_var_log
 14  """
 15  from __future__ import annotations
 16  
 17  import re
 18  from pathlib import Path
 19  
 20  # Reserved paths that cannot be mounted (system-critical)
 21  RESERVED_PATHS = frozenset({
 22      "/",
 23      "/bin",
 24      "/sbin",
 25      "/lib",
 26      "/lib64",
 27      "/usr",
 28      "/etc",
 29      "/proc",
 30      "/sys",
 31      "/dev",
 32      "/tmp",
 33      "/workspace",  # Agent workspace
 34      "/mounts",     # Mount namespace
 35      "/persistent", # Persistent storage
 36      "/skills",     # Skills directory
 37      "/venv",       # Python venv
 38      "/root",       # Root home
 39  })
 40  
 41  # Paths that are always blocked (even as subpaths)
 42  BLOCKED_PATTERNS = (
 43      r"^/proc(/|$)",
 44      r"^/sys(/|$)",
 45      r"^/dev(/|$)",
 46  )
 47  
 48  
 49  def encode_path(path: str) -> str:
 50      """
 51      Encode a path for use as a mount directory name.
 52  
 53      Replaces slashes with underscores.
 54  
 55      Args:
 56          path: The original path (e.g., "/var/log")
 57  
 58      Returns:
 59          The encoded name (e.g., "_var_log")
 60  
 61      Examples:
 62          >>> encode_path("/var/log")
 63          '_var_log'
 64          >>> encode_path("/data/output")
 65          '_data_output'
 66      """
 67      # Normalize the path (remove trailing slashes, resolve .)
 68      normalized = str(Path(path).resolve()) if path else ""
 69  
 70      # Replace slashes with underscores
 71      return normalized.replace("/", "_")
 72  
 73  
 74  def decode_path(encoded: str) -> str:
 75      """
 76      Decode an encoded path back to the original.
 77  
 78      Replaces underscores with slashes.
 79  
 80      Args:
 81          encoded: The encoded name (e.g., "_var_log")
 82  
 83      Returns:
 84          The original path (e.g., "/var/log")
 85  
 86      Examples:
 87          >>> decode_path("_var_log")
 88          '/var/log'
 89          >>> decode_path("_data_output")
 90          '/data/output'
 91      """
 92      # Replace underscores with slashes
 93      # Handle leading underscore -> leading slash
 94      if encoded.startswith("_"):
 95          return encoded.replace("_", "/")
 96      return "/" + encoded.replace("_", "/")
 97  
 98  
 99  def to_docker_path(original_path: str) -> str:
100      """
101      Convert an original path to its Docker mount path.
102  
103      Args:
104          original_path: The original filesystem path (e.g., "/var/log")
105  
106      Returns:
107          The Docker container path (e.g., "/mounts/paths/_var_log")
108  
109      Examples:
110          >>> to_docker_path("/var/log")
111          '/mounts/paths/_var_log'
112      """
113      encoded = encode_path(original_path)
114      return f"/mounts/paths/{encoded}"
115  
116  
def from_docker_path(docker_path: str) -> str | None:
    """
    Extract the original path from a Docker mount path.

    Only the first path component after ``/mounts/paths/`` is considered;
    anything deeper is treated as a subpath inside the mount and ignored.
    That component is then decoded with ``decode_path``.

    Args:
        docker_path: The Docker container path (e.g., "/mounts/paths/_var_log")

    Returns:
        The original path (e.g., "/var/log"), or None if ``docker_path`` does
        not start with ``/mounts/paths/`` or names no mount directory (for
        example the bare prefix ``/mounts/paths/``).
    """
    prefix = "/mounts/paths/"
    if not docker_path.startswith(prefix):
        return None

    # Keep just the mount root; partition() leaves the whole remainder in
    # `encoded` when there is no further "/".
    encoded, _, _ = docker_path[len(prefix):].partition("/")

    # An empty name would decode to "/", which is not a real mount root.
    if not encoded:
        return None

    return decode_path(encoded)
137  
138  
def is_reserved_path(path: str) -> bool:
    """
    Report whether a path is blocked from being mounted.

    The path is resolved with ``Path.resolve()`` (an empty path is treated
    as the empty string) and is considered reserved when it either equals
    an entry of ``RESERVED_PATHS`` or matches one of ``BLOCKED_PATTERNS``.

    Args:
        path: The path to check

    Returns:
        True if the path is reserved (blocked)
    """
    resolved = str(Path(path).resolve()) if path else ""

    # Exact-match blocklist
    if resolved in RESERVED_PATHS:
        return True

    # Whole-tree blocklist: first matching pattern wins
    for pattern in BLOCKED_PATTERNS:
        if re.match(pattern, resolved):
            return True
    return False
159  
160  
def is_path_under_reserved(path: str) -> bool:
    """
    Report whether a path lies strictly below one of the reserved paths.

    The path is resolved with ``Path.resolve()`` (an empty path is treated
    as the empty string) and compared against each ``RESERVED_PATHS`` entry
    with a "/" appended, so a reserved path itself does not count as being
    under itself. The "/" entry contributes the prefix "//".

    Args:
        path: The path to check

    Returns:
        True if the path is under a reserved directory
    """
    resolved = str(Path(path).resolve()) if path else ""

    # str.startswith accepts a tuple and is true if any prefix matches
    reserved_prefixes = tuple(reserved + "/" for reserved in RESERVED_PATHS)
    return resolved.startswith(reserved_prefixes)
173  
174  
def validate_original_path(path: str) -> tuple[bool, str | None]:
    """
    Decide whether a path is acceptable as an original-path mount.

    Checks are applied in order and the first failure is reported:

    1. the path must be non-empty;
    2. it must start with "/";
    3. it must not be reserved according to ``is_reserved_path``;
    4. it must have at least two non-empty components;
    5. encoding it with ``encode_path`` and decoding the result with
       ``decode_path`` must give back exactly the same string. This
       rejects, for instance, paths containing underscores, since decoding
       turns every underscore into a slash.

    Args:
        path: The path to validate

    Returns:
        Tuple of (is_valid, error_message); error_message is None when the
        path is valid
    """
    if not path:
        return False, "Path is empty"

    if not path.startswith("/"):
        return False, "Path must be absolute"

    if is_reserved_path(path):
        return False, f"Path '{path}' is reserved and cannot be mounted"

    # Breadth check: count non-empty components; empty pieces produced by
    # leading, trailing or doubled slashes are ignored.
    depth = sum(1 for segment in path.split("/") if segment)
    if depth < 2:
        too_broad = (
            f"Path '{path}' is too broad (depth {depth}, minimum 2). "
            f"Mount a more specific path like '{path}/subdir'"
        )
        return False, too_broad

    # Round-trip check: the encoding must be reversible for this path
    round_tripped = decode_path(encode_path(path))
    if round_tripped == path:
        return True, None
    return False, f"Path '{path}' cannot be safely encoded (collision)"
210  
211  
def check_encoding_collision(path1: str, path2: str) -> bool:
    """
    Report whether two different path strings share one encoded name.

    Both paths are encoded with ``encode_path``. A collision exists only
    when the encoded names are equal while the input strings themselves
    differ; identical inputs never collide.

    Args:
        path1: First path
        path2: Second path

    Returns:
        True if there would be a collision
    """
    first_encoded = encode_path(path1)
    second_encoded = encode_path(path2)

    if first_encoded != second_encoded:
        return False
    return path1 != path2