# utils.py
"""Shared utility functions for hermes-agent."""

import json
import logging
import os
import stat
import tempfile
from pathlib import Path
from typing import Any, Callable, TextIO, Union
from urllib.parse import urlparse

import yaml

logger = logging.getLogger(__name__)


TRUTHY_STRINGS = frozenset({"1", "true", "yes", "on"})


def is_truthy_value(value: Any, default: bool = False) -> bool:
    """Coerce bool-ish values using the project's shared truthy string set.

    Args:
        value: The value to interpret.
        default: Result returned only when *value* is ``None``.

    Returns:
        ``default`` for ``None``; the value itself for real booleans; for
        strings, whether the stripped, lowercased text is one of
        ``TRUTHY_STRINGS``; otherwise ``bool(value)``.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.strip().lower() in TRUTHY_STRINGS
    return bool(value)


def env_var_enabled(name: str, default: str = "") -> bool:
    """Return True when an environment variable is set to a truthy value.

    *default* is the raw string used when the variable is unset; it is
    interpreted with the same truthy-string rules as a real value.
    """
    return is_truthy_value(os.getenv(name, default), default=False)


def _preserve_file_mode(path: Path) -> "int | None":
    """Capture the permission bits of *path* if it exists, else ``None``.

    Any ``OSError`` while inspecting the file is treated as "no mode
    available" and also yields ``None``.
    """
    try:
        return stat.S_IMODE(path.stat().st_mode) if path.exists() else None
    except OSError:
        return None


def _restore_file_mode(path: Union[str, Path], mode: "int | None") -> None:
    """Re-apply *mode* to *path* after an atomic replace.

    ``tempfile.mkstemp`` creates files with 0o600 (owner-only).  After
    ``os.replace`` swaps the temp file into place the target inherits
    those restrictive permissions, breaking Docker / NAS volume mounts
    that rely on broader permissions set by the user.  Calling this
    right after ``os.replace`` restores the original permissions.

    This is best-effort: a failing ``chmod`` is logged at debug level and
    otherwise ignored.  A *mode* of ``None`` is a no-op.
    """
    if mode is None:
        return
    try:
        os.chmod(path, mode)
    except OSError as exc:
        # Deliberately non-fatal: the data has already been written safely.
        logger.debug("Could not restore mode %o on %s: %s", mode, path, exc)


def _resolve_symlink(target: Union[str, Path]) -> str:
    """Return the real path behind *target* if it is a symlink, else *target* as ``str``."""
    target_str = str(target)
    if os.path.islink(target_str):
        return os.path.realpath(target_str)
    return target_str


def atomic_replace(tmp_path: Union[str, Path], target: Union[str, Path]) -> str:
    """Atomically move *tmp_path* onto *target*, preserving symlinks.

    ``os.replace(tmp, target)`` atomically swaps ``tmp`` into place at
    ``target``.  When ``target`` is a symlink, the symlink itself is
    replaced with a regular file — silently detaching managed deployments
    that symlink ``config.yaml`` / ``SOUL.md`` / ``auth.json`` etc. from
    ``~/.hermes/`` to a git-tracked profile package or dotfiles repo
    (GitHub #16743).

    This helper resolves the symlink first so ``os.replace`` writes to
    the real file in-place while the symlink survives.  For non-symlink
    and non-existent paths the behavior is identical to a plain
    ``os.replace`` call.

    Returns the resolved real path used for the replace, so callers that
    need to re-apply permissions can target it instead of the symlink.
    """
    real_path = _resolve_symlink(target)
    os.replace(str(tmp_path), real_path)
    return real_path


def _atomic_text_write(
    path: Union[str, Path],
    write_body: Callable[[TextIO], None],
) -> None:
    """Write a text file atomically via temp file + fsync + ``os.replace``.

    *write_body* receives the open UTF-8 temp file and writes the payload.
    On any failure (including ``KeyboardInterrupt`` / ``SystemExit``) the
    temp file is removed and the original exception is re-raised.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    original_mode = _preserve_file_mode(path)

    # Stage the temp file next to the file that will actually be replaced.
    # For a symlinked target that is the real file's directory: os.replace
    # may fail when source and destination are on different filesystems.
    staging_dir = Path(_resolve_symlink(path)).parent

    fd, tmp_path = tempfile.mkstemp(
        dir=str(staging_dir),
        prefix=f".{path.stem}_",
        suffix=".tmp",
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            write_body(f)
            f.flush()
            os.fsync(f.fileno())
        # Preserve symlinks — swap in-place on the real file (GitHub #16743).
        real_path = atomic_replace(tmp_path, path)
        _restore_file_mode(real_path, original_mode)
    except BaseException:
        # Intentionally catch BaseException so temp-file cleanup still runs for
        # KeyboardInterrupt/SystemExit before re-raising the original signal.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def atomic_json_write(
    path: Union[str, Path],
    data: Any,
    *,
    indent: int = 2,
    **dump_kwargs: Any,
) -> None:
    """Write JSON data to a file atomically.

    Uses temp file + fsync + os.replace to ensure the target file is never
    left in a partially-written state.  If the process crashes mid-write,
    the previous version of the file remains intact.

    Args:
        path: Target file path (will be created or overwritten).
        data: JSON-serializable data to write.
        indent: JSON indentation (default 2).
        **dump_kwargs: Additional keyword args forwarded to json.dump(), such
            as default=str for non-native types.
    """

    def _dump(f: TextIO) -> None:
        json.dump(data, f, indent=indent, ensure_ascii=False, **dump_kwargs)

    _atomic_text_write(path, _dump)


def atomic_yaml_write(
    path: Union[str, Path],
    data: Any,
    *,
    default_flow_style: bool = False,
    sort_keys: bool = False,
    extra_content: str | None = None,
) -> None:
    """Write YAML data to a file atomically.

    Uses temp file + fsync + os.replace to ensure the target file is never
    left in a partially-written state.  If the process crashes mid-write,
    the previous version of the file remains intact.

    Args:
        path: Target file path (will be created or overwritten).
        data: YAML-serializable data to write.
        default_flow_style: YAML flow style (default False).
        sort_keys: Whether to sort dict keys (default False).
        extra_content: Optional string to append after the YAML dump
            (e.g. commented-out sections for user reference).
    """

    def _dump(f: TextIO) -> None:
        yaml.dump(data, f, default_flow_style=default_flow_style, sort_keys=sort_keys)
        if extra_content:
            f.write(extra_content)

    _atomic_text_write(path, _dump)


# ─── JSON Helpers ─────────────────────────────────────────────────────────────


def safe_json_loads(text: str, default: Any = None) -> Any:
    """Parse JSON, returning *default* on any parse error.

    Replaces the ``try: json.loads(x) except (JSONDecodeError, TypeError)``
    pattern duplicated across display.py, anthropic_adapter.py,
    auxiliary_client.py, and others.
    """
    try:
        return json.loads(text)
    except (ValueError, TypeError):
        # json.JSONDecodeError is a ValueError subclass, so it is covered here.
        return default


# ─── Environment Variable Helpers ─────────────────────────────────────────────


def env_int(key: str, default: int = 0) -> int:
    """Read an environment variable as an integer, with fallback.

    Returns *default* when the variable is unset, blank, or not a valid
    integer literal.
    """
    raw = os.getenv(key, "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        return default


def env_bool(key: str, default: bool = False) -> bool:
    """Read an environment variable as a boolean.

    Returns *default* only when the variable is not set at all.  A variable
    that is set (even to an empty string) is interpreted with the shared
    truthy-string rules of ``is_truthy_value``.
    """
    # Pass None through for an unset variable so that *default* is honoured;
    # substituting "" here would make every unset variable read as False.
    return is_truthy_value(os.getenv(key), default=default)


# ─── Proxy Helpers ────────────────────────────────────────────────────────────


_PROXY_ENV_KEYS = (
    "HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY",
    "https_proxy", "http_proxy", "all_proxy",
)


def normalize_proxy_url(proxy_url: str | None) -> str | None:
    """Normalize proxy URLs for httpx/aiohttp compatibility.

    WSL/Clash-style environments often export SOCKS proxies as
    ``socks://127.0.0.1:PORT``.  httpx rejects that alias and expects the
    explicit ``socks5://`` scheme instead.

    Returns ``None`` for ``None`` / blank input, otherwise the stripped URL
    with a leading ``socks://`` (any case) rewritten to ``socks5://``.
    """
    candidate = str(proxy_url or "").strip()
    if not candidate:
        return None
    if candidate.lower().startswith("socks://"):
        return f"socks5://{candidate[len('socks://'):]}"
    return candidate


def normalize_proxy_env_vars() -> None:
    """Rewrite supported proxy env vars to canonical URL forms in-place."""
    for key in _PROXY_ENV_KEYS:
        value = os.getenv(key, "")
        normalized = normalize_proxy_url(value)
        # Only touch variables that are set and actually change.
        if normalized and normalized != value:
            os.environ[key] = normalized


# ─── URL Parsing Helpers ──────────────────────────────────────────────────────


def base_url_hostname(base_url: str) -> str:
    """Return the lowercased hostname for a base URL, or ``""`` if absent.

    Use exact-hostname comparisons against known provider hosts
    (``api.openai.com``, ``api.x.ai``, ``api.anthropic.com``) instead of
    substring matches on the raw URL.  Substring checks treat attacker- or
    proxy-controlled paths/hosts like ``https://api.openai.com.example/v1``
    or ``https://proxy.test/api.openai.com/v1`` as native endpoints, which
    leads to wrong api_mode / auth routing.

    A URL that ``urlparse`` rejects with ``ValueError`` is treated as
    having no hostname and also yields ``""``.
    """
    raw = (base_url or "").strip()
    if not raw:
        return ""
    try:
        # Bare hosts ("example.com/v1") need a leading "//" to be parsed as
        # a network location rather than a path.
        parsed = urlparse(raw if "://" in raw else f"//{raw}")
        hostname = parsed.hostname
    except ValueError:
        return ""
    return (hostname or "").lower().rstrip(".")


def base_url_host_matches(base_url: str, domain: str) -> bool:
    """Return True when the base URL's hostname is ``domain`` or a subdomain.

    Safer counterpart to ``domain in base_url``, which is the substring
    false-positive class documented on ``base_url_hostname``.  Accepts bare
    hosts, full URLs, and URLs with paths.

        base_url_host_matches("https://api.moonshot.ai/v1", "moonshot.ai") == True
        base_url_host_matches("https://moonshot.ai", "moonshot.ai") == True
        base_url_host_matches("https://evil.com/moonshot.ai/v1", "moonshot.ai") == False
        base_url_host_matches("https://moonshot.ai.evil/v1", "moonshot.ai") == False
    """
    hostname = base_url_hostname(base_url)
    if not hostname:
        return False
    domain = (domain or "").strip().lower().rstrip(".")
    if not domain:
        return False
    return hostname == domain or hostname.endswith("." + domain)