# scripts/build_skills_index.py
  1  #!/usr/bin/env python3
  2  """Build the Hermes Skills Index — a centralized JSON catalog of all skills.
  3  
  4  This script crawls every skill source (skills.sh, GitHub taps, official,
  5  clawhub, lobehub, claude-marketplace) and writes a JSON index with resolved
  6  GitHub paths. The index is served as a static file on the docs site so that
  7  `hermes skills search/install` can use it without hitting the GitHub API.
  8  
  9  Usage:
 10      # Local (uses gh CLI or GITHUB_TOKEN for auth)
 11      python scripts/build_skills_index.py
 12  
 13      # CI (set GITHUB_TOKEN as secret)
 14      GITHUB_TOKEN=ghp_... python scripts/build_skills_index.py
 15  
 16  Output: website/static/api/skills-index.json
 17  """
 18  
 19  import json
 20  import os
 21  import sys
 22  import time
 23  from collections import defaultdict
 24  from concurrent.futures import ThreadPoolExecutor, as_completed
 25  from datetime import datetime, timezone
 26  
 27  # Allow importing from repo root
 28  REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 29  sys.path.insert(0, REPO_ROOT)
 30  
 31  # Ensure HERMES_HOME is set (needed by tools/skills_hub.py imports)
 32  os.environ.setdefault("HERMES_HOME", os.path.join(os.path.expanduser("~"), ".hermes"))
 33  
 34  from tools.skills_hub import (
 35      GitHubAuth,
 36      GitHubSource,
 37      SkillsShSource,
 38      OptionalSkillSource,
 39      WellKnownSkillSource,
 40      ClawHubSource,
 41      ClaudeMarketplaceSource,
 42      LobeHubSource,
 43      SkillMeta,
 44  )
 45  import httpx
 46  
# Destination for the generated index; served as a static file by the docs site.
OUTPUT_PATH = os.path.join(REPO_ROOT, "website", "static", "api", "skills-index.json")
# Schema version written into the index payload (see "version" key in main()).
INDEX_VERSION = 1
 49  
 50  
 51  def _meta_to_dict(meta: SkillMeta) -> dict:
 52      """Convert a SkillMeta to a serializable dict."""
 53      return {
 54          "name": meta.name,
 55          "description": meta.description,
 56          "source": meta.source,
 57          "identifier": meta.identifier,
 58          "trust_level": meta.trust_level,
 59          "repo": meta.repo or "",
 60          "path": meta.path or "",
 61          "tags": meta.tags or [],
 62          "extra": meta.extra or {},
 63      }
 64  
 65  
 66  def crawl_source(source, source_name: str, limit: int) -> list:
 67      """Crawl a single source and return skill dicts."""
 68      print(f"  Crawling {source_name}...", flush=True)
 69      start = time.time()
 70      try:
 71          results = source.search("", limit=limit)
 72      except Exception as e:
 73          print(f"  Error crawling {source_name}: {e}", file=sys.stderr)
 74          return []
 75      skills = [_meta_to_dict(m) for m in results]
 76      elapsed = time.time() - start
 77      print(f"  {source_name}: {len(skills)} skills ({elapsed:.1f}s)", flush=True)
 78      return skills
 79  
 80  
 81  def crawl_skills_sh(source: SkillsShSource) -> list:
 82      """Crawl skills.sh using popular queries for broad coverage."""
 83      print("  Crawling skills.sh (popular queries)...", flush=True)
 84      start = time.time()
 85  
 86      queries = [
 87          "",  # featured
 88          "react", "python", "web", "api", "database", "docker",
 89          "testing", "scraping", "design", "typescript", "git",
 90          "aws", "security", "data", "ml", "ai", "devops",
 91          "frontend", "backend", "mobile", "cli", "documentation",
 92          "kubernetes", "terraform", "rust", "go", "java",
 93      ]
 94  
 95      all_skills: dict[str, dict] = {}
 96      for query in queries:
 97          try:
 98              results = source.search(query, limit=50)
 99              for meta in results:
