/ tools / skill_usage.py
skill_usage.py
  1  """Skill usage telemetry + provenance tracking for the Curator feature.
  2  
  3  Tracks per-skill usage metadata in a sidecar JSON file (~/.hermes/skills/.usage.json)
  4  keyed by skill name. Counters are bumped by the existing skill tools (skill_view,
  5  skill_manage); the curator orchestrator reads the derived activity timestamp to
  6  decide lifecycle transitions.
  7  
  8  Design notes:
  9    - Sidecar, not frontmatter. Keeps operational telemetry out of user-authored
 10      SKILL.md content and avoids conflict pressure for bundled/hub skills.
 11    - Atomic writes via tempfile + os.replace (same pattern as .bundled_manifest).
 12    - All counter bumps are best-effort: failures log at DEBUG and return silently.
 13      A broken sidecar never breaks the underlying tool call.
 14    - Provenance filter: curator-managed skills are explicitly marked when
 15      created through skill_manage. Bundled / hub-installed skills stay
 16      off-limits, and manually authored skills are not inferred from location.
 17  
 18  Lifecycle states:
 19      active    -> default
 20      stale     -> unused > stale_after_days (config)
 21      archived  -> unused > archive_after_days (config); moved to .archive/
 22      pinned    -> opt-out from auto transitions (boolean flag, orthogonal to state)
 23  """
 24  
 25  from __future__ import annotations
 26  
 27  import json
 28  import logging
 29  import os
 30  import tempfile
 31  from datetime import datetime, timezone
 32  from pathlib import Path
 33  from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
 34  
 35  from hermes_constants import get_hermes_home
 36  
 37  logger = logging.getLogger(__name__)
 38  
 39  
# Lifecycle state labels stored under each usage record's "state" key.
STATE_ACTIVE = "active"
STATE_STALE = "stale"
STATE_ARCHIVED = "archived"
# States accepted by set_state(); any other value is rejected there.
_VALID_STATES = {STATE_ACTIVE, STATE_STALE, STATE_ARCHIVED}
 44  
 45  
 46  def _skills_dir() -> Path:
 47      return get_hermes_home() / "skills"
 48  
 49  
 50  def _usage_file() -> Path:
 51      return _skills_dir() / ".usage.json"
 52  
 53  
 54  def _archive_dir() -> Path:
 55      return _skills_dir() / ".archive"
 56  
 57  
 58  def _now_iso() -> str:
 59      return datetime.now(timezone.utc).isoformat()
 60  
 61  
 62  def _parse_iso_timestamp(value: Any) -> Optional[datetime]:
 63      """Parse an ISO timestamp defensively for activity comparisons."""
 64      if not value:
 65          return None
 66      try:
 67          parsed = datetime.fromisoformat(str(value))
 68      except (TypeError, ValueError):
 69          return None
 70      if parsed.tzinfo is None:
 71          parsed = parsed.replace(tzinfo=timezone.utc)
 72      return parsed
 73  
 74  
 75  def latest_activity_at(record: Dict[str, Any]) -> Optional[str]:
 76      """Return the newest actual activity timestamp for a usage record.
 77  
 78      "Activity" means a skill was used, viewed, or patched. Creation time is
 79      intentionally excluded so callers can still distinguish never-active skills;
 80      lifecycle code can fall back to ``created_at`` as its own anchor.
 81      """
 82      latest_dt: Optional[datetime] = None
 83      latest_raw: Optional[str] = None
 84      for key in ("last_used_at", "last_viewed_at", "last_patched_at"):
 85          raw = record.get(key)
 86          dt = _parse_iso_timestamp(raw)
 87          if dt is None:
 88              continue
 89          if latest_dt is None or dt > latest_dt:
 90              latest_dt = dt
 91              latest_raw = str(raw)
 92      return latest_raw
 93  
 94  
 95  def activity_count(record: Dict[str, Any]) -> int:
 96      """Return the total observed activity count across use/view/patch events."""
 97      total = 0
 98      for key in ("use_count", "view_count", "patch_count"):
 99          try:
