/ agent / skill_utils.py
skill_utils.py
  1  """Lightweight skill metadata utilities shared by prompt_builder and skills_tool.
  2  
  3  This module intentionally avoids importing the tool registry, CLI config, or any
  4  heavy dependency chain.  It is safe to import at module level without triggering
  5  tool registration or provider resolution.
  6  """
  7  
  8  import logging
  9  import os
 10  import re
 11  import sys
 12  from pathlib import Path
 13  from typing import Any, Dict, List, Optional, Set, Tuple
 14  
 15  from hermes_constants import get_config_path, get_skills_dir
 16  
 17  logger = logging.getLogger(__name__)
 18  
 19  # ── Platform mapping ──────────────────────────────────────────────────────
 20  
 21  PLATFORM_MAP = {
 22      "macos": "darwin",
 23      "linux": "linux",
 24      "windows": "win32",
 25  }
 26  
 27  EXCLUDED_SKILL_DIRS = frozenset((".git", ".github", ".hub", ".archive"))
 28  
 29  # ── Lazy YAML loader ─────────────────────────────────────────────────────
 30  
# Cached parser callable.  Stays ``None`` until ``yaml_load`` is first called,
# which imports ``yaml`` and stores the loader function here for reuse.
_yaml_load_fn = None
 32  
 33  
 34  def yaml_load(content: str):
 35      """Parse YAML with lazy import and CSafeLoader preference."""
 36      global _yaml_load_fn
 37      if _yaml_load_fn is None:
 38          import yaml
 39  
 40          loader = getattr(yaml, "CSafeLoader", None) or yaml.SafeLoader
 41  
 42          def _load(value: str):
 43              return yaml.load(value, Loader=loader)
 44  
 45          _yaml_load_fn = _load
 46      return _yaml_load_fn(content)
 47  
 48  
 49  # ── Frontmatter parsing ──────────────────────────────────────────────────
 50  
 51  
 52  def parse_frontmatter(content: str) -> Tuple[Dict[str, Any], str]:
 53      """Parse YAML frontmatter from a markdown string.
 54  
 55      Uses yaml with CSafeLoader for full YAML support (nested metadata, lists)
 56      with a fallback to simple key:value splitting for robustness.
 57  
 58      Returns:
 59          (frontmatter_dict, remaining_body)
 60      """
 61      frontmatter: Dict[str, Any] = {}
 62      body = content
 63  
 64      if not content.startswith("---"):
 65          return frontmatter, body
 66  
 67      end_match = re.search(r"\n---\s*\n", content[3:])
 68      if not end_match:
 69          return frontmatter, body
 70  
 71      yaml_content = content[3 : end_match.start() + 3]
 72      body = content[end_match.end() + 3 :]
 73  
 74      try:
 75          parsed = yaml_load(yaml_content)
 76          if isinstance(parsed, dict):
 77              frontmatter = parsed
 78      except Exception:
 79          # Fallback: simple key:value parsing for malformed YAML
 80          for line in yaml_content.strip().split("\n"):
 81              if ":" not in line:
 82                  continue
 83              key, value = line.split(":", 1)
 84              frontmatter[key.strip()] = value.strip()
 85  
 86      return frontmatter, body
 87  
 88  
 89  # ── Platform matching ─────────────────────────────────────────────────────
 90  
 91  
 92  def skill_matches_platform(frontmatter: Dict[str, Any]) -> bool:
 93      """Return True when the skill is compatible with the current OS.
 94  
 95      Skills declare platform requirements via a top-level ``platforms`` list
 96      in their YAML frontmatter::
 97  
 98          platforms: [macos]          # macOS only
 99          platforms: [macos, linux]   # macOS and Linux
100  
101      If the field is absent or empty the skill is compatible with **all**
102      platforms (backward-compatible default).
103      """
104      platforms = frontmatter.get("platforms")
105      if not platforms:
106          return True
107      if not isinstance(platforms, list):
108          platforms = [platforms]
109      current = sys.platform
110      for platform in platforms:
111          normalized = str(platform).lower().strip()
112          mapped = PLATFORM_MAP.get(normalized, normalized)
113          if current.startswith(mapped):
114              return True
115      return False
116  
117  
118  # ── Disabled skills ───────────────────────────────────────────────────────
119  
120  
def get_disabled_skill_names(platform: Optional[str] = None) -> Set[str]:
    """Read disabled skill names from config.yaml.

    Args:
        platform: Explicit platform name (e.g. ``"telegram"``).  When
            *None*, resolves from the ``HERMES_PLATFORM`` env var, then from
            ``get_session_env("HERMES_SESSION_PLATFORM")``.

    Returns:
        The names under ``skills.platform_disabled.<platform>`` when a
        platform is resolved and that mapping has a non-null entry for it
        (an empty entry yields an empty set); otherwise the names from the
        global ``skills.disabled`` value.  An empty set is returned when the
        config file is missing, cannot be parsed, or has no ``skills``
        mapping.

    Reads the config file directly (no CLI config imports) to stay
    lightweight.
    """
    config_path = get_config_path()
    if not config_path.exists():
        return set()
    try:
        parsed = yaml_load(config_path.read_text(encoding="utf-8"))
    except Exception as e:
        logger.debug("Could not read skill config %s: %s", config_path, e)
        return set()
    if not isinstance(parsed, dict):
        return set()

    skills_cfg = parsed.get("skills")
    if not isinstance(skills_cfg, dict):
        return set()

    from gateway.session_context import get_session_env
    resolved_platform = (
        platform
        or os.getenv("HERMES_PLATFORM")
        or get_session_env("HERMES_SESSION_PLATFORM")
    )
    if resolved_platform:
        per_platform = skills_cfg.get("platform_disabled")
        # A hand-edited config may hold a list or scalar here.  Only a mapping
        # can carry per-platform overrides; anything else falls through to the
        # global list instead of raising AttributeError on ``.get``.
        if isinstance(per_platform, dict):
            platform_disabled = per_platform.get(resolved_platform)
            if platform_disabled is not None:
                return _normalize_string_set(platform_disabled)
    return _normalize_string_set(skills_cfg.get("disabled"))
