credential_files.py
"""File passthrough registry for remote terminal backends.

Remote backends (Docker, Modal, SSH) create sandboxes with no host files.
This module ensures that credential files, skill directories, and host-side
cache directories (documents, images, audio, screenshots) are mounted or
synced into those sandboxes so the agent can access them.

**Credentials and skills** — session-scoped registry fed by skill declarations
(``required_credential_files``) and user config (``terminal.credential_files``).

**Cache directories** — gateway-cached uploads, browser screenshots, TTS
audio, and processed images.  Mounted read-only so the remote terminal can
reference files the host side created (e.g. ``unzip`` an uploaded archive).

Remote backends call :func:`get_credential_file_mounts`,
:func:`get_skills_directory_mount` / :func:`iter_skills_files`, and
:func:`get_cache_directory_mounts` / :func:`iter_cache_files` at sandbox
creation time and before each command (for resync on Modal).
"""

from __future__ import annotations

import logging
import os
from contextvars import ContextVar
from pathlib import Path
from typing import Dict, List
from hermes_cli.config import cfg_get

logger = logging.getLogger(__name__)

# Session-scoped list of credential files to mount.
# Backed by ContextVar to prevent cross-session data bleed in the gateway pipeline.
_registered_files_var: ContextVar[Dict[str, str]] = ContextVar("_registered_files")


def _get_registered() -> Dict[str, str]:
    """Return the current context's credential registry, creating it on first use.

    The registry maps container paths to host paths.  The context variable is
    declared without a default, so the first access in a context raises
    ``LookupError``; in that case an empty dict is stored and returned.
    """
    try:
        registry = _registered_files_var.get()
    except LookupError:
        registry = {}
        _registered_files_var.set(registry)
    return registry


# Cache for config-based file list (loaded once per process).
_config_files: List[Dict[str, str]] | None = None


def _resolve_hermes_home() -> Path:
    """Return the HERMES_HOME directory reported by ``hermes_constants``."""
    from hermes_constants import get_hermes_home
    return get_hermes_home()


def register_credential_file(
    relative_path: str,
    container_base: str = "/root/.hermes",
) -> bool:
    """Register a credential file for mounting into remote sandboxes.

    *relative_path* is relative to ``HERMES_HOME`` (e.g. ``google_token.json``).
    The file is recorded in the current session's registry under the
    container path ``<container_base>/<relative_path>``.

    Returns True if the file exists on the host and was registered, False if
    the path was rejected or the file does not exist.

    Security: rejects absolute paths and path traversal sequences (``..``).
    The resolved host path must remain inside HERMES_HOME so that a malicious
    skill cannot declare ``required_credential_files: ['../../.ssh/id_rsa']``
    and exfiltrate sensitive host files into a container sandbox.
    """
    hermes_home = _resolve_hermes_home()

    # Reject absolute paths — they bypass the HERMES_HOME sandbox entirely.
    if os.path.isabs(relative_path):
        logger.warning(
            "credential_files: rejected absolute path %r (must be relative to HERMES_HOME)",
            relative_path,
        )
        return False

    host_path = hermes_home / relative_path

    # Resolve symlinks and normalise ``..`` before the containment check so
    # that traversal like ``../.ssh/id_rsa`` cannot escape HERMES_HOME.
    from tools.path_security import validate_within_dir

    containment_error = validate_within_dir(host_path, hermes_home)
    if containment_error:
        logger.warning(
            "credential_files: rejected path traversal %r (%s)",
            relative_path,
            containment_error,
        )
        return False

    resolved = host_path.resolve()
    if not resolved.is_file():
        logger.debug("credential_files: skipping %s (not found)", resolved)
        return False

    container_path = f"{container_base.rstrip('/')}/{relative_path}"
    _get_registered()[container_path] = str(resolved)
    logger.debug("credential_files: registered %s -> %s", resolved, container_path)
    return True