100              total += int(record.get(key) or 0)
101          except (TypeError, ValueError):
102              continue
103      return total
104  
105  
106  # ---------------------------------------------------------------------------
107  # Provenance — which skills are agent-created (and thus eligible for curation)
108  # ---------------------------------------------------------------------------
109  
def _read_bundled_manifest_names() -> Set[str]:
    """Return the set of skill names that were seeded from the bundled repo.

    Reads ~/.hermes/skills/.bundled_manifest (format: "name:hash" per line).
    Only the part before the first ":" is used; blank lines and lines with an
    empty name are ignored.

    Returns:
        The set of bundled skill names, or an empty set if the file is
        missing, unreadable, or not valid UTF-8.
    """
    manifest = _skills_dir() / ".bundled_manifest"
    try:
        text = manifest.read_text(encoding="utf-8")
    except FileNotFoundError:
        # A missing manifest is the normal "nothing bundled" case.
        return set()
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is a ValueError, not an OSError; without it a
        # corrupt manifest would raise out of this "never fails" reader.
        logger.debug("Failed to read bundled manifest: %s", e)
        return set()
    names = {line.split(":", 1)[0].strip() for line in text.splitlines()}
    names.discard("")
    return names
131  
132  
def _read_hub_installed_names() -> Set[str]:
    """Return the set of skill names installed via the Skills Hub.

    Reads ~/.hermes/skills/.hub/lock.json (see tools/skills_hub.py :: HubLockFile)
    and returns the keys of its top-level ``"installed"`` mapping.

    Returns:
        The installed skill names as strings, or an empty set when the lock
        file is missing, unreadable, not valid UTF-8/JSON, or does not have
        the expected dict-of-dict shape.
    """
    lock_path = _skills_dir() / ".hub" / "lock.json"
    try:
        data = json.loads(lock_path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return set()
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError,
        # so a lock file with invalid bytes cannot raise out of this reader.
        logger.debug("Failed to read hub lock file: %s", e)
        return set()
    if not isinstance(data, dict):
        return set()
    installed = data.get("installed") or {}
    if not isinstance(installed, dict):
        return set()
    return {str(k) for k in installed}
150  
151  
def list_agent_created_skill_names() -> List[str]:
    """List the skills that were explicitly authored by the agent.

    This is the only set the curator works on. A skill qualifies once
    ``skill_manage(action="create")`` has marked it in ``.usage.json``;
    a skill is never inferred to be agent-created from where it lives on
    disk. Bundled and hub-installed skills belong to their upstream sources
    and are always excluded.

    Returns:
        Sorted, de-duplicated skill names; empty when the skills directory
        does not exist.
    """
    root = _skills_dir()
    if not root.exists():
        return []

    excluded = _read_bundled_manifest_names() | _read_hub_installed_names()
    records = load_usage()

    found: Set[str] = set()
    # rglob covers both the flat layout and nested category/skill/SKILL.md.
    for manifest_path in root.rglob("SKILL.md"):
        try:
            relative = manifest_path.relative_to(root)
        except ValueError:
            continue
        # Ignore dot-directories (e.g. .archive, .hub) and a top-level
        # node_modules directory.
        top_level = relative.parts[0] if relative.parts else ""
        if top_level.startswith(".") or top_level == "node_modules":
            continue
        skill = _read_skill_name(manifest_path, fallback=manifest_path.parent.name)
        if skill not in excluded and _is_curator_managed_record(records.get(skill)):
            found.add(skill)
    return sorted(found)
187  
188  
def _read_skill_name(skill_md: Path, fallback: str) -> str:
    """Parse the `name:` field from a SKILL.md YAML frontmatter.

    Only the first 4000 characters of the file are inspected. Frontmatter is
    recognised only when ``---`` is the first non-blank line of the file; a
    ``---`` further down is a markdown horizontal rule, not frontmatter, and
    must not cause body text to be parsed as metadata.

    Args:
        skill_md: Path of the SKILL.md file.
        fallback: Value returned when the file cannot be read, has no
            frontmatter, or the frontmatter has no non-empty ``name:``.

    Returns:
        The frontmatter name with surrounding quotes removed, or *fallback*.
    """
    try:
        # Read just the head of the file instead of loading all of it.
        with skill_md.open(encoding="utf-8", errors="replace") as fh:
            head = fh.read(4000)
    except OSError:
        return fallback

    # A UTF-8 byte-order mark would otherwise hide the opening delimiter.
    lines = iter(head.lstrip("\ufeff").split("\n"))

    # The opening delimiter must be the first non-blank line.
    for line in lines:
        marker = line.strip()
        if not marker:
            continue
        if marker != "---":
            return fallback
        break
    else:
        return fallback

    for line in lines:
        stripped = line.strip()
        if stripped == "---":
            # Closing delimiter: the frontmatter held no usable name.
            break
        if stripped.startswith("name:"):
            value = stripped.split(":", 1)[1].strip().strip("\"'")
            if value:
                return value
    return fallback
208  
209  
def is_agent_created(skill_name: str) -> bool:
    """Return True when *skill_name* is neither bundled nor hub-installed.

    This is a pure exclusion check: any name absent from both the bundled
    manifest and the hub lock file passes, including manually authored skills.
    """
    bundled_names = _read_bundled_manifest_names()
    hub_names = _read_hub_installed_names()
    return not (skill_name in bundled_names or skill_name in hub_names)
214  
215  
def _is_curator_managed_record(record: Any) -> bool:
    """Tell whether a usage record opts its skill into curator management.

    A record qualifies when it is a dict whose ``created_by`` equals
    ``"agent"`` or whose ``agent_created`` is exactly ``True``. Anything that
    is not a dict never qualifies.
    """
    if isinstance(record, dict):
        if record.get("created_by") == "agent":
            return True
        return record.get("agent_created") is True
    return False
221  
222  
223  # ---------------------------------------------------------------------------
224  # Sidecar I/O
225  # ---------------------------------------------------------------------------
226  
def _empty_record() -> Dict[str, Any]:
    """Build a fresh usage record holding the default value of every field.

    Counters start at zero, timestamps other than ``created_at`` are unset,
    the state is active and the skill is neither pinned nor curator-marked.
    ``created_at`` is stamped with the current UTC time.
    """
    created = _now_iso()
    return dict(
        created_by=None,
        use_count=0,
        view_count=0,
        last_used_at=None,
        last_viewed_at=None,
        patch_count=0,
        last_patched_at=None,
        created_at=created,
        state=STATE_ACTIVE,
        pinned=False,
        archived_at=None,
    )
241  
242  
def load_usage() -> Dict[str, Dict[str, Any]]:
    """Read the entire .usage.json map. Returns empty dict on missing/corrupt.

    Returns:
        A mapping of skill name (coerced to ``str``) to its record dict.
        The result is empty when the sidecar is missing, unreadable, not
        valid UTF-8/JSON, or when its top level is not a JSON object.
        Entries whose value is not a dict are dropped.
    """
    path = _usage_file()
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No sidecar yet: nothing has been recorded.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError, so a
        # sidecar containing invalid bytes counts as corrupt instead of
        # raising into callers that rely on this function never failing.
        logger.debug("Failed to read %s: %s", path, e)
        return {}
    if not isinstance(data, dict):
        return {}
    # Defensive: keep only well-formed (dict) records; anything else is dropped.
    return {str(k): v for k, v in data.items() if isinstance(v, dict)}
261  
262  
def save_usage(data: Dict[str, Dict[str, Any]]) -> None:
    """Write the usage map atomically. Best-effort — errors are logged, not raised.

    The JSON is written to a temporary file created in the same directory as
    the sidecar, flushed and fsync'ed, and then moved over the real file with
    ``os.replace``.

    Args:
        data: Complete usage map (skill name -> record) to persist.

    Returns:
        None. Any ``Exception`` is logged at DEBUG level and swallowed;
        non-``Exception`` interrupts such as ``KeyboardInterrupt`` propagate
        after the temporary file has been cleaned up.
    """
    path = _usage_file()
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        # The temp file lives next to the target so the final os.replace
        # stays within one directory.
        fd, tmp_path = tempfile.mkstemp(
            dir=str(path.parent), prefix=".usage_", suffix=".tmp"
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, sort_keys=True, ensure_ascii=False)
                # Push the bytes to disk before the file is swapped into place.
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, path)
        except BaseException:
            # Remove the temp file on any failure, interrupts included, and
            # then let the original error continue upwards.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
    except Exception as e:
        logger.debug("Failed to write %s: %s", path, e, exc_info=True)