161  
162  
def _normalize_string_set(values) -> Set[str]:
    """Coerce a config value into a set of non-empty, stripped strings.

    ``None`` yields an empty set.  A single string, or any non-iterable
    scalar (e.g. a bare ``123`` that YAML parsed as an int), is treated as
    one entry.  Any other iterable contributes each of its elements.  Every
    element is converted with ``str`` and stripped; elements that are blank
    after stripping are dropped.
    """
    if values is None:
        return set()
    # Wrap scalars so they are not iterated character-by-character (str) and
    # do not raise TypeError (int, float, bool).
    if isinstance(values, str) or not hasattr(values, "__iter__"):
        values = [values]
    stripped = (str(item).strip() for item in values)
    return {name for name in stripped if name}
169  
170  
171  # ── External skills directories ──────────────────────────────────────────
172  
173  
def get_external_skills_dirs() -> List[Path]:
    """Read ``skills.external_dirs`` from config.yaml and return validated paths.

    The value may be a single string or a list.  Each entry has environment
    variables and ``~`` expanded, is resolved against ``get_hermes_home()``
    when relative (not against the current working directory), and is then
    resolved to an absolute path.

    Only directories that actually exist are returned, in config order.
    Duplicates and paths that resolve to the local skills directory
    (``get_skills_dir()``) are silently skipped.  An empty list is returned
    when the config file is missing, cannot be parsed, or does not contain a
    usable ``skills.external_dirs`` value.
    """
    config_path = get_config_path()
    if not config_path.exists():
        return []
    try:
        parsed = yaml_load(config_path.read_text(encoding="utf-8"))
    except Exception as e:
        # Best-effort: an unreadable config means "no external dirs", but
        # leave a debug trace instead of swallowing the error silently.
        logger.debug("Could not read skill config %s: %s", config_path, e)
        return []
    if not isinstance(parsed, dict):
        return []

    skills_cfg = parsed.get("skills")
    if not isinstance(skills_cfg, dict):
        return []

    raw_dirs = skills_cfg.get("external_dirs")
    if not raw_dirs:
        return []
    if isinstance(raw_dirs, str):
        raw_dirs = [raw_dirs]
    if not isinstance(raw_dirs, list):
        return []

    from hermes_constants import get_hermes_home

    hermes_home = get_hermes_home()
    local_skills = get_skills_dir().resolve()
    seen: Set[Path] = set()
    result: List[Path] = []

    for entry in raw_dirs:
        entry = str(entry).strip()
        if not entry:
            continue
        # Expand environment variables first, then ~
        expanded = os.path.expanduser(os.path.expandvars(entry))
        p = Path(expanded)
        # Resolve relative paths against HERMES_HOME, not cwd
        if not p.is_absolute():
            p = hermes_home / p
        p = p.resolve()
        if p == local_skills or p in seen:
            continue
        if p.is_dir():
            seen.add(p)
            result.append(p)
        else:
            logger.debug("External skills dir does not exist, skipping: %s", p)

    return result
