skill_utils.py
"""Lightweight skill metadata utilities shared by prompt_builder and skills_tool.

This module intentionally avoids importing the tool registry, CLI config, or any
heavy dependency chain.  It is safe to import at module level without triggering
tool registration or provider resolution.
"""

import logging
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

from hermes_constants import get_config_path, get_skills_dir

logger = logging.getLogger(__name__)

# ── Platform mapping ──────────────────────────────────────────────────────

# Maps the platform names accepted in skill frontmatter to the prefix that
# ``sys.platform`` is compared against in ``skill_matches_platform``.
PLATFORM_MAP = {
    "macos": "darwin",
    "linux": "linux",
    "windows": "win32",
}

# Directory names pruned while walking a skills directory.
EXCLUDED_SKILL_DIRS = frozenset((".git", ".github", ".hub", ".archive"))

# ── Lazy YAML loader ─────────────────────────────────────────────────────

# Cached loader callable; populated on the first ``yaml_load`` call so that
# importing this module does not import ``yaml``.
_yaml_load_fn = None


def yaml_load(content: str):
    """Parse YAML with lazy import and CSafeLoader preference.

    The ``yaml`` package is imported on first use.  ``yaml.CSafeLoader`` is
    used when the attribute exists, otherwise ``yaml.SafeLoader``.

    Args:
        content: YAML text to parse.

    Returns:
        Whatever ``yaml.load`` returns for *content* with the chosen safe
        loader.

    Raises:
        Any exception raised by ``yaml.load`` propagates to the caller.
    """
    global _yaml_load_fn
    if _yaml_load_fn is None:
        import yaml

        loader = getattr(yaml, "CSafeLoader", None) or yaml.SafeLoader

        def _load(value: str):
            return yaml.load(value, Loader=loader)

        _yaml_load_fn = _load
    return _yaml_load_fn(content)


# ── Frontmatter parsing ──────────────────────────────────────────────────


def parse_frontmatter(content: str) -> Tuple[Dict[str, Any], str]:
    """Parse YAML frontmatter from a markdown string.

    Uses yaml with CSafeLoader for full YAML support (nested metadata, lists)
    with a fallback to simple key:value splitting for robustness.

    The frontmatter must start at the very beginning of *content* with
    ``---`` and end with a line starting with ``---``.  The closing fence may
    be followed by a newline or may be the last thing in the string.

    Args:
        content: Full markdown text.

    Returns:
        (frontmatter_dict, remaining_body).  When no frontmatter is found the
        dict is empty and the body is *content* unchanged.  When the YAML
        parses to something other than a mapping the dict is empty.
    """
    frontmatter: Dict[str, Any] = {}
    body = content

    if not content.startswith("---"):
        return frontmatter, body

    rest = content[3:]
    end_match = re.search(r"\n---\s*\n", rest)
    if not end_match:
        # The closing fence is the final line of the file with no newline
        # after it.  Only tried when the primary pattern finds nothing, so
        # inputs that already matched are parsed exactly as before.
        end_match = re.search(r"\n---\s*\Z", rest)
    if not end_match:
        return frontmatter, body

    yaml_content = rest[: end_match.start()]
    body = rest[end_match.end():]

    try:
        parsed = yaml_load(yaml_content)
        if isinstance(parsed, dict):
            frontmatter = parsed
    except Exception:
        # Fallback: simple key:value parsing for malformed YAML
        for line in yaml_content.strip().split("\n"):
            if ":" not in line:
                continue
            key, value = line.split(":", 1)
            frontmatter[key.strip()] = value.strip()

    return frontmatter, body


# ── Platform matching ─────────────────────────────────────────────────────


def skill_matches_platform(frontmatter: Dict[str, Any]) -> bool:
    """Return True when the skill is compatible with the current OS.

    Skills declare platform requirements via a top-level ``platforms`` list
    in their YAML frontmatter::

        platforms: [macos]          # macOS only
        platforms: [macos, linux]   # macOS and Linux

    If the field is absent or empty the skill is compatible with **all**
    platforms (backward-compatible default).

    A scalar value is treated as a one-element list.  Each entry is
    lower-cased, stripped, translated through ``PLATFORM_MAP`` (unknown names
    are used as-is) and compared as a prefix of ``sys.platform``.
    """
    platforms = frontmatter.get("platforms")
    if not platforms:
        return True
    if not isinstance(platforms, list):
        platforms = [platforms]
    current = sys.platform
    for platform in platforms:
        normalized = str(platform).lower().strip()
        mapped = PLATFORM_MAP.get(normalized, normalized)
        if current.startswith(mapped):
            return True
    return False


# ── Disabled skills ───────────────────────────────────────────────────────


