/ src / radicle_util.py
radicle_util.py
 1  import subprocess
 2  import os
 3  from typing import Optional, Dict, Any, Tuple
 4  
 5  class RadicleUtil:
 6      """
 7      Wrapper for the Radicle CLI (`rad`).
 8      Handles creating repositories, pushing to the P2P network, and retrieving Knowledge State IDs.
 9      """
10  
11      @staticmethod
12      def is_installed() -> bool:
13          """Checks if the radicle CLI is installed."""
14          try:
15              result = subprocess.run(["rad", "--version"], capture_output=True, text=True)
16              return result.returncode == 0
17          except FileNotFoundError:
18              return False
19  
20      @staticmethod
21      def get_node_id() -> Optional[str]:
22          """Gets the local Radicle node ID (DID)."""
23          if not RadicleUtil.is_installed():
24              return "did:key:mock_radicle_node_id_for_testing"
25              
26          try:
27              result = subprocess.run(["rad", "self", "--did"], capture_output=True, text=True, check=True)
28              return result.stdout.strip()
29          except subprocess.CalledProcessError:
30              return None
31  
32      @staticmethod
33      def get_repo_id(repo_path: str) -> Optional[str]:
34          """Gets the Repository ID (RID) for a given git repository initialized with Radicle."""
35          if not RadicleUtil.is_installed():
36              return "rad:z3NhCeuYEVXwf2MPT5upkAs3uft91"
37  
38          try:
39              result = subprocess.run(["rad", "inspect", "--rid"], cwd=repo_path, capture_output=True, text=True, check=True)
40              return result.stdout.strip()
41          except subprocess.CalledProcessError:
42              return None
43  
44      @staticmethod
45      def sync_repo(repo_path: str) -> bool:
46          """
47          Syncs the local repository with the Radicle network (gossip protocol).
48          Equivalent to `rad sync`.
49          """
50          if not RadicleUtil.is_installed():
51              print("[*] Radicle CLI missing. Mocking network sync...")
52              return True
53  
54          try:
55              subprocess.run(["rad", "sync", "--announce"], cwd=repo_path, check=True, capture_output=True)
56              return True
57          except subprocess.CalledProcessError as e:
58              print(f"Error syncing Radicle repo: {e}")
59              return False