100                  entry = _meta_to_dict(meta)
101                  if entry["identifier"] not in all_skills:
102                      all_skills[entry["identifier"]] = entry
103          except Exception as e:
104              print(f"    Warning: skills.sh search '{query}' failed: {e}",
105                    file=sys.stderr)
106  
107      elapsed = time.time() - start
108      print(f"  skills.sh: {len(all_skills)} unique skills ({elapsed:.1f}s)",
109            flush=True)
110      return list(all_skills.values())
111  
112  
def _fetch_repo_tree(repo: str, auth: GitHubAuth) -> list:
    """Return the recursive git-tree entries for *repo*, or [] on any failure.

    Makes two GitHub API calls: one to discover the default branch, one to
    pull the recursive tree. A truncated tree (very large repo) is treated
    as a miss so callers never operate on a partial listing.
    """
    headers = auth.get_headers()
    try:
        repo_resp = httpx.get(
            f"https://api.github.com/repos/{repo}",
            headers=headers, timeout=15, follow_redirects=True,
        )
        if repo_resp.status_code != 200:
            return []
        default_branch = repo_resp.json().get("default_branch", "main")

        tree_resp = httpx.get(
            f"https://api.github.com/repos/{repo}/git/trees/{default_branch}",
            params={"recursive": "1"},
            headers=headers, timeout=30, follow_redirects=True,
        )
        if tree_resp.status_code != 200:
            return []
        payload = tree_resp.json()
        # "truncated" means GitHub could not return the full tree.
        return [] if payload.get("truncated") else payload.get("tree", [])
    except Exception:
        # Best-effort: a network/parse error simply means no tree for this repo.
        return []
138  
139  
def batch_resolve_paths(skills: list, auth: GitHubAuth) -> list:
    """Resolve GitHub paths for skills.sh entries using batch tree lookups.

    Instead of resolving each skill individually (N×M API calls), we:
    1. Group skills by repo
    2. Fetch one tree per repo (2 API calls per repo)
    3. Find all SKILL.md files in the tree
    4. Match skills to their resolved paths

    Matching entries are mutated in place (a "resolved_github_id" key is
    added); the same ``skills`` list is returned for convenience.
    """
    # Only skills.sh entries need resolution; other sources already carry
    # usable GitHub identifiers.
    skills_sh = [s for s in skills if s["source"] in ("skills.sh", "skills-sh")]
    if not skills_sh:
        return skills

    print(f"  Resolving paths for {len(skills_sh)} skills.sh entries...",
          flush=True)
    start = time.time()

    # Group by repo so each repo's tree is fetched exactly once.
    by_repo: dict[str, list] = defaultdict(list)
    for s in skills_sh:
        repo = s.get("repo", "")
        if repo:
            by_repo[repo].append(s)

    print(f"    {len(by_repo)} unique repos to scan", flush=True)

    resolved_count = 0

    def _resolve_repo(repo: str, entries: list) -> int:
        """Fetch one repo's tree and resolve every entry matching it.

        Returns the number of entries resolved.
        """
        tree = _fetch_repo_tree(repo, auth)
        if not tree:
            return 0

        # Map lowercase skill-directory name -> "owner/repo/dir/of/skill".
        skill_paths: dict[str, str] = {}
        for item in tree:
            if item.get("type") != "blob":
                continue
            path = item.get("path", "")
            if path.endswith("/SKILL.md"):
                skill_dir = path[: -len("/SKILL.md")]
                dir_name = skill_dir.split("/")[-1]
                skill_paths[dir_name.lower()] = f"{repo}/{skill_dir}"
            elif path == "SKILL.md":
                # Root-level SKILL.md: the repo itself is the skill.
                skill_paths["_root_"] = repo

        count = 0
        for entry in entries:
            skill_name = entry.get("name", "").lower()
            skill_path = entry.get("path", "").lower()
            identifier = entry.get("identifier", "")

            # Extract the skill token (last identifier segment), e.g.
            # "skills-sh/d4vinci/scrapling/scrapling-official" -> "scrapling-official".
            # BUGFIX: the previous code yielded "" for identifiers without a
            # "/", discarding them; split("/")[-1] handles both cases, since
            # a slashless identifier is itself the token.
            parts = identifier.replace("skills-sh/", "").replace("skills.sh/", "")
            skill_token = parts.split("/")[-1].lower()

            # Exact matches first, in decreasing order of likelihood.
            for candidate in (skill_token, skill_name, skill_path):
                if candidate and candidate in skill_paths:
                    entry["resolved_github_id"] = skill_paths[candidate]
                    count += 1
                    break
            else:
                # Fuzzy fallback: ignore hyphens, or accept substring
                # containment in either direction.
                for tree_name, tree_path in skill_paths.items():
                    if skill_token and (
                        tree_name.replace("-", "") == skill_token.replace("-", "")
                        or skill_token in tree_name
                        or tree_name in skill_token
                    ):
                        entry["resolved_github_id"] = tree_path
                        count += 1
                        break

        return count

    # Fetch trees in parallel (up to 6 concurrent workers).
    with ThreadPoolExecutor(max_workers=6) as pool:
        futures = {
            pool.submit(_resolve_repo, repo, entries): repo
            for repo, entries in by_repo.items()
        }
        for future in as_completed(futures):
            try:
                resolved_count += future.result()
            except Exception as e:
                repo = futures[future]
                print(f"    Warning: {repo}: {e}", file=sys.stderr)

    elapsed = time.time() - start
    print(f"  Resolved {resolved_count}/{len(skills_sh)} paths ({elapsed:.1f}s)",
          flush=True)
    return skills
