/ tools / credential_files.py
credential_files.py
  1  """File passthrough registry for remote terminal backends.
  2  
  3  Remote backends (Docker, Modal, SSH) create sandboxes with no host files.
  4  This module ensures that credential files, skill directories, and host-side
  5  cache directories (documents, images, audio, screenshots) are mounted or
  6  synced into those sandboxes so the agent can access them.
  7  
  8  **Credentials and skills** — session-scoped registry fed by skill declarations
  9  (``required_credential_files``) and user config (``terminal.credential_files``).
 10  
 11  **Cache directories** — gateway-cached uploads, browser screenshots, TTS
 12  audio, and processed images.  Mounted read-only so the remote terminal can
 13  reference files the host side created (e.g. ``unzip`` an uploaded archive).
 14  
 15  Remote backends call :func:`get_credential_file_mounts`,
 16  :func:`get_skills_directory_mount` / :func:`iter_skills_files`, and
 17  :func:`get_cache_directory_mounts` / :func:`iter_cache_files` at sandbox
 18  creation time and before each command (for resync on Modal).
 19  """
 20  
 21  from __future__ import annotations
 22  
 23  import logging
 24  import os
 25  from contextvars import ContextVar
 26  from pathlib import Path
 27  from typing import Dict, List
 28  from hermes_cli.config import cfg_get
 29  
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
 31  
 32  # Session-scoped list of credential files to mount.
 33  # Backed by ContextVar to prevent cross-session data bleed in the gateway pipeline.
 34  _registered_files_var: ContextVar[Dict[str, str]] = ContextVar("_registered_files")
 35  
 36  
 37  def _get_registered() -> Dict[str, str]:
 38      """Get or create the registered credential files dict for the current context/session."""
 39      try:
 40          return _registered_files_var.get()
 41      except LookupError:
 42          val: Dict[str, str] = {}
 43          _registered_files_var.set(val)
 44          return val
 45  
 46  
# Cache for config-based file list (loaded once per process).
# ``None`` means "not loaded yet"; ``_load_config_files`` replaces it with a
# list (possibly empty) on first use.  Nothing in this module resets it.
_config_files: List[Dict[str, str]] | None = None
 49  
 50  
 51  def _resolve_hermes_home() -> Path:
 52      from hermes_constants import get_hermes_home
 53      return get_hermes_home()
 54  
 55  
 56  def register_credential_file(
 57      relative_path: str,
 58      container_base: str = "/root/.hermes",
 59  ) -> bool:
 60      """Register a credential file for mounting into remote sandboxes.
 61  
 62      *relative_path* is relative to ``HERMES_HOME`` (e.g. ``google_token.json``).
 63      Returns True if the file exists on the host and was registered.
 64  
 65      Security: rejects absolute paths and path traversal sequences (``..``).
 66      The resolved host path must remain inside HERMES_HOME so that a malicious
 67      skill cannot declare ``required_credential_files: ['../../.ssh/id_rsa']``
 68      and exfiltrate sensitive host files into a container sandbox.
 69      """
 70      hermes_home = _resolve_hermes_home()
 71  
 72      # Reject absolute paths — they bypass the HERMES_HOME sandbox entirely.
 73      if os.path.isabs(relative_path):
 74          logger.warning(
 75              "credential_files: rejected absolute path %r (must be relative to HERMES_HOME)",
 76              relative_path,
 77          )
 78          return False
 79  
 80      host_path = hermes_home / relative_path
 81  
 82      # Resolve symlinks and normalise ``..`` before the containment check so
 83      # that traversal like ``../. ssh/id_rsa`` cannot escape HERMES_HOME.
 84      from tools.path_security import validate_within_dir
 85  
 86      containment_error = validate_within_dir(host_path, hermes_home)
 87      if containment_error:
 88          logger.warning(
 89              "credential_files: rejected path traversal %r (%s)",
 90              relative_path,
 91              containment_error,
 92          )
 93          return False
 94  
 95      resolved = host_path.resolve()
 96      if not resolved.is_file():
 97          logger.debug("credential_files: skipping %s (not found)", resolved)
 98          return False
 99  