285  
286  
def get_record(skill_name: str) -> Dict[str, Any]:
    """Fetch the usage record for *skill_name*, with every default field present.

    When the sidecar has no dict entry for the skill, a fresh default record
    is returned. The result is not written back to disk by this function.
    """
    stored = load_usage().get(skill_name)
    if not isinstance(stored, dict):
        return _empty_record()
    # Fill in fields that older sidecar files did not have, so callers can
    # index any field without checking for it.
    for field, default in _empty_record().items():
        if field not in stored:
            stored[field] = default
    return stored
298  
299  
def _mutate(skill_name: str, mutator) -> None:
    """Load the sidecar, run *mutator(record)* in place, and save. Best-effort.

    Nothing is ever recorded for bundled or hub-installed skills. Local
    manually authored skills may still collect telemetry here, but they only
    become curator-managed once ``created_by`` has been set explicitly.

    Any exception, including one raised by *mutator*, is logged at DEBUG
    level and swallowed.
    """
    if not skill_name:
        return
    try:
        if is_agent_created(skill_name):
            usage = load_usage()
            existing = usage.get(skill_name)
            record = existing if isinstance(existing, dict) else _empty_record()
            mutator(record)
            usage[skill_name] = record
            save_usage(usage)
    except Exception as e:
        logger.debug("skill_usage._mutate(%s) failed: %s", skill_name, e, exc_info=True)
