/ utils.py
utils.py
  1  """Shared utility functions for hermes-agent."""
  2  
  3  import json
  4  import logging
  5  import os
  6  import stat
  7  import tempfile
  8  from pathlib import Path
  9  from typing import Any, Union
 10  from urllib.parse import urlparse
 11  
 12  import yaml
 13  
 14  logger = logging.getLogger(__name__)
 15  
 16  
 17  TRUTHY_STRINGS = frozenset({"1", "true", "yes", "on"})
 18  
 19  
 20  def is_truthy_value(value: Any, default: bool = False) -> bool:
 21      """Coerce bool-ish values using the project's shared truthy string set."""
 22      if value is None:
 23          return default
 24      if isinstance(value, bool):
 25          return value
 26      if isinstance(value, str):
 27          return value.strip().lower() in TRUTHY_STRINGS
 28      return bool(value)
 29  
 30  
 31  def env_var_enabled(name: str, default: str = "") -> bool:
 32      """Return True when an environment variable is set to a truthy value."""
 33      return is_truthy_value(os.getenv(name, default), default=False)
 34  
 35  
 36  def _preserve_file_mode(path: Path) -> "int | None":
 37      """Capture the permission bits of *path* if it exists, else ``None``."""
 38      try:
 39          return stat.S_IMODE(path.stat().st_mode) if path.exists() else None
 40      except OSError:
 41          return None
 42  
 43  
 44  def _restore_file_mode(path: Path, mode: "int | None") -> None:
 45      """Re-apply *mode* to *path* after an atomic replace.
 46  
 47      ``tempfile.mkstemp`` creates files with 0o600 (owner-only).  After
 48      ``os.replace`` swaps the temp file into place the target inherits
 49      those restrictive permissions, breaking Docker / NAS volume mounts
 50      that rely on broader permissions set by the user.  Calling this
 51      right after ``os.replace`` restores the original permissions.
 52      """
 53      if mode is None:
 54          return
 55      try:
 56          os.chmod(path, mode)
 57      except OSError:
 58          pass
 59  
 60  
 61  def atomic_replace(tmp_path: Union[str, Path], target: Union[str, Path]) -> str:
 62      """Atomically move *tmp_path* onto *target*, preserving symlinks.
 63  
 64      ``os.replace(tmp, target)`` atomically swaps ``tmp`` into place at
 65      ``target``.  When ``target`` is a symlink, the symlink itself is
 66      replaced with a regular file — silently detaching managed deployments
 67      that symlink ``config.yaml`` / ``SOUL.md`` / ``auth.json`` etc. from
 68      ``~/.hermes/`` to a git-tracked profile package or dotfiles repo
 69      (GitHub #16743).
 70  
 71      This helper resolves the symlink first so ``os.replace`` writes to
 72      the real file in-place while the symlink survives.  For non-symlink
 73      and non-existent paths the behavior is identical to a plain
 74      ``os.replace`` call.
 75  
 76      Returns the resolved real path used for the replace, so callers that
 77      need to re-apply permissions can target it instead of the symlink.
 78      """
 79      target_str = str(target)
 80      real_path = os.path.realpath(target_str) if os.path.islink(target_str) else target_str
 81      os.replace(str(tmp_path), real_path)
 82      return real_path
 83  
 84  
 85  def atomic_json_write(
 86      path: Union[str, Path],
 87      data: Any,
 88      *,
 89      indent: int = 2,
 90      **dump_kwargs: Any,
 91  ) -> None:
 92      """Write JSON data to a file atomically.
 93  
 94      Uses temp file + fsync + os.replace to ensure the target file is never
 95      left in a partially-written state. If the process crashes mid-write,
 96      the previous version of the file remains intact.
 97  
 98      Args:
 99          path: Target file path (will be created or overwritten).
100          data: JSON-serializable data to write.
101          indent: JSON indentation (default 2).
102          **dump_kwargs: Additional keyword args forwarded to json.dump(), such
103              as default=str for non-native types.
104      """
105      path = Path(path)
106      path.parent.mkdir(parents=True, exist_ok=True)
107  
108      original_mode = _preserve_file_mode(path)
109  
110      fd, tmp_path = tempfile.mkstemp(
111          dir=str(path.parent),
112          prefix=f".{path.stem}_",
113          suffix=".tmp",
114      )
115      try:
116          with os.fdopen(fd, "w", encoding="utf-8") as f:
117              json.dump(
118                  data,
119                  f,
120                  indent=indent,
121                  ensure_ascii=False,
122                  **dump_kwargs,
123              )
124              f.flush()
125              os.fsync(f.fileno())
126          # Preserve symlinks — swap in-place on the real file (GitHub #16743).
127          real_path = atomic_replace(tmp_path, path)
128          _restore_file_mode(real_path, original_mode)
129      except BaseException:
130          # Intentionally catch BaseException so temp-file cleanup still runs for
131          # KeyboardInterrupt/SystemExit before re-raising the original signal.
132          try:
133              os.unlink(tmp_path)
134          except OSError:
135              pass
136          raise
137  
138  
def atomic_yaml_write(
    path: Union[str, Path],
    data: Any,
    *,
    default_flow_style: bool = False,
    sort_keys: bool = False,
    extra_content: str | None = None,
) -> None:
    """Write YAML data to a file atomically.

    Uses temp file + fsync + os.replace to ensure the target file is never
    left in a partially-written state.  If the process crashes mid-write,
    the previous version of the file remains intact.

    When *path* is a symlink, the write goes to the file it points to and
    the symlink itself is left in place.  The temp file is staged in the
    real file's directory so the final ``os.replace`` never has to cross a
    filesystem boundary.

    Args:
        path: Target file path (will be created or overwritten).
        data: YAML-serializable data to write.
        default_flow_style: YAML flow style (default False).
        sort_keys: Whether to sort dict keys (default False).
        extra_content: Optional string to append after the YAML dump
            (e.g. commented-out sections for user reference).

    Raises:
        Whatever ``yaml.dump`` or the underlying file operations raise; the
        temp file is removed before the exception propagates.
    """
    path = Path(path)
    if os.path.islink(path):
        # os.replace() fails across filesystems, so stage the temp file next
        # to the real file rather than next to the symlink.
        path = Path(os.path.realpath(path))
    path.parent.mkdir(parents=True, exist_ok=True)

    original_mode = _preserve_file_mode(path)

    fd, tmp_path = tempfile.mkstemp(
        dir=str(path.parent),
        prefix=f".{path.stem}_",
        suffix=".tmp",
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            yaml.dump(data, f, default_flow_style=default_flow_style, sort_keys=sort_keys)
            if extra_content:
                f.write(extra_content)
            f.flush()
            # Force the bytes to disk before the rename makes them visible.
            os.fsync(f.fileno())
        # Preserve symlinks — swap in-place on the real file (GitHub #16743).
        real_path = atomic_replace(tmp_path, path)
        _restore_file_mode(real_path, original_mode)
    except BaseException:
        # Catch BaseException so cleanup also happens for process-level
        # interruptions (KeyboardInterrupt/SystemExit) before re-raising.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
189  
190  
191  # ─── JSON Helpers ─────────────────────────────────────────────────────────────
192  
193  
def safe_json_loads(text: str, default: Any = None) -> Any:
    """Parse JSON, returning *default* on any parse error.

    Replaces the ``try: json.loads(x) except (JSONDecodeError, TypeError)``
    pattern duplicated across display.py, anthropic_adapter.py,
    auxiliary_client.py, and others.

    Args:
        text: JSON document to parse.  Non-string input such as ``None`` is
            tolerated and yields *default*.
        default: Value returned when *text* cannot be parsed.

    Returns:
        The decoded object, or *default* when decoding fails.
    """
    try:
        return json.loads(text)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers json.JSONDecodeError (its subclass) and
        # undecodable bytes; TypeError covers non-str input; RecursionError
        # is what the decoder raises for pathologically deep nesting.
        return default
205  
206  
207  # ─── Environment Variable Helpers ─────────────────────────────────────────────
208  
209  
def env_int(key: str, default: int = 0) -> int:
    """Return the environment variable *key* parsed as an ``int``.

    *default* is returned when the variable is unset, blank after stripping
    whitespace, or not parseable as an integer.
    """
    text = os.getenv(key, "").strip()
    try:
        return int(text) if text else default
    except (ValueError, TypeError):
        return default
219  
220  
def env_bool(key: str, default: bool = False) -> bool:
    """Read an environment variable as a boolean.

    Args:
        key: Environment variable to look up.
        default: Result returned when the variable is unset or blank
            (whitespace only), mirroring how ``env_int`` falls back.

    Returns:
        *default* for an unset/blank variable, otherwise the value coerced
        by ``is_truthy_value``.
    """
    raw = os.getenv(key)
    # Look the variable up without a "" fallback: with one, the value is never
    # None, so *default* would be ignored and unset variables read as False.
    if raw is None or not raw.strip():
        return default
    return is_truthy_value(raw, default=default)
224  
225  
226  # ─── Proxy Helpers ────────────────────────────────────────────────────────────
227  
228  
# Proxy-related environment variable names that normalize_proxy_env_vars()
# iterates over; both the upper- and lower-case spellings are listed.
_PROXY_ENV_KEYS = (
    "HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY",
    "https_proxy", "http_proxy", "all_proxy",
)
233  
234  
235  def normalize_proxy_url(proxy_url: str | None) -> str | None:
236      """Normalize proxy URLs for httpx/aiohttp compatibility.
237  
238      WSL/Clash-style environments often export SOCKS proxies as
239      ``socks://127.0.0.1:PORT``. httpx rejects that alias and expects the
240      explicit ``socks5://`` scheme instead.
241      """
242      candidate = str(proxy_url or "").strip()
243      if not candidate:
244          return None
245      if candidate.lower().startswith("socks://"):
246          return f"socks5://{candidate[len('socks://'):]}"
247      return candidate
248  
249  
def normalize_proxy_env_vars() -> None:
    """Rewrite each supported proxy env var to its canonical URL, in place.

    Every name in ``_PROXY_ENV_KEYS`` is passed through
    ``normalize_proxy_url``; ``os.environ`` is updated only when that yields
    a non-empty value different from the current one.
    """
    for env_name in _PROXY_ENV_KEYS:
        current = os.getenv(env_name, "")
        canonical = normalize_proxy_url(current)
        if not canonical or canonical == current:
            # Unset/blank, or already in canonical form: nothing to rewrite.
            continue
        os.environ[env_name] = canonical
257  
258  
259  # ─── URL Parsing Helpers ──────────────────────────────────────────────────────
260  
261  
def base_url_hostname(base_url: str) -> str:
    """Return the lowercased hostname for a base URL, or ``""`` if absent.

    Use exact-hostname comparisons against known provider hosts
    (``api.openai.com``, ``api.x.ai``, ``api.anthropic.com``) instead of
    substring matches on the raw URL. Substring checks treat attacker- or
    proxy-controlled paths/hosts like ``https://api.openai.com.example/v1``
    or ``https://proxy.test/api.openai.com/v1`` as native endpoints, which
    leads to wrong api_mode / auth routing.

    Args:
        base_url: Full URL or bare ``host[:port][/path]``; ``None`` and blank
            strings are tolerated.

    Returns:
        The hostname lowercased with any trailing dot removed, or ``""``
        when the input is empty, has no host, or cannot be parsed.
    """
    raw = (base_url or "").strip()
    if not raw:
        return ""
    try:
        # A bare host has no scheme; prefix "//" so urlparse treats it as a
        # network location instead of a path.
        parsed = urlparse(raw if "://" in raw else f"//{raw}")
        hostname = parsed.hostname
    except ValueError:
        # urlparse rejects malformed bracketed hosts (e.g. "http://[::1").
        # Report "no hostname" so callers fail closed instead of crashing.
        return ""
    return (hostname or "").lower().rstrip(".")
277  
278  
def base_url_host_matches(base_url: str, domain: str) -> bool:
    """Tell whether the base URL's hostname is ``domain`` or one of its subdomains.

    Safer counterpart to ``domain in base_url``, which matches on substrings
    and therefore accepts look-alike hosts and paths. Accepts bare hosts,
    full URLs, and URLs with paths.

        base_url_host_matches("https://api.moonshot.ai/v1", "moonshot.ai") == True
        base_url_host_matches("https://moonshot.ai", "moonshot.ai")        == True
        base_url_host_matches("https://evil.com/moonshot.ai/v1", "moonshot.ai") == False
        base_url_host_matches("https://moonshot.ai.evil/v1", "moonshot.ai")     == False

    Returns ``False`` when either the hostname or the domain is empty.
    """
    host = base_url_hostname(base_url)
    if host:
        # Normalize the domain the same way the hostname is normalized:
        # lowercased, surrounding whitespace and trailing dot removed.
        wanted = (domain or "").strip().lower().rstrip(".")
        if wanted:
            return host == wanted or host.endswith("." + wanted)
    return False