/ scripts / radicle_native_probe.py
radicle_native_probe.py
  1  #!/usr/bin/env python3
  2  from __future__ import annotations
  3  
  4  import argparse
  5  import json
  6  import re
  7  import sys
  8  from dataclasses import dataclass
  9  from pathlib import Path
 10  from typing import Dict, Iterable, List, Optional, Tuple
 11  
 12  
# Markdown heading that declares an actor profile ("## Profile: <name>"); MULTILINE so it
# can be searched across a whole document with finditer().
PROFILE_RE = re.compile(r"^\s*##\s+Profile:\s*(?P<name>.+?)\s*$", re.MULTILINE)
# "Scenario:" / "Scenario Outline:" header line; captures the scenario title.
SCENARIO_RE = re.compile(r"^\s*Scenario(?:\s+Outline)?:\s*(?P<title>.+?)\s*$")
# Actor line "As a <role>" / "As an <role>" (case-insensitive); captures the role.
AS_A_RE = re.compile(r"^\s*As\s+an?\s+(?P<role>.+?)\s*$", re.IGNORECASE)
# Matches a line whose first non-blank character is "@" (a tag line).
TAGLINE_RE = re.compile(r"^\s*@")

# Normalized tag names (no "@", lower-case) that mark a scenario as pending.
PENDING_TAGS = {"pending", "wip", "skip"}
# Normalized tag names that count as a priority tag.
PRIORITY_TAGS = {"p1", "p2", "p3", "p0"}  # legacy p0 is treated as p1 by radicle-priorities
 20  
 21  
@dataclass(frozen=True)
class Check:
    """Immutable outcome of one probe check, as rendered by the text/JSON emitters."""

    # Machine-readable identifier of the check (e.g. "bdd.features").
    id: str
    # Human-readable statement of what the check verifies.
    title: str
    # True when the check passed.
    ok: bool
    # Optional extra context (counts, offending locations, error text); empty by default.
    details: str = ""
 28  
 29  
 30  def _read_text(path: Path) -> str:
 31      return path.read_text(encoding="utf-8", errors="replace")
 32  
 33  
 34  def _discover_profiles_doc(repo_root: Path) -> Optional[Path]:
 35      candidates = [
 36          repo_root / "docs" / "USER_PROFILES.md",
 37          repo_root / "bdd" / "USER_PROFILES.md",
 38          repo_root / "USER_PROFILES.md",
 39      ]
 40      for p in candidates:
 41          try:
 42              if p.exists() and p.is_file():
 43                  return p
 44          except Exception:
 45              continue
 46      return None
 47  
 48  
 49  def _discover_radicle_native_protocol_doc(repo_root: Path) -> Optional[Path]:
 50      candidates = [
 51          repo_root / "docs" / "RADICLE_NATIVE_PROTOCOL.json",
 52          repo_root / "RADICLE_NATIVE_PROTOCOL.json",
 53          repo_root / "bdd" / "RADICLE_NATIVE_PROTOCOL.json",
 54      ]
 55      for p in candidates:
 56          try:
 57              if p.exists() and p.is_file():
 58                  return p
 59          except Exception:
 60              continue
 61      return None
 62  
 63  
 64  def _get_nested(obj: object, path: Tuple[str, ...]) -> object:
 65      cur: object = obj
 66      for key in path:
 67          if not isinstance(cur, dict):
 68              return None
 69          cur = cur.get(key)
 70      return cur
 71  
 72  
 73  def _is_nonempty_str(v: object) -> bool:
 74      return isinstance(v, str) and bool(v.strip())
 75  
 76  
 77  def _load_profile_names(repo_root: Path) -> List[str]:
 78      p = _discover_profiles_doc(repo_root)
 79      if p is None:
 80          return []
 81      text = _read_text(p)
 82      out: List[str] = []
 83      for m in PROFILE_RE.finditer(text):
 84          name = str(m.group("name") or "").strip()
 85          if name:
 86              out.append(name)
 87      return out
 88  
 89  
 90  def _norm(s: str) -> str:
 91      return str(s or "").strip().lower()
 92  
 93  
 94  def _iter_feature_files(repo_root: Path) -> Iterable[Path]:
 95      base = repo_root / "bdd" / "features"
 96      if not base.exists() or not base.is_dir():
 97          return []
 98      return sorted(base.glob("*.feature"), key=lambda p: str(p).lower())
 99  