def register_credential_files(
    entries: list,
    container_base: str = "/root/.hermes",
) -> List[str]:
    """Register multiple credential files from skill frontmatter entries.

    Each entry is either a string (relative path) or a dict with a ``path``
    key (``name`` is accepted as a fallback).  Entries of any other type and
    entries whose path is empty are ignored.

    Returns the list of relative paths that could not be registered: files
    missing on the host as well as paths rejected by
    :func:`register_credential_file` (absolute paths, traversal).
    """
    missing = []
    for entry in entries:
        if isinstance(entry, str):
            rel_path = entry.strip()
        elif isinstance(entry, dict):
            rel_path = (entry.get("path") or entry.get("name") or "").strip()
        else:
            continue
        if not rel_path:
            continue
        if not register_credential_file(rel_path, container_base):
            missing.append(rel_path)
    return missing


def _load_config_files() -> List[Dict[str, str]]:
    """Load ``terminal.credential_files`` from config.yaml (cached).

    Only non-empty string items are considered.  Each one goes through the
    same absolute-path and containment checks as skill-declared files and is
    kept only if it exists on the host.  Accepted files map to
    ``/root/.hermes/<relative path>`` inside the container.

    The result is cached in ``_config_files`` for the rest of the process,
    including the empty list produced when reading the config fails.
    """
    global _config_files
    if _config_files is not None:
        return _config_files

    result: List[Dict[str, str]] = []
    try:
        from hermes_cli.config import read_raw_config
        hermes_home = _resolve_hermes_home()
        cfg = read_raw_config()
        cred_files = cfg_get(cfg, "terminal", "credential_files")
        if isinstance(cred_files, list):
            from tools.path_security import validate_within_dir

            for item in cred_files:
                if isinstance(item, str) and item.strip():
                    rel = item.strip()
                    if os.path.isabs(rel):
                        logger.warning(
                            "credential_files: rejected absolute config path %r", rel,
                        )
                        continue
                    host_path = hermes_home / rel
                    containment_error = validate_within_dir(host_path, hermes_home)
                    if containment_error:
                        logger.warning(
                            "credential_files: rejected config path traversal %r (%s)",
                            rel, containment_error,
                        )
                        continue
                    resolved_path = host_path.resolve()
                    if resolved_path.is_file():
                        container_path = f"/root/.hermes/{rel}"
                        result.append({
                            "host_path": str(resolved_path),
                            "container_path": container_path,
                        })
    except Exception as e:
        # Best effort: a broken or unreadable config must not prevent sandbox
        # creation, so the failure is logged and an empty list is used.
        logger.warning("Could not read terminal.credential_files from config: %s", e)

    _config_files = result
    return _config_files


def get_credential_file_mounts() -> List[Dict[str, str]]:
    """Return all credential files that should be mounted into remote sandboxes.

    Each item has ``host_path`` and ``container_path`` keys.
    Combines skill-registered files and user config; when both declare the
    same container path, the skill-registered entry wins.
    """
    mounts: Dict[str, str] = {}

    # Skill-registered files
    for container_path, host_path in _get_registered().items():
        # Re-check existence (file may have been deleted since registration)
        if Path(host_path).is_file():
            mounts[container_path] = host_path

    # Config-based files
    for entry in _load_config_files():
        cp = entry["container_path"]
        if cp not in mounts and Path(entry["host_path"]).is_file():
            mounts[cp] = entry["host_path"]

    return [
        {"host_path": hp, "container_path": cp}
        for cp, hp in mounts.items()
    ]