100      container_path = f"{container_base.rstrip('/')}/{relative_path}"
101      _get_registered()[container_path] = str(resolved)
102      logger.debug("credential_files: registered %s -> %s", resolved, container_path)
103      return True
104  
105  
def register_credential_files(
    entries: list,
    container_base: str = "/root/.hermes",
) -> List[str]:
    """Register multiple credential files from skill frontmatter entries.

    Each entry is either a string (relative path) or a dict whose ``path``
    key (falling back to ``name``) holds the relative path.  Entries of any
    other type, entries whose path value is not a string, and blank paths are
    ignored, so malformed frontmatter cannot raise here.

    Args:
        entries: Frontmatter entries; ``None`` is treated as an empty list.
        container_base: Passed through to :func:`register_credential_file`.

    Returns:
        The relative paths that could not be registered, i.e. those for which
        :func:`register_credential_file` returned ``False`` (missing on the
        host or rejected by its path checks).
    """
    missing: List[str] = []
    for entry in entries or []:
        if isinstance(entry, dict):
            candidate = entry.get("path") or entry.get("name")
        else:
            candidate = entry
        # Parsed frontmatter can contain numbers, lists, None, ...; only
        # strings are usable as paths.
        if not isinstance(candidate, str):
            continue
        rel_path = candidate.strip()
        if not rel_path:
            continue
        if not register_credential_file(rel_path, container_base):
            missing.append(rel_path)
    return missing
129  
130  
def _load_config_files() -> List[Dict[str, str]]:
    """Return the mounts declared under ``terminal.credential_files`` (cached).

    The first call reads the raw config and keeps every string entry that is
    a relative path, passes the HERMES_HOME containment check, and names an
    existing file.  Each kept entry becomes a dict with ``host_path``
    (resolved) and ``container_path`` (``/root/.hermes/<entry>``).  The result
    is stored in the module-level ``_config_files`` and returned unchanged on
    later calls.

    Any exception while reading or processing the config is logged as a
    warning; whatever was collected up to that point (possibly nothing) is
    cached and returned.
    """
    global _config_files
    if _config_files is not None:
        return _config_files

    collected: List[Dict[str, str]] = []
    try:
        from hermes_cli.config import read_raw_config

        home = _resolve_hermes_home()
        raw_cfg = read_raw_config()
        declared = cfg_get(raw_cfg, "terminal", "credential_files")
        if isinstance(declared, list):
            from tools.path_security import validate_within_dir

            for item in declared:
                # Only non-blank strings are considered.
                if not isinstance(item, str):
                    continue
                rel = item.strip()
                if not rel:
                    continue

                if os.path.isabs(rel):
                    logger.warning(
                        "credential_files: rejected absolute config path %r", rel,
                    )
                    continue

                candidate = home / rel
                violation = validate_within_dir(candidate, home)
                if violation:
                    logger.warning(
                        "credential_files: rejected config path traversal %r (%s)",
                        rel, violation,
                    )
                    continue

                real_file = candidate.resolve()
                if not real_file.is_file():
                    continue
                collected.append({
                    "host_path": str(real_file),
                    "container_path": f"/root/.hermes/{rel}",
                })
    except Exception as e:
        logger.warning("Could not read terminal.credential_files from config: %s", e)

    _config_files = collected
    return _config_files
174  
175  
def get_credential_file_mounts() -> List[Dict[str, str]]:
    """List every credential file that should be mounted into a remote sandbox.

    Merges the session registry (skill-declared files) with the files from
    user config.  Entries are keyed by container path; when both sources name
    the same container path the skill-registered one wins.  Host files that no
    longer exist are left out.

    Returns:
        A list of dicts, each with ``host_path`` and ``container_path`` keys.
    """
    # Skill-registered files first.  Existence is re-checked because a file
    # may have been deleted since it was registered.
    by_container: Dict[str, str] = {
        target: source
        for target, source in _get_registered().items()
        if Path(source).is_file()
    }

    # Config-declared files only fill container paths not already taken.
    for entry in _load_config_files():
        target = entry["container_path"]
        if target in by_container:
            continue
        source = entry["host_path"]
        if Path(source).is_file():
            by_container[target] = source

    return [
        {"host_path": source, "container_path": target}
        for target, source in by_container.items()
    ]