243  
244  
def main():
    """Crawl every skill source, resolve GitHub paths, and write the index."""
    print("Building Hermes Skills Index...", flush=True)
    overall_start = time.time()

    auth = GitHubAuth()
    print(f"GitHub auth: {auth.auth_method()}")
    if auth.auth_method() == "anonymous":
        print("WARNING: No GitHub authentication — rate limit is 60/hr. "
              "Set GITHUB_TOKEN for better results.", file=sys.stderr)

    skills_sh_source = SkillsShSource(auth=auth)
    sources = {
        "official": OptionalSkillSource(),
        "well-known": WellKnownSkillSource(),
        "github": GitHubSource(auth=auth),
        "clawhub": ClawHubSource(),
        "claude-marketplace": ClaudeMarketplaceSource(auth=auth),
        "lobehub": LobeHubSource(),
    }

    # skills.sh needs its multi-query crawl; the rest fan out in parallel.
    all_skills: list[dict] = list(crawl_skills_sh(skills_sh_source))

    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = {
            pool.submit(crawl_source, src, name, 500): name
            for name, src in sources.items()
        }
        for done in as_completed(pending):
            try:
                all_skills.extend(done.result())
            except Exception as e:
                print(f"  Error: {e}", file=sys.stderr)

    # Batch-resolve GitHub paths for skills.sh entries.
    all_skills = batch_resolve_paths(all_skills, auth)

    # De-duplicate by identifier, keeping the first occurrence.
    unique: dict[str, dict] = {}
    for skill in all_skills:
        unique.setdefault(skill["identifier"], skill)
    deduped = list(unique.values())

    # Stable ordering: preferred sources first, then alphabetical by name.
    source_order = {"official": 0, "skills-sh": 1, "skills.sh": 1,
                    "github": 2, "well-known": 3, "clawhub": 4,
                    "claude-marketplace": 5, "lobehub": 6}
    deduped.sort(key=lambda s: (source_order.get(s["source"], 99), s["name"]))

    index = {
        "version": INDEX_VERSION,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "skill_count": len(deduped),
        "skills": deduped,
    }

    os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
    # Compact separators keep the static file small.
    with open(OUTPUT_PATH, "w") as f:
        json.dump(index, f, separators=(",", ":"), ensure_ascii=False)

    elapsed = time.time() - overall_start
    print(f"\nDone! {len(deduped)} skills indexed in {elapsed:.0f}s")
    file_size = os.path.getsize(OUTPUT_PATH)
    print(f"Output: {OUTPUT_PATH} ({file_size / 1024:.0f} KB)")

    # Per-source summary, most populous first.
    from collections import Counter
    by_source = Counter(s["source"] for s in deduped)
    for src, count in sorted(by_source.items(), key=lambda x: -x[1]):
        resolved = sum(1 for s in deduped
                       if s["source"] == src and s.get("resolved_github_id"))
        extra = f" ({resolved} resolved)" if resolved else ""
        print(f"  {src}: {count}{extra}")
322  
323  
# Run the crawler only when executed as a script (not when imported).
if __name__ == "__main__":
    main()