def get_skills_directory_mount(
    container_base: str = "/root/.hermes",
) -> list[Dict[str, str]]:
    """Return mount info for all skill directories (local + external).

    Skills may include ``scripts/``, ``templates/``, and ``references/``
    subdirectories that the agent needs to execute inside remote sandboxes.

    **Security:** Bind mounts follow symlinks, so a malicious symlink inside
    the skills tree could expose arbitrary host files to the container.  When
    symlinks are detected, this function creates a sanitized copy (regular
    files only) in a temp directory and returns that path instead.  Each
    skills directory gets its own sanitized copy.  When no symlinks are
    present (the common case), the original directory is returned directly
    with zero overhead.

    Returns a list of dicts with ``host_path`` and ``container_path`` keys.
    The local skills dir mounts at ``<container_base>/skills``, external dirs
    at ``<container_base>/external_skills/<index>``.
    """
    mounts = []
    hermes_home = _resolve_hermes_home()
    skills_dir = hermes_home / "skills"
    if skills_dir.is_dir():
        host_path = _safe_skills_path(skills_dir)
        mounts.append({
            "host_path": host_path,
            "container_path": f"{container_base.rstrip('/')}/skills",
        })

    # Mount external skill dirs
    try:
        from agent.skill_utils import get_external_skills_dirs
        for idx, ext_dir in enumerate(get_external_skills_dirs()):
            if ext_dir.is_dir():
                host_path = _safe_skills_path(ext_dir)
                mounts.append({
                    "host_path": host_path,
                    "container_path": f"{container_base.rstrip('/')}/external_skills/{idx}",
                })
    except ImportError:
        # External skill support is optional; without it only the local
        # skills directory is mounted.
        pass

    return mounts


# Most recently created sanitized copy.  Kept so code that inspects this
# module attribute keeps working; the authoritative bookkeeping is the
# per-source mapping below.
_safe_skills_tempdir: Path | None = None

# Sanitized copies keyed by the source skills directory they were made from.
# One copy per source is required because a single mount list can contain
# several sanitized directories (local + external) at the same time.
_safe_skills_tempdirs: Dict[str, Path] = {}

# Whether the atexit cleanup hook has been registered yet.
_safe_skills_cleanup_registered = False


def _cleanup_safe_skills_tempdirs() -> None:
    """Delete every sanitized skills copy created by this process."""
    import shutil

    for stale_dir in list(_safe_skills_tempdirs.values()):
        shutil.rmtree(stale_dir, ignore_errors=True)
    _safe_skills_tempdirs.clear()


def _safe_skills_path(skills_dir: Path) -> str:
    """Return *skills_dir* if symlink-free, else a sanitized temp copy.

    The sanitized copy contains only the regular files and directories of
    *skills_dir*; every symlink is logged and left out.  Calling this again
    for the same *skills_dir* replaces that directory's previous copy, while
    copies made for other skills directories are left intact so that paths
    already handed out for them stay valid.  All copies are removed at
    interpreter exit.
    """
    global _safe_skills_tempdir, _safe_skills_cleanup_registered

    symlinks = [p for p in skills_dir.rglob("*") if p.is_symlink()]
    if not symlinks:
        return str(skills_dir)

    for link in symlinks:
        logger.warning("credential_files: skipping symlink in skills dir: %s -> %s",
                       link, os.readlink(link))

    import atexit
    import shutil
    import tempfile

    # Register the cleanup hook once, and before copying, so a copy that
    # fails half-way still gets its temp dir removed at exit.
    if not _safe_skills_cleanup_registered:
        atexit.register(_cleanup_safe_skills_tempdirs)
        _safe_skills_cleanup_registered = True

    # Replace only the copy that belongs to this source directory to avoid
    # accumulation without invalidating copies of other directories.
    source_key = str(skills_dir)
    previous = _safe_skills_tempdirs.pop(source_key, None)
    if previous is not None and previous.is_dir():
        shutil.rmtree(previous, ignore_errors=True)

    safe_dir = Path(tempfile.mkdtemp(prefix="hermes-skills-safe-"))
    _safe_skills_tempdirs[source_key] = safe_dir
    _safe_skills_tempdir = safe_dir

    for item in skills_dir.rglob("*"):
        if item.is_symlink():
            continue
        rel = item.relative_to(skills_dir)
        target = safe_dir / rel
        if item.is_dir():
            target.mkdir(parents=True, exist_ok=True)
        elif item.is_file():
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(item), str(target))

    logger.info("credential_files: created symlink-safe skills copy at %s", safe_dir)
    return str(safe_dir)