200  
201  
def get_skills_directory_mount(
    container_base: str = "/root/.hermes",
) -> list[Dict[str, str]]:
    """Build mount entries for the local and external skill directories.

    Skills may ship ``scripts/``, ``templates/``, and ``references/``
    subdirectories that the agent needs to run inside remote sandboxes.

    **Security:** bind mounts follow symlinks, so a malicious symlink in a
    skills tree could expose arbitrary host files to the container.  Every
    directory is therefore passed through ``_safe_skills_path``, which hands
    back the directory itself when it holds no symlinks and a sanitized temp
    copy otherwise.

    Returns:
        A list of dicts with ``host_path`` and ``container_path`` keys.  The
        local skills dir maps to ``<container_base>/skills``; external dirs
        map to ``<container_base>/external_skills/<index>``.  Directories
        that do not exist are omitted.
    """
    def _container(suffix: str) -> str:
        return f"{container_base.rstrip('/')}/{suffix}"

    entries = []

    local_skills = _resolve_hermes_home() / "skills"
    if local_skills.is_dir():
        entries.append({
            "host_path": _safe_skills_path(local_skills),
            "container_path": _container("skills"),
        })

    # External skill dirs are optional: if agent.skill_utils cannot be
    # imported, only the local directory is reported.
    try:
        from agent.skill_utils import get_external_skills_dirs
        for position, external in enumerate(get_external_skills_dirs()):
            if not external.is_dir():
                continue
            entries.append({
                "host_path": _safe_skills_path(external),
                "container_path": _container(f"external_skills/{position}"),
            })
    except ImportError:
        pass

    return entries
245  
246  
# Most recently created sanitized copy.  Still updated on every copy so code
# that inspects this name keeps working.
_safe_skills_tempdir: Path | None = None

# Sanitized copies keyed by the source directory they mirror.  Tracking them
# per source lets the local skills dir and each external dir have a live copy
# at the same time; a single shared slot would make the second copy delete
# the first.
_safe_skills_tempdirs: Dict[str, Path] = {}

# True once the single atexit cleanup hook has been installed.
_safe_skills_cleanup_registered = False


def _cleanup_safe_skills_copies() -> None:
    """Delete every sanitized skills copy this process still tracks."""
    import shutil

    while _safe_skills_tempdirs:
        _, stale = _safe_skills_tempdirs.popitem()
        shutil.rmtree(stale, ignore_errors=True)


def _safe_skills_path(skills_dir: Path) -> str:
    """Return *skills_dir* if symlink-free, else a sanitized temp copy.

    When the tree contains any symlink, each one is logged as a warning and a
    copy holding only the regular files and directories is built in a fresh
    temp directory.  A previous copy made for the *same* source directory is
    removed first so copies do not pile up; copies of other source
    directories are left untouched.  All copies are removed at interpreter
    exit.

    NOTE(review): removing the previous copy of the same directory could pull
    files out from under a sandbox that still bind-mounts it — confirm
    against the backend lifecycle.

    Args:
        skills_dir: Skills directory on the host.

    Returns:
        ``str(skills_dir)`` when no symlinks are present, otherwise the path
        of the sanitized copy.
    """
    global _safe_skills_tempdir, _safe_skills_cleanup_registered

    symlinks = [p for p in skills_dir.rglob("*") if p.is_symlink()]
    if not symlinks:
        return str(skills_dir)

    for link in symlinks:
        logger.warning("credential_files: skipping symlink in skills dir: %s -> %s",
                       link, os.readlink(link))

    import atexit
    import shutil
    import tempfile

    # Drop only the stale copy belonging to this source directory.
    source_key = str(skills_dir)
    previous = _safe_skills_tempdirs.pop(source_key, None)
    if previous is not None and previous.is_dir():
        shutil.rmtree(previous, ignore_errors=True)

    safe_dir = Path(tempfile.mkdtemp(prefix="hermes-skills-safe-"))
    _safe_skills_tempdirs[source_key] = safe_dir
    _safe_skills_tempdir = safe_dir

    for item in skills_dir.rglob("*"):
        if item.is_symlink():
            continue
        target = safe_dir / item.relative_to(skills_dir)
        if item.is_dir():
            target.mkdir(parents=True, exist_ok=True)
        elif item.is_file():
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(item), str(target))

    # One hook cleans up every tracked copy; registering a new callback per
    # call would grow the atexit list without bound.
    if not _safe_skills_cleanup_registered:
        atexit.register(_cleanup_safe_skills_copies)
        _safe_skills_cleanup_registered = True

    logger.info("credential_files: created symlink-safe skills copy at %s", safe_dir)
    return str(safe_dir)