321  
322  
323  # ---------------------------------------------------------------------------
324  # Public counter-bump helpers
325  # ---------------------------------------------------------------------------
326  
def bump_view(skill_name: str) -> None:
    """Increment view_count and refresh last_viewed_at. Called from skill_view()."""
    def _record_view(entry: Dict[str, Any]) -> None:
        previous = int(entry.get("view_count") or 0)
        entry["view_count"] = previous + 1
        entry["last_viewed_at"] = _now_iso()

    _mutate(skill_name, _record_view)
333  
334  
def bump_use(skill_name: str) -> None:
    """Increment use_count and refresh last_used_at.

    Called when a skill is actively used (e.g. loaded into the prompt path
    or referenced from an assistant turn).
    """
    def _record_use(entry: Dict[str, Any]) -> None:
        previous = int(entry.get("use_count") or 0)
        entry["use_count"] = previous + 1
        entry["last_used_at"] = _now_iso()

    _mutate(skill_name, _record_use)
342  
343  
def bump_patch(skill_name: str) -> None:
    """Increment patch_count and refresh last_patched_at. Called from skill_manage (patch/edit)."""
    def _record_patch(entry: Dict[str, Any]) -> None:
        previous = int(entry.get("patch_count") or 0)
        entry["patch_count"] = previous + 1
        entry["last_patched_at"] = _now_iso()

    _mutate(skill_name, _record_patch)
350  
351  
def mark_agent_created(skill_name: str) -> None:
    """Flag a skill created through skill_manage as curator-managed.

    Merely viewing or invoking a manually authored skill can create a
    telemetry record, but only this explicit marker makes a skill eligible
    for automatic curation.
    """
    def _tag_as_agent(entry: Dict[str, Any]) -> None:
        entry.update(created_by="agent")

    _mutate(skill_name, _tag_as_agent)
