nodetool.py
1 #!/usr/bin/env python 2 3 # This file is part of DarkFi (https://dark.fi) 4 # 5 # Copyright (C) 2020-2025 Dyne.org foundation 6 # 7 # This program is free software: you can redistribute it and/or modify 8 # it under the terms of the GNU Affero General Public License as 9 # published by the Free Software Foundation, either version 3 of the 10 # License, or (at your option) any later version. 11 # 12 # This program is distributed in the hope that it will be useful, 13 # but WITHOUT ANY WARRANTY; without even the implied warranty of 14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 # GNU Affero General Public License for more details. 16 # 17 # You should have received a copy of the GNU Affero General Public License 18 # along with this program. If not, see <https://www.gnu.org/licenses/>. 19 20 import asyncio, json, random, sys, time 21 22 class JsonRpc: 23 24 async def start(self, server, port): 25 reader, writer = await asyncio.open_connection(server, port) 26 self.reader = reader 27 self.writer = writer 28 29 async def stop(self): 30 self.writer.close() 31 await self.writer.wait_closed() 32 33 async def _make_request(self, method, params): 34 ident = random.randint(0, 2**16) 35 print(ident) 36 request = { 37 "jsonrpc": "2.0", 38 "method": method, 39 "params": params, 40 "id": ident, 41 } 42 43 message = json.dumps(request) + "\n" 44 self.writer.write(message.encode()) 45 await self.writer.drain() 46 47 data = await self.reader.readline() 48 message = data.decode().strip() 49 response = json.loads(message) 50 print(response) 51 return response 52 53 async def _subscribe(self, method, params): 54 ident = random.randint(0, 2**16) 55 request = { 56 "jsonrpc": "2.0", 57 "method": method, 58 "params": params, 59 "id": ident, 60 } 61 62 message = json.dumps(request) + "\n" 63 self.writer.write(message.encode()) 64 await self.writer.drain() 65 print("Subscribed") 66 67 async def ping(self): 68 return await self._make_request("ping", []) 69 70 async def dnet_switch(self, state): 71 return await self._make_request("dnet.switch", [state]) 72 73 async def dnet_subscribe_events(self): 74 return await self._subscribe("dnet.subscribe_events", []) 75 76 async def main(argv): 77 rpc = JsonRpc() 78 while True: 79 try: 80 await rpc.start("localhost", 26660) 81 break 82 except OSError: 83 pass 84 await rpc.dnet_switch(True) 85 await rpc.dnet_subscribe_events() 86 87 while True: 88 data = await rpc.reader.readline() 89 #with open("rpclog", "a") as f: 90 # f.write(data.decode()) 91 data = json.loads(data) 92 93 params = data["params"][0] 94 ev = params["event"] 95 if ev in ["send", "recv"]: 96 continue 97 info = params["info"] 98 99 t = time.localtime() 100 current_time = time.strftime("%H:%M:%S", t) 101 102 match ev: 103 case "inbound_connected": 104 addr = info["addr"] 105 print(f"{current_time} inbound (connect): {addr}") 106 case "inbound_disconnected": 107 addr = info["addr"] 108 print(f"{current_time} inbound (disconnect): {addr}") 109 case "outbound_slot_sleeping": 110 slot = info["slot"] 111 print(f"{current_time} slot {slot}: sleeping") 112 case "outbound_slot_connecting": 113 slot = info["slot"] 114 addr = info["addr"] 115 print(f"{current_time} slot {slot}: connecting addr={addr}") 116 case "outbound_slot_connected": 117 slot = info["slot"] 118 addr = info["addr"] 119 channel_id = info["channel_id"] 120 print(f"{current_time} slot {slot}: connected addr={addr}") 121 case "outbound_slot_disconnected": 122 slot = info["slot"] 123 err = info["err"] 124 print(f"{current_time} slot {slot}: disconnected err='{err}'") 125 case "outbound_peer_discovery": 126 attempt = info["attempt"] 127 state = info["state"] 128 print(f"{current_time} peer_discovery: {state} (attempt {attempt})") 129 #print(data) 130 131 await rpc.dnet_switch(False) 132 await rpc.stop() 133 134 asyncio.run(main(sys.argv))