radicle_util.py
1 import subprocess 2 import os 3 from typing import Optional, Dict, Any, Tuple 4 5 class RadicleUtil: 6 """ 7 Wrapper for the Radicle CLI (`rad`). 8 Handles creating repositories, pushing to the P2P network, and retrieving Knowledge State IDs. 9 """ 10 11 @staticmethod 12 def is_installed() -> bool: 13 """Checks if the radicle CLI is installed.""" 14 try: 15 result = subprocess.run(["rad", "--version"], capture_output=True, text=True) 16 return result.returncode == 0 17 except FileNotFoundError: 18 return False 19 20 @staticmethod 21 def get_node_id() -> Optional[str]: 22 """Gets the local Radicle node ID (DID).""" 23 if not RadicleUtil.is_installed(): 24 return "did:key:mock_radicle_node_id_for_testing" 25 26 try: 27 result = subprocess.run(["rad", "self", "--did"], capture_output=True, text=True, check=True) 28 return result.stdout.strip() 29 except subprocess.CalledProcessError: 30 return None 31 32 @staticmethod 33 def get_repo_id(repo_path: str) -> Optional[str]: 34 """Gets the Repository ID (RID) for a given git repository initialized with Radicle.""" 35 if not RadicleUtil.is_installed(): 36 return "rad:z3NhCeuYEVXwf2MPT5upkAs3uft91" 37 38 try: 39 result = subprocess.run(["rad", "inspect", "--rid"], cwd=repo_path, capture_output=True, text=True, check=True) 40 return result.stdout.strip() 41 except subprocess.CalledProcessError: 42 return None 43 44 @staticmethod 45 def sync_repo(repo_path: str) -> bool: 46 """ 47 Syncs the local repository with the Radicle network (gossip protocol). 48 Equivalent to `rad sync`. 49 """ 50 if not RadicleUtil.is_installed(): 51 print("[*] Radicle CLI missing. Mocking network sync...") 52 return True 53 54 try: 55 subprocess.run(["rad", "sync", "--announce"], cwd=repo_path, check=True, capture_output=True) 56 return True 57 except subprocess.CalledProcessError as e: 58 print(f"Error syncing Radicle repo: {e}") 59 return False