361  
362  
def set_state(skill_name: str, state: str) -> None:
    """Store a new lifecycle state for a skill; invalid states are ignored.

    Moving to the archived state stamps ``archived_at`` with the current
    time; moving to the active state clears it. Moving to the stale state
    leaves ``archived_at`` untouched.
    """
    def _store_state(entry: Dict[str, Any]) -> None:
        entry["state"] = state
        if state == STATE_ARCHIVED:
            entry["archived_at"] = _now_iso()
        elif state == STATE_ACTIVE:
            entry["archived_at"] = None

    if state in _VALID_STATES:
        _mutate(skill_name, _store_state)
    else:
        logger.debug("set_state: invalid state %r for %s", state, skill_name)
375  
376  
def set_pinned(skill_name: str, pinned: bool) -> None:
    """Store the ``pinned`` flag for a skill, coerced to a plain bool."""
    def _store_flag(entry: Dict[str, Any]) -> None:
        entry["pinned"] = True if pinned else False

    _mutate(skill_name, _store_flag)
381  
382  
def forget(skill_name: str) -> None:
    """Remove a skill's usage entry completely. Called when the skill is deleted.

    The sidecar is only rewritten when an entry was actually removed.
    Failures are logged at DEBUG level and swallowed.
    """
    if not skill_name:
        return
    try:
        usage = load_usage()
        try:
            del usage[skill_name]
        except KeyError:
            # Nothing recorded for this skill, so there is nothing to save.
            return
        save_usage(usage)
    except Exception as e:
        logger.debug("skill_usage.forget(%s) failed: %s", skill_name, e, exc_info=True)
394  
395  
396  # ---------------------------------------------------------------------------
397  # Archive / restore
398  # ---------------------------------------------------------------------------
399  
def archive_skill(skill_name: str) -> Tuple[bool, str]:
    """Move an agent-created skill directory into ~/.hermes/skills/.archive/.

    Callers are expected to check provenance first; the bundled/hub check is
    repeated here as a safety net, so such skills are never archived.

    Returns:
        ``(ok, message)``. On success the usage record is switched to the
        archived state and the message names the destination directory.
    """
    import shutil

    if not is_agent_created(skill_name):
        return False, f"skill '{skill_name}' is bundled or hub-installed; never archive"

    source = _find_skill_dir(skill_name)
    if source is None:
        return False, f"skill '{skill_name}' not found"

    archive_root = _archive_dir()
    try:
        archive_root.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        return False, f"failed to create archive dir: {e}"

    # Category nesting is flattened to ".archive/<skill>/" so that restores
    # stay simple. A name that is already taken gets a UTC timestamp suffix.
    target = archive_root / source.name
    if target.exists():
        stamp = datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')
        target = archive_root / f"{source.name}-{stamp}"

    try:
        source.rename(target)
    except OSError:
        # rename() fails e.g. across devices; shutil.move copes with that.
        try:
            shutil.move(str(source), str(target))
        except Exception as move_error:
            return False, f"failed to archive: {move_error}"

    set_state(skill_name, STATE_ARCHIVED)
    return True, f"archived to {target}"