100  
def _parse_scenarios(path: Path) -> List[Tuple[int, str, List[str], Optional[str]]]:
    """Extract every scenario declared in a feature file.

    Returns ``[(line_no, scenario_title, tags, actor_role)]`` in file order:

    - line_no: 1-based line of the ``Scenario:`` / ``Scenario Outline:`` header.
    - scenario_title: the header's title text.
    - tags: raw ``@tags`` from the tag line(s) directly above the header. Consecutive
      tag lines are merged; blank lines and ``#`` comment lines between the tags and
      the header are tolerated. Tags followed by any other content (for example a
      ``Feature:`` or ``Examples:`` line) are discarded instead of leaking onto the
      next scenario.
    - actor_role: role from the first "As a/an ..." line after the header, or None.
    """
    # Mutable records [line_no, title, tags, actor_role] so the role can be filled in later.
    records: List[list] = []
    pending_tags: List[str] = []

    for idx, raw in enumerate(_read_text(path).splitlines(), start=1):
        stripped = raw.strip()

        if TAGLINE_RE.match(stripped):
            pending_tags.extend(tok for tok in stripped.split() if tok.startswith("@"))
            continue

        header = SCENARIO_RE.match(raw)
        if header:
            title = str(header.group("title") or "").strip() or f"Scenario at line {idx}"
            records.append([idx, title, pending_tags, None])
            pending_tags = []
            continue

        if stripped and not stripped.startswith("#"):
            # Real content that is not a scenario header: whatever tags were collected
            # belong to this construct, not to a later scenario.
            pending_tags = []

        if not records or records[-1][3] is not None:
            continue

        actor = AS_A_RE.match(raw)
        if actor:
            # An all-whitespace role stays None so a later actor line can still match.
            records[-1][3] = str(actor.group("role") or "").strip() or None

    return [(line_no, title, tags, role) for line_no, title, tags, role in records]
155  
156  
157  def _normalize_tags(raw_tags: List[str]) -> List[str]:
158      return [str(t or "").lstrip("@").strip().lower() for t in (raw_tags or []) if str(t or "").strip()]
159  
160  
def _summarize_locations(items: List[str], limit: int = 10) -> str:
    """Join the first *limit* entries with "; " and note how many were left out."""
    hidden = len(items) - limit
    return "; ".join(items[:limit]) + (f"; ... ({hidden} more)" if hidden > 0 else "")