233  
234  
def get_all_skills_dirs() -> List[Path]:
    """List every skills directory, local one first.

    The first element is always ``get_skills_dir()``; it is included without
    checking that it exists, so callers must handle a missing directory.
    The directories from ``get_external_skills_dirs()`` follow in the order
    that function returns them.
    """
    return [get_skills_dir(), *get_external_skills_dirs()]
244  
245  
246  # ── Condition extraction ──────────────────────────────────────────────────
247  
248  
def _as_condition_list(value: Any) -> List:
    """Normalise one condition field: ``None`` -> ``[]``, scalar -> ``[scalar]``."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    if isinstance(value, (tuple, set, frozenset)):
        return list(value)
    # A bare scalar such as ``requires_tools: web_search`` means one entry.
    return [value]


def extract_skill_conditions(frontmatter: Dict[str, Any]) -> Dict[str, List]:
    """Extract conditional activation fields from parsed frontmatter.

    Reads ``metadata.hermes`` and returns a dict with exactly these keys:
    ``fallback_for_toolsets``, ``requires_toolsets``, ``fallback_for_tools``
    and ``requires_tools``.  Every value is a list: a missing or null field
    becomes ``[]`` and a single scalar becomes a one-element list, so callers
    can always iterate the result.

    If ``metadata`` or ``metadata.hermes`` is not a mapping (e.g. a string
    from malformed YAML), all four lists are empty.
    """
    metadata = frontmatter.get("metadata")
    hermes = metadata.get("hermes") if isinstance(metadata, dict) else None
    if not isinstance(hermes, dict):
        hermes = {}

    fields = (
        "fallback_for_toolsets",
        "requires_toolsets",
        "fallback_for_tools",
        "requires_tools",
    )
    return {field: _as_condition_list(hermes.get(field)) for field in fields}
264  
265  
266  # ── Skill config extraction ───────────────────────────────────────────────
267  
268  
def extract_skill_config_vars(frontmatter: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract config variable declarations from parsed frontmatter.

    Skills declare config.yaml settings they need via::

        metadata:
          hermes:
            config:
              - key: wiki.path
                description: Path to the LLM Wiki knowledge base directory
                default: "~/wiki"
                prompt: Wiki directory path

    ``config`` may also be a single mapping, which is treated as a
    one-element list.

    Returns a list of dicts with keys ``key``, ``description``, ``prompt``
    (the stripped ``prompt`` string, or the description when no non-blank
    string prompt is given) and, when declared and not null, ``default``.

    Invalid or incomplete entries are silently skipped: non-mapping items,
    items whose ``key`` or ``description`` is missing, null or blank, and
    items repeating a key already seen in this frontmatter.
    """
    metadata = frontmatter.get("metadata")
    if not isinstance(metadata, dict):
        return []
    hermes = metadata.get("hermes")
    if not isinstance(hermes, dict):
        return []
    raw = hermes.get("config")
    if not raw:
        return []
    if isinstance(raw, dict):
        raw = [raw]
    if not isinstance(raw, list):
        return []

    def _text(value: Any) -> str:
        # An empty YAML value parses to None; str(None) would produce the
        # literal "None" and make an incomplete entry look complete.
        return "" if value is None else str(value).strip()

    result: List[Dict[str, Any]] = []
    seen: Set[str] = set()
    for item in raw:
        if not isinstance(item, dict):
            continue
        key = _text(item.get("key"))
        if not key or key in seen:
            continue
        # Must have at least key and description
        desc = _text(item.get("description"))
        if not desc:
            continue
        entry: Dict[str, Any] = {"key": key, "description": desc}
        default = item.get("default")
        if default is not None:
            entry["default"] = default
        prompt_text = item.get("prompt")
        if isinstance(prompt_text, str) and prompt_text.strip():
            entry["prompt"] = prompt_text.strip()
        else:
            entry["prompt"] = desc
        seen.add(key)
        result.append(entry)
    return result