def get_disabled_skill_names(platform: Optional[str] = None) -> Set[str]:
    """Read disabled skill names from config.yaml.

    Args:
        platform: Explicit platform name (e.g. ``"telegram"``).  When
            *None*, resolves from ``HERMES_PLATFORM`` or
            ``HERMES_SESSION_PLATFORM`` env vars.  Falls back to the
            global disabled list when no platform is determined.

    Returns:
        The set of disabled skill names.  An empty set is returned when the
        config file is missing, unreadable, or does not contain a ``skills``
        mapping.

    Reads the config file directly (no CLI config imports) to stay
    lightweight.
    """
    config_path = get_config_path()
    if not config_path.exists():
        return set()
    try:
        parsed = yaml_load(config_path.read_text(encoding="utf-8"))
    except Exception as e:
        logger.debug("Could not read skill config %s: %s", config_path, e)
        return set()
    if not isinstance(parsed, dict):
        return set()

    skills_cfg = parsed.get("skills")
    if not isinstance(skills_cfg, dict):
        return set()

    resolved_platform = platform or os.getenv("HERMES_PLATFORM")
    if not resolved_platform:
        # Imported only when the cheaper sources gave no answer, so callers
        # that pass ``platform`` explicitly never trigger this import.
        from gateway.session_context import get_session_env

        resolved_platform = get_session_env("HERMES_SESSION_PLATFORM")

    if resolved_platform:
        per_platform = skills_cfg.get("platform_disabled")
        # A malformed (non-mapping) ``platform_disabled`` value is ignored
        # instead of raising AttributeError on ``.get``.
        if isinstance(per_platform, dict):
            platform_disabled = per_platform.get(resolved_platform)
            if platform_disabled is not None:
                return _normalize_string_set(platform_disabled)
    return _normalize_string_set(skills_cfg.get("disabled"))


def _normalize_string_set(values) -> Set[str]:
    """Convert a config value into a set of non-empty, stripped strings.

    ``None`` yields an empty set, a single string is treated as a one-element
    list, and any other iterable is converted element by element with
    ``str()``.  A non-iterable value (e.g. a number) yields an empty set.
    """
    if values is None:
        return set()
    if isinstance(values, str):
        values = [values]
    try:
        items = iter(values)
    except TypeError:
        # Scalar such as ``disabled: 42`` — not a usable list of names.
        return set()
    return {str(v).strip() for v in items if str(v).strip()}


# ── External skills directories ──────────────────────────────────────────


def get_external_skills_dirs() -> List[Path]:
    """Read ``skills.external_dirs`` from config.yaml and return validated paths.

    Each entry is expanded (``~`` and ``${VAR}``) and resolved to an absolute
    path.  Only directories that actually exist are returned.  Duplicates and
    paths that resolve to the local skills directory are silently skipped.

    Returns:
        Existing external skill directories in config order.  An empty list
        is returned when the config file is missing, unreadable, or has no
        usable ``skills.external_dirs`` value.
    """
    config_path = get_config_path()
    if not config_path.exists():
        return []
    try:
        parsed = yaml_load(config_path.read_text(encoding="utf-8"))
    except Exception as e:
        logger.debug("Could not read skill config %s: %s", config_path, e)
        return []
    if not isinstance(parsed, dict):
        return []

    skills_cfg = parsed.get("skills")
    if not isinstance(skills_cfg, dict):
        return []

    raw_dirs = skills_cfg.get("external_dirs")
    if not raw_dirs:
        return []
    if isinstance(raw_dirs, str):
        raw_dirs = [raw_dirs]
    if not isinstance(raw_dirs, list):
        return []

    from hermes_constants import get_hermes_home

    hermes_home = get_hermes_home()
    local_skills = get_skills_dir().resolve()
    seen: Set[Path] = set()
    result: List[Path] = []

    for entry in raw_dirs:
        entry = str(entry).strip()
        if not entry:
            continue
        # Expand ~ and environment variables
        expanded = os.path.expanduser(os.path.expandvars(entry))
        p = Path(expanded)
        # Resolve relative paths against HERMES_HOME, not cwd
        if not p.is_absolute():
            p = (hermes_home / p).resolve()
        else:
            p = p.resolve()
        if p == local_skills:
            continue
        if p in seen:
            continue
        if p.is_dir():
            seen.add(p)
            result.append(p)
        else:
            logger.debug("External skills dir does not exist, skipping: %s", p)

    return result


def get_all_skills_dirs() -> List[Path]:
    """Return all skill directories: the local skills dir first, then external.

    The local dir is always first (and always included even if it doesn't exist
    yet — callers handle that).  External dirs follow in config order.
    """
    dirs = [get_skills_dir()]
    dirs.extend(get_external_skills_dirs())
    return dirs


# ── Condition extraction ──────────────────────────────────────────────────