def _validate_native_protocol_fields(rn_obj: object) -> Tuple[List[str], List[str]]:
    """Validate a parsed RADICLE_NATIVE_PROTOCOL.json document.

    Returns ``(missing, invalid)``: human-readable descriptions of required fields
    that are absent (or null), and of fields that are present but have the wrong
    type or a disallowed value.
    """
    required_fields: List[Tuple[Tuple[str, ...], str]] = [
        (("radicleNativeProtocolVersion",), "string"),
        (("identity", "principal"), "string"),
        (("cobs", "issuesTypename"), "string"),
        (("cobs", "patchesTypename"), "string"),
        (("cobs", "identityTypename"), "string"),
        (("issuesAsProtocol", "enabled"), "bool"),
        (("issuesAsProtocol", "schemaVersion"), "string"),
        (("issuesAsProtocol", "messageTypes"), "list[object]"),
        (("issuesAsProtocol", "idempotency", "strategy"), "string"),
        (("issuesAsProtocol", "replay", "strategy"), "string"),
        (("replication", "updatePlane"), "string"),
        (("replication", "seedingScopeDefault"), "string (followed|all)"),
        (("followPolicy", "requiredForFullProtocol"), "bool"),
        (("followPolicy", "followCommand"), "string"),
        (("followPolicy", "unfollowCommand"), "string"),
        (("mergePolicy", "model"), "string"),
        (("mergePolicy", "identityRef"), "string"),
        (("mergePolicy", "defaultBranch"), "string"),
        (("mergePolicy", "threshold"), "number"),
        (("mergePolicy", "delegates"), "list[string]"),
    ]

    missing: List[str] = []
    wrong_type: List[str] = []

    for path, expected in required_fields:
        v = _get_nested(rn_obj, path)
        label = ".".join(path)
        if v is None:
            missing.append(f"{label} ({expected})")
            continue
        if expected.startswith("string"):
            if not _is_nonempty_str(v):
                wrong_type.append(f"{label} (expected {expected}, got {type(v).__name__})")
        elif expected == "bool":
            if not isinstance(v, bool):
                wrong_type.append(f"{label} (expected bool, got {type(v).__name__})")
        elif expected == "number":
            # bool is a subclass of int, so JSON true/false must be rejected explicitly.
            if isinstance(v, bool) or not isinstance(v, (int, float)):
                wrong_type.append(f"{label} (expected number, got {type(v).__name__})")
        elif expected == "list[string]":
            # An empty list is allowed (all() is vacuously true): delegates may be
            # discoverable via the identity document ref. The field must still exist
            # so tools can rely on its presence.
            if not isinstance(v, list) or not all(isinstance(x, str) and x.strip() for x in v):
                wrong_type.append(f"{label} (expected list of non-empty strings, got {type(v).__name__})")
        elif expected == "list[object]":
            if not isinstance(v, list) or not v or not all(isinstance(x, dict) for x in v):
                wrong_type.append(f"{label} (expected non-empty list of objects, got {type(v).__name__})")

    scope = _get_nested(rn_obj, ("replication", "seedingScopeDefault"))
    if isinstance(scope, str):
        scope_norm = scope.strip().lower()
        if scope_norm not in {"followed", "all"}:
            wrong_type.append(f"replication.seedingScopeDefault (expected 'followed' or 'all', got {scope!r})")

    enabled = _get_nested(rn_obj, ("issuesAsProtocol", "enabled"))
    if isinstance(enabled, bool) and enabled is not True:
        wrong_type.append("issuesAsProtocol.enabled (expected true for a radicle-native claim)")

    threshold = _get_nested(rn_obj, ("mergePolicy", "threshold"))
    # Booleans are already reported as a type error above; only range-check real numbers.
    if not isinstance(threshold, bool) and isinstance(threshold, (int, float)) and threshold < 1:
        wrong_type.append("mergePolicy.threshold (expected >= 1)")

    # Validate the shape of the declared issue message types (first 25 only).
    msg_types = _get_nested(rn_obj, ("issuesAsProtocol", "messageTypes"))
    if isinstance(msg_types, list) and msg_types:
        bad_msgs: List[str] = []
        for i, mt in enumerate(msg_types[:25], start=1):
            if not isinstance(mt, dict):
                bad_msgs.append(f"messageTypes[{i}] not an object")
                continue
            if not _is_nonempty_str(mt.get("name")):
                bad_msgs.append(f"messageTypes[{i}].name missing/empty")
            if not _is_nonempty_str(mt.get("carrier")):
                bad_msgs.append(f"messageTypes[{i}].carrier missing/empty")
        if bad_msgs:
            wrong_type.append("issuesAsProtocol.messageTypes (" + "; ".join(bad_msgs[:6]) + (" ..." if len(bad_msgs) > 6 else "") + ")")

    return missing, wrong_type


