radicle_native_probe.py
1 #!/usr/bin/env python3 2 from __future__ import annotations 3 4 import argparse 5 import json 6 import re 7 import sys 8 from dataclasses import dataclass 9 from pathlib import Path 10 from typing import Dict, Iterable, List, Optional, Tuple 11 12 13 PROFILE_RE = re.compile(r"^\s*##\s+Profile:\s*(?P<name>.+?)\s*$", re.MULTILINE) 14 SCENARIO_RE = re.compile(r"^\s*Scenario(?:\s+Outline)?:\s*(?P<title>.+?)\s*$") 15 AS_A_RE = re.compile(r"^\s*As\s+an?\s+(?P<role>.+?)\s*$", re.IGNORECASE) 16 TAGLINE_RE = re.compile(r"^\s*@") 17 18 PENDING_TAGS = {"pending", "wip", "skip"} 19 PRIORITY_TAGS = {"p1", "p2", "p3", "p0"} # legacy p0 is treated as p1 by radicle-priorities 20 21 22 @dataclass(frozen=True) 23 class Check: 24 id: str 25 title: str 26 ok: bool 27 details: str = "" 28 29 30 def _read_text(path: Path) -> str: 31 return path.read_text(encoding="utf-8", errors="replace") 32 33 34 def _discover_profiles_doc(repo_root: Path) -> Optional[Path]: 35 candidates = [ 36 repo_root / "docs" / "USER_PROFILES.md", 37 repo_root / "bdd" / "USER_PROFILES.md", 38 repo_root / "USER_PROFILES.md", 39 ] 40 for p in candidates: 41 try: 42 if p.exists() and p.is_file(): 43 return p 44 except Exception: 45 continue 46 return None 47 48 49 def _discover_radicle_native_protocol_doc(repo_root: Path) -> Optional[Path]: 50 candidates = [ 51 repo_root / "docs" / "RADICLE_NATIVE_PROTOCOL.json", 52 repo_root / "RADICLE_NATIVE_PROTOCOL.json", 53 repo_root / "bdd" / "RADICLE_NATIVE_PROTOCOL.json", 54 ] 55 for p in candidates: 56 try: 57 if p.exists() and p.is_file(): 58 return p 59 except Exception: 60 continue 61 return None 62 63 64 def _get_nested(obj: object, path: Tuple[str, ...]) -> object: 65 cur: object = obj 66 for key in path: 67 if not isinstance(cur, dict): 68 return None 69 cur = cur.get(key) 70 return cur 71 72 73 def _is_nonempty_str(v: object) -> bool: 74 return isinstance(v, str) and bool(v.strip()) 75 76 77 def _load_profile_names(repo_root: Path) -> List[str]: 78 p = _discover_profiles_doc(repo_root) 79 if p is None: 80 return [] 81 text = _read_text(p) 82 out: List[str] = [] 83 for m in PROFILE_RE.finditer(text): 84 name = str(m.group("name") or "").strip() 85 if name: 86 out.append(name) 87 return out 88 89 90 def _norm(s: str) -> str: 91 return str(s or "").strip().lower() 92 93 94 def _iter_feature_files(repo_root: Path) -> Iterable[Path]: 95 base = repo_root / "bdd" / "features" 96 if not base.exists() or not base.is_dir(): 97 return [] 98 return sorted(base.glob("*.feature"), key=lambda p: str(p).lower()) 99 100 101 def _parse_scenarios(path: Path) -> List[Tuple[int, str, List[str], Optional[str]]]: 102 """ 103 Returns: [(line_no, scenario_title, tags, actor_role)] 104 - tags: raw @tags for the scenario (only the tagline immediately preceding Scenario) 105 - actor_role: first "As a/an ..." line after Scenario, if any 106 """ 107 lines = _read_text(path).splitlines() 108 out: List[Tuple[int, str, List[str], Optional[str]]] = [] 109 110 next_tags: List[str] = [] 111 current_title: Optional[str] = None 112 current_line_no: int = 0 113 current_tags: List[str] = [] 114 actor_role: Optional[str] = None 115 116 def finalize() -> None: 117 nonlocal current_title, current_line_no, current_tags, actor_role 118 if current_title is None: 119 return 120 out.append((current_line_no, current_title, list(current_tags), actor_role)) 121 current_title = None 122 current_line_no = 0 123 current_tags = [] 124 actor_role = None 125 126 for idx, raw in enumerate(lines, start=1): 127 line = str(raw or "") 128 129 if TAGLINE_RE.match(line.strip()): 130 for tok in line.strip().split(): 131 if tok.startswith("@"): 132 next_tags.append(tok.strip()) 133 continue 134 135 m = SCENARIO_RE.match(line) 136 if m: 137 finalize() 138 current_title = str(m.group("title") or "").strip() or f"Scenario at line {idx}" 139 current_line_no = idx 140 current_tags = list(next_tags) 141 next_tags = [] 142 actor_role = None 143 continue 144 145 if current_title is None: 146 continue 147 148 if actor_role is None: 149 m = AS_A_RE.match(line) 150 if m: 151 actor_role = str(m.group("role") or "").strip() or None 152 153 finalize() 154 return out 155 156 157 def _normalize_tags(raw_tags: List[str]) -> List[str]: 158 return [str(t or "").lstrip("@").strip().lower() for t in (raw_tags or []) if str(t or "").strip()] 159 160 161 def run_checks(repo_root: Path) -> List[Check]: 162 checks: List[Check] = [] 163 164 if not repo_root.exists() or not repo_root.is_dir(): 165 return [Check(id="repo.exists", title="Repo path exists and is a directory", ok=False, details=str(repo_root))] 166 167 # 1) Protocol declaration (recommended). 168 proto_path = repo_root / "bdd" / "RADICLE_PRIORITIES.json" 169 if proto_path.exists(): 170 try: 171 obj = json.loads(_read_text(proto_path)) 172 pv = str(obj.get("protocolVersion", "")).strip() 173 fr = str(obj.get("featuresRoot", "")).strip() 174 ok = bool(pv) and bool(fr) 175 details = "" if ok else f"protocolVersion={pv!r} featuresRoot={fr!r}" 176 checks.append(Check(id="rp.protocol", title="Has bdd/RADICLE_PRIORITIES.json (protocolVersion + featuresRoot)", ok=ok, details=details)) 177 except Exception as e: 178 checks.append(Check(id="rp.protocol", title="Has valid bdd/RADICLE_PRIORITIES.json", ok=False, details=str(e))) 179 else: 180 checks.append(Check(id="rp.protocol", title="Has bdd/RADICLE_PRIORITIES.json (recommended)", ok=False, details="missing")) 181 182 # 2) Canonical backlog exists. 183 feature_files = list(_iter_feature_files(repo_root)) 184 checks.append( 185 Check( 186 id="bdd.features", 187 title="Has one or more .feature files under bdd/features/", 188 ok=bool(feature_files), 189 details=f"count={len(feature_files)}", 190 ) 191 ) 192 193 # 3) Actor profiles exist + scenarios declare actor role. 194 profiles = _load_profile_names(repo_root) 195 profile_set = {_norm(p) for p in profiles if _norm(p)} 196 checks.append( 197 Check( 198 id="bdd.profiles_doc", 199 title="Has a USER_PROFILES.md (docs/USER_PROFILES.md preferred)", 200 ok=bool(profiles), 201 details=f"count={len(profiles)}", 202 ) 203 ) 204 205 missing_actor: List[str] = [] 206 unknown_actor: List[str] = [] 207 208 for ff in feature_files: 209 for line_no, title, raw_tags, actor_role in _parse_scenarios(ff): 210 scenario_loc = f"{ff.as_posix()}:{line_no}::{title}" 211 if not actor_role: 212 missing_actor.append(scenario_loc) 213 continue 214 if profile_set and _norm(actor_role) not in profile_set: 215 unknown_actor.append(f"{scenario_loc} => {actor_role!r}") 216 217 checks.append( 218 Check( 219 id="bdd.actor_lines", 220 title="Every scenario declares an actor line (As a <Profile>)", 221 ok=not missing_actor, 222 details="; ".join(missing_actor[:10]) + (f"; ... ({len(missing_actor) - 10} more)" if len(missing_actor) > 10 else ""), 223 ) 224 ) 225 checks.append( 226 Check( 227 id="bdd.actor_profiles_known", 228 title="Actor profiles used in scenarios exist in USER_PROFILES.md", 229 ok=not unknown_actor, 230 details="; ".join(unknown_actor[:10]) + (f"; ... ({len(unknown_actor) - 10} more)" if len(unknown_actor) > 10 else ""), 231 ) 232 ) 233 234 # 4) Pending scenarios should be triaged with a priority tag (or be triage). 235 bad_priority: List[str] = [] 236 for ff in feature_files: 237 for line_no, title, raw_tags, _actor_role in _parse_scenarios(ff): 238 tags = _normalize_tags(raw_tags) 239 is_pending = any(t in PENDING_TAGS for t in tags) 240 if not is_pending: 241 continue 242 prios = [t for t in tags if t in PRIORITY_TAGS] 243 # triage is allowed: zero prios. 244 if len(prios) > 1: 245 bad_priority.append(f"{ff.as_posix()}:{line_no}::{title} (multiple priority tags: {prios})") 246 247 checks.append( 248 Check( 249 id="bdd.priority_tags", 250 title="Pending scenarios have <= 1 priority tag (@p1/@p2/@p3; optional triage if none)", 251 ok=not bad_priority, 252 details="; ".join(bad_priority[:10]) + (f"; ... ({len(bad_priority) - 10} more)" if len(bad_priority) > 10 else ""), 253 ) 254 ) 255 256 # 5) Radicle-native protocol declaration (required for the probe to be meaningful). 257 rn_path = _discover_radicle_native_protocol_doc(repo_root) 258 if rn_path is None: 259 checks.append( 260 Check( 261 id="rn.protocol_doc", 262 title="Has docs/RADICLE_NATIVE_PROTOCOL.json describing identity + issues-as-protocol + replication + follow policy", 263 ok=False, 264 details="missing", 265 ) 266 ) 267 return checks 268 269 try: 270 rn_obj = json.loads(_read_text(rn_path)) 271 checks.append( 272 Check( 273 id="rn.protocol_doc", 274 title="Has docs/RADICLE_NATIVE_PROTOCOL.json describing identity + issues-as-protocol + replication + follow policy", 275 ok=True, 276 details=str(rn_path.relative_to(repo_root)).replace("\\", "/"), 277 ) 278 ) 279 except Exception as e: 280 checks.append( 281 Check( 282 id="rn.protocol_doc", 283 title="Has valid docs/RADICLE_NATIVE_PROTOCOL.json (parseable JSON)", 284 ok=False, 285 details=str(e), 286 ) 287 ) 288 return checks 289 290 required_fields: List[Tuple[Tuple[str, ...], str]] = [ 291 (("radicleNativeProtocolVersion",), "string"), 292 (("identity", "principal"), "string"), 293 (("cobs", "issuesTypename"), "string"), 294 (("cobs", "patchesTypename"), "string"), 295 (("cobs", "identityTypename"), "string"), 296 (("issuesAsProtocol", "enabled"), "bool"), 297 (("issuesAsProtocol", "schemaVersion"), "string"), 298 (("issuesAsProtocol", "messageTypes"), "list[object]"), 299 (("issuesAsProtocol", "idempotency", "strategy"), "string"), 300 (("issuesAsProtocol", "replay", "strategy"), "string"), 301 (("replication", "updatePlane"), "string"), 302 (("replication", "seedingScopeDefault"), "string (followed|all)"), 303 (("followPolicy", "requiredForFullProtocol"), "bool"), 304 (("followPolicy", "followCommand"), "string"), 305 (("followPolicy", "unfollowCommand"), "string"), 306 (("mergePolicy", "model"), "string"), 307 (("mergePolicy", "identityRef"), "string"), 308 (("mergePolicy", "defaultBranch"), "string"), 309 (("mergePolicy", "threshold"), "number"), 310 (("mergePolicy", "delegates"), "list[string]"), 311 ] 312 313 missing: List[str] = [] 314 wrong_type: List[str] = [] 315 316 for path, expected in required_fields: 317 v = _get_nested(rn_obj, path) 318 label = ".".join(path) 319 if v is None: 320 missing.append(f"{label} ({expected})") 321 continue 322 if expected.startswith("string"): 323 if not _is_nonempty_str(v): 324 wrong_type.append(f"{label} (expected {expected}, got {type(v).__name__})") 325 elif expected == "bool": 326 if not isinstance(v, bool): 327 wrong_type.append(f"{label} (expected bool, got {type(v).__name__})") 328 elif expected == "number": 329 if not isinstance(v, (int, float)): 330 wrong_type.append(f"{label} (expected number, got {type(v).__name__})") 331 elif expected == "list[string]": 332 if not isinstance(v, list) or not all(isinstance(x, str) and x.strip() for x in v): 333 if isinstance(v, list) and not v: 334 # Empty list is allowed: delegates may be discoverable via the identity document ref. 335 # We still require the field to exist so tools can rely on its presence. 336 pass 337 else: 338 wrong_type.append(f"{label} (expected list of non-empty strings, got {type(v).__name__})") 339 elif expected == "list[object]": 340 if not isinstance(v, list) or not v or not all(isinstance(x, dict) for x in v): 341 wrong_type.append(f"{label} (expected non-empty list of objects, got {type(v).__name__})") 342 343 scope = _get_nested(rn_obj, ("replication", "seedingScopeDefault")) 344 if isinstance(scope, str): 345 scope_norm = scope.strip().lower() 346 if scope_norm not in {"followed", "all"}: 347 wrong_type.append(f"replication.seedingScopeDefault (expected 'followed' or 'all', got {scope!r})") 348 349 enabled = _get_nested(rn_obj, ("issuesAsProtocol", "enabled")) 350 if isinstance(enabled, bool) and enabled is not True: 351 wrong_type.append("issuesAsProtocol.enabled (expected true for a radicle-native claim)") 352 353 threshold = _get_nested(rn_obj, ("mergePolicy", "threshold")) 354 if isinstance(threshold, (int, float)) and threshold < 1: 355 wrong_type.append("mergePolicy.threshold (expected >= 1)") 356 357 # Validate at least one well-formed issue message type. 358 msg_types = _get_nested(rn_obj, ("issuesAsProtocol", "messageTypes")) 359 if isinstance(msg_types, list) and msg_types: 360 bad_msgs: List[str] = [] 361 for i, mt in enumerate(msg_types[:25], start=1): 362 if not isinstance(mt, dict): 363 bad_msgs.append(f"messageTypes[{i}] not an object") 364 continue 365 name = mt.get("name") 366 carrier = mt.get("carrier") 367 if not _is_nonempty_str(name): 368 bad_msgs.append(f"messageTypes[{i}].name missing/empty") 369 if not _is_nonempty_str(carrier): 370 bad_msgs.append(f"messageTypes[{i}].carrier missing/empty") 371 if bad_msgs: 372 wrong_type.append("issuesAsProtocol.messageTypes (" + "; ".join(bad_msgs[:6]) + (" ..." if len(bad_msgs) > 6 else "") + ")") 373 374 ok_fields = not missing and not wrong_type 375 details = "" 376 if missing: 377 details += "missing: " + ", ".join(missing[:12]) + (" ..." if len(missing) > 12 else "") 378 if wrong_type: 379 if details: 380 details += "; " 381 details += "invalid: " + ", ".join(wrong_type[:12]) + (" ..." if len(wrong_type) > 12 else "") 382 383 checks.append( 384 Check( 385 id="rn.protocol_fields", 386 title="Radicle-native protocol declaration includes required fields (identity, issues-as-protocol, replication scope, follow policy, merge policy)", 387 ok=ok_fields, 388 details=details, 389 ) 390 ) 391 392 return checks 393 394 395 def _emit_text(checks: List[Check]) -> str: 396 lines: List[str] = [] 397 for c in checks: 398 status = "PASS" if c.ok else "FAIL" 399 line = f"[{status}] {c.id} — {c.title}" 400 if c.details: 401 line += f" ({c.details})" 402 lines.append(line) 403 return "\n".join(lines).rstrip() + "\n" 404 405 406 def _emit_json(checks: List[Check]) -> str: 407 obj = { 408 "checks": [ 409 { 410 "id": c.id, 411 "title": c.title, 412 "ok": bool(c.ok), 413 "details": c.details, 414 } 415 for c in checks 416 ] 417 } 418 return json.dumps(obj, indent=2, sort_keys=True) + "\n" 419 420 421 def main(argv: Optional[List[str]] = None) -> int: 422 ap = argparse.ArgumentParser() 423 ap.add_argument("--repo", default=".", help="Target repo root path (default: .)") 424 ap.add_argument("--format", default="text", choices=["text", "json"], help="Output format.") 425 args = ap.parse_args(argv) 426 427 repo_root = Path(args.repo).expanduser().resolve() 428 checks = run_checks(repo_root) 429 430 if args.format == "json": 431 sys.stdout.write(_emit_json(checks)) 432 else: 433 sys.stdout.write(_emit_text(checks)) 434 435 return 0 if all(c.ok for c in checks) else 1 436 437 438 if __name__ == "__main__": 439 raise SystemExit(main())