def _as_condition_list(value: Any) -> List:
    """Coerce one conditional-activation field into a list.

    ``None`` becomes ``[]``, a list is returned unchanged, a single string
    becomes a one-element list, tuples and sets are converted with
    ``list()``, and anything else is treated as invalid and becomes ``[]``.
    """
    if value is None:
        return []
    if isinstance(value, list):
        return value
    if isinstance(value, str):
        return [value]
    if isinstance(value, (tuple, set, frozenset)):
        return list(value)
    return []


def extract_skill_conditions(frontmatter: Dict[str, Any]) -> Dict[str, List]:
    """Extract conditional activation fields from parsed frontmatter.

    Reads ``metadata.hermes`` and returns the four fields
    ``fallback_for_toolsets``, ``requires_toolsets``, ``fallback_for_tools``
    and ``requires_tools``.  Every value in the returned dict is a list, even
    when the frontmatter gives a null or a single scalar string.
    """
    metadata = frontmatter.get("metadata")
    # Handle cases where metadata is not a dict (e.g., a string from malformed YAML)
    if not isinstance(metadata, dict):
        metadata = {}
    hermes = metadata.get("hermes") or {}
    if not isinstance(hermes, dict):
        hermes = {}
    return {
        "fallback_for_toolsets": _as_condition_list(hermes.get("fallback_for_toolsets")),
        "requires_toolsets": _as_condition_list(hermes.get("requires_toolsets")),
        "fallback_for_tools": _as_condition_list(hermes.get("fallback_for_tools")),
        "requires_tools": _as_condition_list(hermes.get("requires_tools")),
    }


# ── Skill config extraction ───────────────────────────────────────────────