291  
292  
def iter_skills_files(
    container_base: str = "/root/.hermes",
) -> List[Dict[str, str]]:
    """Return one ``host_path``/``container_path`` entry per skills file.

    Covers the local skills dir (``<HERMES_HOME>/skills``) and any external
    dirs returned by ``agent.skill_utils.get_external_skills_dirs``.  Symlinks
    and anything that is not a regular file are skipped.  Preferred for
    backends that upload files individually (Daytona, Modal) rather than
    mounting a directory.

    Args:
        container_base: Sandbox directory under which ``skills/`` and
            ``external_skills/<index>/`` are placed; a trailing ``/`` is
            stripped.

    Returns:
        A list (not a generator) of dicts with ``host_path`` and
        ``container_path`` keys.  Container paths always use ``/`` as the
        separator, whatever the host platform.
    """
    base = container_base.rstrip("/")

    def _collect(root: Path, container_root: str) -> List[Dict[str, str]]:
        # as_posix() keeps "/" separators even when the host is Windows; the
        # container side is always a POSIX path.
        return [
            {
                "host_path": str(item),
                "container_path": f"{container_root}/{item.relative_to(root).as_posix()}",
            }
            for item in root.rglob("*")
            if not item.is_symlink() and item.is_file()
        ]

    result: List[Dict[str, str]] = []

    skills_dir = _resolve_hermes_home() / "skills"
    if skills_dir.is_dir():
        result.extend(_collect(skills_dir, f"{base}/skills"))

    # External skill dirs are optional: without agent.skill_utils only the
    # local directory is reported.
    try:
        from agent.skill_utils import get_external_skills_dirs
        for idx, ext_dir in enumerate(get_external_skills_dirs()):
            if ext_dir.is_dir():
                result.extend(_collect(ext_dir, f"{base}/external_skills/{idx}"))
    except ImportError:
        logger.debug(
            "credential_files: agent.skill_utils unavailable; skipping external skill dirs"
        )

    return result
337  
338  
339  # ---------------------------------------------------------------------------
340  # Cache directory mounts (documents, images, audio, screenshots)
341  # ---------------------------------------------------------------------------
342  
# The four cache subdirectories that should be mirrored into remote backends.
# Each tuple is (new_subpath, old_name) matching hermes_constants.get_hermes_dir().
# ``new_subpath`` is also appended to the container base to form the
# container-side path, so it must stay a "/"-separated relative path.
_CACHE_DIRS: list[tuple[str, str]] = [
    ("cache/documents", "document_cache"),
    ("cache/images", "image_cache"),
    ("cache/audio", "audio_cache"),
    ("cache/screenshots", "browser_screenshots"),
]
351  
352  
def get_cache_directory_mounts(
    container_base: str = "/root/.hermes",
) -> List[Dict[str, str]]:
    """List a mount entry for every cache directory present on the host.

    Used by Docker to create bind mounts.  Host directories are looked up
    through ``get_hermes_dir(new_subpath, old_name)`` for backward
    compatibility with old directory layouts; directories that do not exist
    are left out.

    Returns:
        A list of dicts with ``host_path`` and ``container_path`` keys.  The
        container path is always ``<container_base>/<new_subpath>``, i.e. the
        *new* layout regardless of which layout the host uses.
    """
    from hermes_constants import get_hermes_dir

    located = (
        (subpath, get_hermes_dir(subpath, legacy_name))
        for subpath, legacy_name in _CACHE_DIRS
    )
    return [
        {
            "host_path": str(directory),
            "container_path": f"{container_base.rstrip('/')}/{subpath}",
        }
        for subpath, directory in located
        if directory.is_dir()
    ]
375  
376  
def iter_cache_files(
    container_base: str = "/root/.hermes",
) -> List[Dict[str, str]]:
    """Return one ``host_path``/``container_path`` entry per cache file.

    Used by Modal to upload files individually and resync before each
    command.  Walks every cache directory from ``_CACHE_DIRS`` that exists on
    the host, skipping symlinks and anything that is not a regular file.

    Args:
        container_base: Sandbox directory under which the ``cache/<subdir>``
            layout is placed; a trailing ``/`` is stripped.

    Returns:
        A list of dicts with ``host_path`` and ``container_path`` keys.
        Container paths use the new ``cache/<subdir>`` layout and always use
        ``/`` as the separator, whatever the host platform.
    """
    from hermes_constants import get_hermes_dir

    base = container_base.rstrip("/")
    result: List[Dict[str, str]] = []
    for new_subpath, old_name in _CACHE_DIRS:
        host_dir = get_hermes_dir(new_subpath, old_name)
        if not host_dir.is_dir():
            continue
        container_root = f"{base}/{new_subpath}"
        # as_posix() keeps "/" separators even when the host is Windows; the
        # container side is always a POSIX path.
        result.extend(
            {
                "host_path": str(item),
                "container_path": f"{container_root}/{item.relative_to(host_dir).as_posix()}",
            }
            for item in host_dir.rglob("*")
            if not item.is_symlink() and item.is_file()
        )
    return result
402  
403  
def clear_credential_files() -> None:
    """Empty the current context's credential-file registry (e.g. on session reset).

    The registry dict is cleared in place, so any other holder of the same
    dict object sees it emptied as well.
    """
    registry = _get_registered()
    registry.clear()
407  
408