def _collect_regular_files(host_dir: Path, container_root: str) -> List[Dict[str, str]]:
    """Map every regular, non-symlink file under *host_dir* into *container_root*.

    Container paths always use ``/`` separators, whatever the host platform.
    """
    return [
        {
            "host_path": str(item),
            "container_path": f"{container_root}/{item.relative_to(host_dir).as_posix()}",
        }
        for item in host_dir.rglob("*")
        if not item.is_symlink() and item.is_file()
    ]


def iter_skills_files(
    container_base: str = "/root/.hermes",
) -> List[Dict[str, str]]:
    """Return individual ``host_path`` / ``container_path`` entries for skills files.

    Includes both the local skills dir and any external dirs configured via
    skills.external_dirs.  Skips symlinks entirely.  Preferred for backends
    that upload files individually (Daytona, Modal) rather than mounting a
    directory.
    """
    base = container_base.rstrip("/")
    result: List[Dict[str, str]] = []

    skills_dir = _resolve_hermes_home() / "skills"
    if skills_dir.is_dir():
        result.extend(_collect_regular_files(skills_dir, f"{base}/skills"))

    # Include external skill dirs
    try:
        from agent.skill_utils import get_external_skills_dirs
        for idx, ext_dir in enumerate(get_external_skills_dirs()):
            if not ext_dir.is_dir():
                continue
            result.extend(
                _collect_regular_files(ext_dir, f"{base}/external_skills/{idx}")
            )
    except ImportError:
        # External skill support is optional; fall back to local skills only.
        pass

    return result


# ---------------------------------------------------------------------------
# Cache directory mounts (documents, images, audio, screenshots)
# ---------------------------------------------------------------------------

# The four cache subdirectories that should be mirrored into remote backends.
# Each tuple is (new_subpath, old_name) matching hermes_constants.get_hermes_dir().
_CACHE_DIRS: list[tuple[str, str]] = [
    ("cache/documents", "document_cache"),
    ("cache/images", "image_cache"),
    ("cache/audio", "audio_cache"),
    ("cache/screenshots", "browser_screenshots"),
]


def get_cache_directory_mounts(
    container_base: str = "/root/.hermes",
) -> List[Dict[str, str]]:
    """Return mount entries for each cache directory that exists on disk.

    Used by Docker to create bind mounts.  Each entry has ``host_path`` and
    ``container_path`` keys.  The host path is resolved via
    ``get_hermes_dir()`` for backward compatibility with old directory layouts.
    """
    from hermes_constants import get_hermes_dir

    mounts: List[Dict[str, str]] = []
    for new_subpath, old_name in _CACHE_DIRS:
        host_dir = get_hermes_dir(new_subpath, old_name)
        if host_dir.is_dir():
            # Always map to the *new* container layout regardless of host layout.
            container_path = f"{container_base.rstrip('/')}/{new_subpath}"
            mounts.append({
                "host_path": str(host_dir),
                "container_path": container_path,
            })
    return mounts


def iter_cache_files(
    container_base: str = "/root/.hermes",
) -> List[Dict[str, str]]:
    """Return individual ``host_path`` / ``container_path`` entries for cache files.

    Used by Modal to upload files individually and resync before each command.
    Skips symlinks.  The container paths use the new ``cache/<subdir>`` layout.
    """
    from hermes_constants import get_hermes_dir

    base = container_base.rstrip("/")
    result: List[Dict[str, str]] = []
    for new_subpath, old_name in _CACHE_DIRS:
        host_dir = get_hermes_dir(new_subpath, old_name)
        if not host_dir.is_dir():
            continue
        result.extend(_collect_regular_files(host_dir, f"{base}/{new_subpath}"))
    return result


def clear_credential_files() -> None:
    """Reset the skill-scoped registry (e.g. on session reset).

    Empties the current context's registry in place; config-based files are
    not affected.
    """
    _get_registered().clear()