def extract_skill_config_vars(frontmatter: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract config variable declarations from parsed frontmatter.

    Skills declare config.yaml settings they need via::

        metadata:
          hermes:
            config:
              - key: wiki.path
                description: Path to the LLM Wiki knowledge base directory
                default: "~/wiki"
                prompt: Wiki directory path

    Returns a list of dicts with keys: ``key``, ``description``, ``default``
    (only when declared and not null) and ``prompt`` (falls back to the
    description).  Invalid or incomplete entries are silently skipped, as are
    repeated keys after their first occurrence.
    """
    metadata = frontmatter.get("metadata")
    if not isinstance(metadata, dict):
        return []
    hermes = metadata.get("hermes")
    if not isinstance(hermes, dict):
        return []
    raw = hermes.get("config")
    if not raw:
        return []
    if isinstance(raw, dict):
        # A single mapping is accepted as a one-entry list.
        raw = [raw]
    if not isinstance(raw, list):
        return []

    result: List[Dict[str, Any]] = []
    seen: set = set()
    for item in raw:
        if not isinstance(item, dict):
            continue
        key = str(item.get("key", "")).strip()
        if not key or key in seen:
            continue
        # Must have at least key and description
        desc = str(item.get("description", "")).strip()
        if not desc:
            continue
        entry: Dict[str, Any] = {
            "key": key,
            "description": desc,
        }
        default = item.get("default")
        if default is not None:
            entry["default"] = default
        prompt_text = item.get("prompt")
        if isinstance(prompt_text, str) and prompt_text.strip():
            entry["prompt"] = prompt_text.strip()
        else:
            entry["prompt"] = desc
        seen.add(key)
        result.append(entry)
    return result


def discover_all_skill_config_vars() -> List[Dict[str, Any]]:
    """Scan all enabled skills and collect their config variable declarations.

    Walks every skills directory, parses each SKILL.md frontmatter, and returns
    a deduplicated list of config var dicts.  Each dict also includes a
    ``skill`` key with the skill name for attribution.  When two skills
    declare the same key, the first one encountered wins.

    Disabled and platform-incompatible skills are excluded.  Skill files that
    cannot be read or parsed are skipped.
    """
    all_vars: List[Dict[str, Any]] = []
    seen_keys: set = set()

    disabled = get_disabled_skill_names()
    for skills_dir in get_all_skills_dirs():
        if not skills_dir.is_dir():
            continue
        for skill_file in iter_skill_index_files(skills_dir, "SKILL.md"):
            try:
                raw = skill_file.read_text(encoding="utf-8")
                frontmatter, _ = parse_frontmatter(raw)
            except Exception as e:
                logger.debug("Could not read skill file %s: %s", skill_file, e)
                continue

            # Fall back to the containing directory name when the
            # frontmatter does not name the skill.
            skill_name = frontmatter.get("name") or skill_file.parent.name
            if str(skill_name) in disabled:
                continue
            if not skill_matches_platform(frontmatter):
                continue

            config_vars = extract_skill_config_vars(frontmatter)
            for var in config_vars:
                if var["key"] not in seen_keys:
                    var["skill"] = str(skill_name)
                    all_vars.append(var)
                    seen_keys.add(var["key"])

    return all_vars


# Storage prefix: all skill config vars are stored under skills.config.*
# in config.yaml.  Skill authors declare logical keys (e.g. "wiki.path");
# the system adds this prefix for storage and strips it for display.
SKILL_CONFIG_PREFIX = "skills.config"


def _resolve_dotpath(config: Dict[str, Any], dotted_key: str):
    """Walk a nested dict following a dotted key.  Returns None if any part is missing."""
    parts = dotted_key.split(".")
    current = config
    for part in parts:
        if isinstance(current, dict) and part in current:
            current = current[part]
        else:
            return None
    return current


def resolve_skill_config_values(
    config_vars: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Resolve current values for skill config vars from config.yaml.

    Skill config is stored under ``skills.config.<key>`` in config.yaml.
    Returns a dict mapping **logical** keys (as declared by skills) to their
    current values (or the declared default if the key isn't set or is a
    blank string; ``""`` when no default is declared).
    String values containing ``~`` or ``${`` are expanded via
    ``os.path.expandvars`` and ``os.path.expanduser``.

    Args:
        config_vars: Config var dicts; each must contain a ``key`` entry and
            may contain a ``default`` entry.
    """
    config_path = get_config_path()
    config: Dict[str, Any] = {}
    if config_path.exists():
        try:
            parsed = yaml_load(config_path.read_text(encoding="utf-8"))
            if isinstance(parsed, dict):
                config = parsed
        except Exception as e:
            # Best effort: an unreadable config means every var resolves to
            # its declared default.
            logger.debug("Could not read skill config %s: %s", config_path, e)

    resolved: Dict[str, Any] = {}
    for var in config_vars:
        logical_key = var["key"]
        storage_key = f"{SKILL_CONFIG_PREFIX}.{logical_key}"
        value = _resolve_dotpath(config, storage_key)

        if value is None or (isinstance(value, str) and not value.strip()):
            value = var.get("default", "")

        # Expand ~ in path-like values
        if isinstance(value, str) and ("~" in value or "${" in value):
            value = os.path.expanduser(os.path.expandvars(value))

        resolved[logical_key] = value

    return resolved


# ── Description extraction ────────────────────────────────────────────────


def extract_skill_description(frontmatter: Dict[str, Any]) -> str:
    """Extract a truncated description from parsed frontmatter.

    The ``description`` value is stringified, stripped of surrounding
    whitespace and quote characters, and cut to at most 60 characters
    (57 characters plus ``...``) when longer.  Returns ``""`` when the field
    is missing or empty.
    """
    raw_desc = frontmatter.get("description", "")
    if not raw_desc:
        return ""
    desc = str(raw_desc).strip().strip("'\"")
    if len(desc) > 60:
        return desc[:57] + "..."
    return desc


# ── File iteration ────────────────────────────────────────────────────────


def iter_skill_index_files(skills_dir: Path, filename: str):
    """Walk skills_dir yielding sorted paths matching *filename*.

    Excludes ``.git``, ``.github``, ``.hub``, ``.archive`` directories.
    Paths are yielded in order of their string form relative to
    *skills_dir*.
    """
    matches = []
    # NOTE(review): followlinks=True follows directory symlinks; os.walk does
    # not track visited directories, so a link pointing at one of its own
    # ancestors may be walked more than once — confirm that skill trees never
    # contain such links.
    for root, dirs, files in os.walk(skills_dir, followlinks=True):
        # In-place slice assignment prunes the walk itself.
        dirs[:] = [d for d in dirs if d not in EXCLUDED_SKILL_DIRS]
        if filename in files:
            matches.append(Path(root) / filename)
    for path in sorted(matches, key=lambda p: str(p.relative_to(skills_dir))):
        yield path


# ── Namespace helpers for plugin-provided skills ───────────────────────────

_NAMESPACE_RE = re.compile(r"^[a-zA-Z0-9_-]+$")


def parse_qualified_name(name: str) -> Tuple[Optional[str], str]:
    """Split ``'namespace:skill-name'`` into ``(namespace, bare_name)``.

    Only the first ``':'`` separates the two parts.  Returns ``(None, name)``
    when there is no ``':'``.
    """
    if ":" not in name:
        return None, name
    namespace, _, bare_name = name.partition(":")
    return namespace, bare_name


def is_valid_namespace(candidate: Optional[str]) -> bool:
    """Check whether *candidate* is a valid namespace (``[a-zA-Z0-9_-]+``).

    ``None`` and the empty string are invalid.  The whole string must match,
    so a trailing newline makes the candidate invalid.
    """
    if not candidate:
        return False
    # fullmatch: ``$`` alone would also accept a trailing newline.
    return bool(_NAMESPACE_RE.fullmatch(candidate))