def run_checks(repo_root: Path) -> List[Check]:
    """Run all probe checks against *repo_root* and return them in a fixed order.

    Order of emitted checks: ``rp.protocol``, ``bdd.features``, ``bdd.profiles_doc``,
    ``bdd.actor_lines``, ``bdd.actor_profiles_known``, ``bdd.priority_tags``,
    ``rn.protocol_doc`` and finally ``rn.protocol_fields``.

    Early exits:
    - *repo_root* missing or not a directory: a single failing ``repo.exists`` check.
    - RADICLE_NATIVE_PROTOCOL.json missing or unparseable: the list ends with a failing
      ``rn.protocol_doc`` and ``rn.protocol_fields`` is not emitted.
    """
    if not repo_root.exists() or not repo_root.is_dir():
        return [Check(id="repo.exists", title="Repo path exists and is a directory", ok=False, details=str(repo_root))]

    checks: List[Check] = []

    # 1) Protocol declaration (recommended).
    proto_path = repo_root / "bdd" / "RADICLE_PRIORITIES.json"
    if proto_path.exists():
        try:
            obj = json.loads(_read_text(proto_path))
            if not isinstance(obj, dict):
                raise ValueError("top-level JSON value must be an object")
            # JSON null counts as absent; str(None) would otherwise yield the truthy "None".
            pv, fr = (
                "" if raw is None else str(raw).strip()
                for raw in (obj.get("protocolVersion"), obj.get("featuresRoot"))
            )
            ok = bool(pv) and bool(fr)
            details = "" if ok else f"protocolVersion={pv!r} featuresRoot={fr!r}"
            checks.append(Check(id="rp.protocol", title="Has bdd/RADICLE_PRIORITIES.json (protocolVersion + featuresRoot)", ok=ok, details=details))
        except Exception as e:
            # Reporting boundary: any read/parse problem becomes a failed check, not a crash.
            checks.append(Check(id="rp.protocol", title="Has valid bdd/RADICLE_PRIORITIES.json", ok=False, details=str(e)))
    else:
        checks.append(Check(id="rp.protocol", title="Has bdd/RADICLE_PRIORITIES.json (recommended)", ok=False, details="missing"))

    # 2) Canonical backlog exists.
    feature_files = list(_iter_feature_files(repo_root))
    checks.append(
        Check(
            id="bdd.features",
            title="Has one or more .feature files under bdd/features/",
            ok=bool(feature_files),
            details=f"count={len(feature_files)}",
        )
    )

    # 3) Actor profiles exist + scenarios declare actor role.
    profiles = _load_profile_names(repo_root)
    profile_set = {_norm(p) for p in profiles if _norm(p)}
    checks.append(
        Check(
            id="bdd.profiles_doc",
            title="Has a USER_PROFILES.md (docs/USER_PROFILES.md preferred)",
            ok=bool(profiles),
            details=f"count={len(profiles)}",
        )
    )

    missing_actor: List[str] = []
    unknown_actor: List[str] = []
    bad_priority: List[str] = []

    # Each feature file is parsed once; the actor checks (3) and the priority check (4)
    # are both evaluated per scenario in this single pass.
    for ff in feature_files:
        for line_no, title, raw_tags, actor_role in _parse_scenarios(ff):
            scenario_loc = f"{ff.as_posix()}:{line_no}::{title}"

            if not actor_role:
                missing_actor.append(scenario_loc)
            elif profile_set and _norm(actor_role) not in profile_set:
                # Unknown actors are only reported when a profiles document declared any.
                unknown_actor.append(f"{scenario_loc} => {actor_role!r}")

            tags = _normalize_tags(raw_tags)
            if not any(t in PENDING_TAGS for t in tags):
                continue
            prios = [t for t in tags if t in PRIORITY_TAGS]
            # triage is allowed: zero prios.
            if len(prios) > 1:
                bad_priority.append(f"{scenario_loc} (multiple priority tags: {prios})")

    checks.append(
        Check(
            id="bdd.actor_lines",
            title="Every scenario declares an actor line (As a <Profile>)",
            ok=not missing_actor,
            details=_summarize_locations(missing_actor),
        )
    )
    checks.append(
        Check(
            id="bdd.actor_profiles_known",
            title="Actor profiles used in scenarios exist in USER_PROFILES.md",
            ok=not unknown_actor,
            details=_summarize_locations(unknown_actor),
        )
    )

    # 4) Pending scenarios should be triaged with a priority tag (or be triage).
    checks.append(
        Check(
            id="bdd.priority_tags",
            title="Pending scenarios have <= 1 priority tag (@p1/@p2/@p3; optional triage if none)",
            ok=not bad_priority,
            details=_summarize_locations(bad_priority),
        )
    )

    # 5) Radicle-native protocol declaration (required for the probe to be meaningful).
    rn_path = _discover_radicle_native_protocol_doc(repo_root)
    if rn_path is None:
        checks.append(
            Check(
                id="rn.protocol_doc",
                title="Has docs/RADICLE_NATIVE_PROTOCOL.json describing identity + issues-as-protocol + replication + follow policy",
                ok=False,
                details="missing",
            )
        )
        return checks

    try:
        rn_obj = json.loads(_read_text(rn_path))
    except Exception as e:
        # Reporting boundary: an unreadable or unparseable document is a failed check.
        checks.append(
            Check(
                id="rn.protocol_doc",
                title="Has valid docs/RADICLE_NATIVE_PROTOCOL.json (parseable JSON)",
                ok=False,
                details=str(e),
            )
        )
        return checks

    checks.append(
        Check(
            id="rn.protocol_doc",
            title="Has docs/RADICLE_NATIVE_PROTOCOL.json describing identity + issues-as-protocol + replication + follow policy",
            ok=True,
            details=rn_path.relative_to(repo_root).as_posix(),
        )
    )

    missing, wrong_type = _validate_native_protocol_fields(rn_obj)

    detail_parts: List[str] = []
    if missing:
        detail_parts.append("missing: " + ", ".join(missing[:12]) + (" ..." if len(missing) > 12 else ""))
    if wrong_type:
        detail_parts.append("invalid: " + ", ".join(wrong_type[:12]) + (" ..." if len(wrong_type) > 12 else ""))

    checks.append(
        Check(
            id="rn.protocol_fields",
            title="Radicle-native protocol declaration includes required fields (identity, issues-as-protocol, replication scope, follow policy, merge policy)",
            ok=not missing and not wrong_type,
            details="; ".join(detail_parts),
        )
    )

    return checks