326  
327  
def discover_all_skill_config_vars() -> List[Dict[str, Any]]:
    """Collect config variable declarations from every enabled skill.

    Every existing directory from ``get_all_skills_dirs()`` is walked for
    ``SKILL.md`` files.  Each file's frontmatter is parsed and its config
    vars extracted; files that cannot be read or parsed are skipped.

    A skill is identified by its frontmatter ``name``, falling back to the
    name of the directory holding the file.  Skills whose name is in the
    disabled set, or that do not match the current platform, contribute
    nothing.

    Returns a list of config var dicts, deduplicated by ``key`` (the first
    declaration encountered wins).  Each dict carries an extra ``skill`` key
    with the declaring skill's name for attribution.
    """
    collected: List[Dict[str, Any]] = []
    claimed_keys: set = set()
    disabled_names = get_disabled_skill_names()

    existing_dirs = (d for d in get_all_skills_dirs() if d.is_dir())
    for skills_dir in existing_dirs:
        for skill_md in iter_skill_index_files(skills_dir, "SKILL.md"):
            try:
                text = skill_md.read_text(encoding="utf-8")
                frontmatter, _body = parse_frontmatter(text)
            except Exception:
                continue

            owner = str(frontmatter.get("name") or skill_md.parent.name)
            if owner in disabled_names or not skill_matches_platform(frontmatter):
                continue

            for declared in extract_skill_config_vars(frontmatter):
                key = declared["key"]
                if key in claimed_keys:
                    continue
                declared["skill"] = owner
                collected.append(declared)
                claimed_keys.add(key)

    return collected
365  
366  
# Storage prefix: all skill config vars are stored under skills.config.*
# in config.yaml.  Skill authors declare logical keys (e.g. "wiki.path");
# the system adds this prefix for storage and strips it for display, so the
# logical key "wiki.path" is looked up at "skills.config.wiki.path".
SKILL_CONFIG_PREFIX = "skills.config"
371  
372  
def _resolve_dotpath(config: Dict[str, Any], dotted_key: str):
    """Look up *dotted_key* (``"a.b.c"``) in a nested dict.

    Descends one level per dot-separated segment.  Returns the value found
    at the end of the path, or ``None`` as soon as a segment is absent or
    the current level is not a dict.
    """
    node = config
    for segment in dotted_key.split("."):
        if not isinstance(node, dict) or segment not in node:
            return None
        node = node[segment]
    return node