437  
438  
def restore_skill(skill_name: str) -> Tuple[bool, str]:
    """Move an archived skill back to ~/.hermes/skills/. Restores to the flat
    top-level layout; original category nesting is NOT reconstructed.

    Refuses to restore under a name that now collides with a bundled or
    hub-installed skill — that would shadow the upstream version.

    Archived directories are matched by directory name: first an exact match
    on *skill_name*, then ``<skill_name>-<YYYYMMDDHHMMSS>``, the form that
    archive_skill() produces on a name collision. Other directories that
    merely start with ``<skill_name>-`` belong to different skills and are
    never restored under this name.

    Returns:
        ``(ok, message)``. On success the usage record is switched back to
        the active state and the message names the restored directory.
    """
    # If a bundled or hub skill has since been installed under the same
    # name, refuse to restore rather than shadow it.
    if not is_agent_created(skill_name):
        return False, (
            f"skill '{skill_name}' is now bundled or hub-installed; "
            "restore would shadow the upstream version"
        )
    archive_root = _archive_dir()
    if not archive_root.exists():
        return False, "no archive directory"

    # Recursive walk handles nested archive layouts (e.g. .archive/<category>/<skill>/)
    # left behind by older archive paths or external imports.
    archived_dirs = [p for p in archive_root.rglob("*") if p.is_dir()]

    def _depth(path: Path) -> int:
        return len(path.relative_to(archive_root).parts)

    # Exact name first. rglob order is arbitrary, so prefer the shallowest
    # match; this avoids picking a same-named subdirectory that sits inside
    # another archived skill.
    candidates = sorted(
        (p for p in archived_dirs if p.name == skill_name),
        key=lambda p: (_depth(p), str(p)),
    )
    if not candidates:
        prefix = f"{skill_name}-"

        def _is_timestamped_dupe(name: str) -> bool:
            # Accept only the 14-digit UTC timestamp suffix, so that
            # restoring "git" cannot pick up an archived "git-helper".
            suffix = name[len(prefix):]
            return (
                name.startswith(prefix)
                and len(suffix) == 14
                and suffix.isascii()
                and suffix.isdigit()
            )

        # Newest timestamp first.
        candidates = sorted(
            (p for p in archived_dirs if _is_timestamped_dupe(p.name)),
            key=lambda p: p.name,
            reverse=True,
        )
    if not candidates:
        return False, f"skill '{skill_name}' not found in archive"

    src = candidates[0]
    dest = _skills_dir() / skill_name
    if dest.exists():
        return False, f"destination already exists: {dest}"

    try:
        src.rename(dest)
    except OSError:
        # rename() fails e.g. across devices; shutil.move copes with that.
        import shutil
        try:
            shutil.move(str(src), str(dest))
        except Exception as e:
            return False, f"failed to restore: {e}"

    set_state(skill_name, STATE_ACTIVE)
    return True, f"restored to {dest}"
486  
487  
def _find_skill_dir(skill_name: str) -> Optional[Path]:
    """Locate the directory for a skill by its frontmatter `name:` field.

    Handles both flat (~/.hermes/skills/<skill>/SKILL.md) and category-nested
    (~/.hermes/skills/<category>/<skill>/SKILL.md) layouts. Top-level
    dot-directories and a top-level ``node_modules`` directory are skipped,
    the same exclusions list_agent_created_skill_names() applies, so a skill
    the curator never lists cannot be resolved here either.

    Returns:
        The directory containing the first matching SKILL.md, or ``None``
        when the skills directory is missing or nothing matches.
    """
    base = _skills_dir()
    if not base.exists():
        return None
    for skill_md in base.rglob("SKILL.md"):
        try:
            rel = skill_md.relative_to(base)
        except ValueError:
            continue
        top_level = rel.parts[0] if rel.parts else ""
        if top_level.startswith(".") or top_level == "node_modules":
            continue
        if _read_skill_name(skill_md, fallback=skill_md.parent.name) == skill_name:
            return skill_md.parent
    return None
507  
508  
509  # ---------------------------------------------------------------------------
510  # Reporting — for the curator CLI / slash command
511  # ---------------------------------------------------------------------------
512  
def agent_created_report() -> List[Dict[str, Any]]:
    """Build one report row per agent-created skill.

    Each row holds the skill's ``name``, every field of its usage record
    (missing fields are filled with defaults so callers can index any of
    them), plus the derived ``last_activity_at`` and ``activity_count``.
    """
    usage = load_usage()
    report: List[Dict[str, Any]] = []
    for skill in list_agent_created_skill_names():
        stored = usage.get(skill)
        record = stored if isinstance(stored, dict) else _empty_record()
        for field, default in _empty_record().items():
            record.setdefault(field, default)
        entry = {"name": skill, **record}
        entry["last_activity_at"] = latest_activity_at(entry)
        entry["activity_count"] = activity_count(entry)
        report.append(entry)
    return report