393  
394  
395  def _emit_text(checks: List[Check]) -> str:
396      lines: List[str] = []
397      for c in checks:
398          status = "PASS" if c.ok else "FAIL"
399          line = f"[{status}] {c.id} — {c.title}"
400          if c.details:
401              line += f" ({c.details})"
402          lines.append(line)
403      return "\n".join(lines).rstrip() + "\n"
404  
405  
406  def _emit_json(checks: List[Check]) -> str:
407      obj = {
408          "checks": [
409              {
410                  "id": c.id,
411                  "title": c.title,
412                  "ok": bool(c.ok),
413                  "details": c.details,
414              }
415              for c in checks
416          ]
417      }
418      return json.dumps(obj, indent=2, sort_keys=True) + "\n"
419  
420  
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Parses ``--repo`` and ``--format``, runs every check against the resolved repo
    path, and writes the report to stdout. Returns 0 when all checks pass and 1
    otherwise.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--repo", default=".", help="Target repo root path (default: .)")
    parser.add_argument("--format", default="text", choices=["text", "json"], help="Output format.")
    options = parser.parse_args(argv)

    results = run_checks(Path(options.repo).expanduser().resolve())

    render = _emit_json if options.format == "json" else _emit_text
    sys.stdout.write(render(results))

    any_failed = any(not check.ok for check in results)
    return 1 if any_failed else 0


if __name__ == "__main__":
    sys.exit(main())