383  
384  
def resolve_skill_config_values(
    config_vars: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Resolve current values for skill config vars from config.yaml.

    Skill config is stored under ``skills.config.<key>`` in config.yaml.

    Args:
        config_vars: Config var dicts; each must have a ``key`` entry and
            may have a ``default``.

    Returns:
        A dict mapping **logical** keys (as declared by skills) to their
        current values.  When a key is unset, or set to a null or blank
        string, the declared default is used (``""`` if there is none).
        String values containing ``~`` or ``${`` are expanded with
        ``os.path.expandvars`` and ``os.path.expanduser``.

    If the config file is missing or cannot be parsed, every key resolves to
    its default.
    """
    config_path = get_config_path()
    config: Dict[str, Any] = {}
    if config_path.exists():
        try:
            parsed = yaml_load(config_path.read_text(encoding="utf-8"))
        except Exception as e:
            # Best-effort: fall back to declared defaults, but leave a debug
            # trace rather than hiding the failure entirely.
            logger.debug("Could not read skill config %s: %s", config_path, e)
        else:
            if isinstance(parsed, dict):
                config = parsed

    resolved: Dict[str, Any] = {}
    for var in config_vars:
        logical_key = var["key"]
        storage_key = f"{SKILL_CONFIG_PREFIX}.{logical_key}"
        value = _resolve_dotpath(config, storage_key)

        if value is None or (isinstance(value, str) and not value.strip()):
            value = var.get("default", "")

        # Expand env vars and ~ in path-like values
        if isinstance(value, str) and ("~" in value or "${" in value):
            value = os.path.expanduser(os.path.expandvars(value))

        resolved[logical_key] = value

    return resolved
421  
422  
423  # ── Description extraction ────────────────────────────────────────────────
424  
425  
def extract_skill_description(frontmatter: Dict[str, Any]) -> str:
    """Return the frontmatter ``description`` shortened to at most 60 characters.

    The value is stringified, stripped of surrounding whitespace and then of
    surrounding quote characters.  Text longer than 60 characters is cut to
    its first 57 characters plus ``"..."``.  A missing or falsy description
    yields an empty string.
    """
    raw = frontmatter.get("description", "")
    if not raw:
        return ""
    text = str(raw).strip().strip("'\"")
    return text if len(text) <= 60 else text[:57] + "..."
435  
436  
437  # ── File iteration ────────────────────────────────────────────────────────
438  
439  
def iter_skill_index_files(skills_dir: Path, filename: str):
    """Yield every path named *filename* under *skills_dir*, in sorted order.

    The tree is walked with symlinked directories followed.  Directories
    named in ``EXCLUDED_SKILL_DIRS`` are not descended into.  All matches
    are collected first and then yielded sorted by their path relative to
    *skills_dir*.
    """
    found = []
    for current_dir, subdirs, filenames in os.walk(skills_dir, followlinks=True):
        # Prune in place so os.walk does not descend into excluded directories.
        subdirs[:] = [name for name in subdirs if name not in EXCLUDED_SKILL_DIRS]
        if filename in filenames:
            found.append(Path(current_dir) / filename)

    def _relative_key(path: Path) -> str:
        return str(path.relative_to(skills_dir))

    found.sort(key=_relative_key)
    yield from found
452  
453  
454  # ── Namespace helpers for plugin-provided skills ───────────────────────────
455  
# Valid namespace: one or more ASCII letters, digits, "_" or "-".
# Anchored with ``\Z`` rather than ``$``: ``$`` also matches just before a
# trailing newline, which would let "ns\n" pass as a valid namespace.
_NAMESPACE_RE = re.compile(r"^[a-zA-Z0-9_-]+\Z")
457  
458  
def parse_qualified_name(name: str) -> Tuple[Optional[str], str]:
    """Break ``'namespace:skill-name'`` apart at its first ``':'``.

    Returns ``(namespace, bare_name)``; everything after the first colon,
    including further colons, belongs to ``bare_name``.  A name without any
    ``':'`` is returned as ``(None, name)``.
    """
    namespace, separator, bare_name = name.partition(":")
    if not separator:
        return None, name
    return namespace, bare_name
467  
468  
def is_valid_namespace(candidate: Optional[str]) -> bool:
    """Return True when *candidate* is non-empty and matches ``_NAMESPACE_RE``.

    ``None`` and the empty string are rejected without consulting the pattern.
    """
    return bool(candidate) and _NAMESPACE_RE.match(candidate) is not None