# node-metering.py
#!/usr/bin/env python

# This file is part of DarkFi (https://dark.fi)
#
# Copyright (C) 2020-2025 Dyne.org foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import argparse
import asyncio
import csv
import json
import random
import signal
import statistics
import sys
import time
from collections import defaultdict


class JsonRpc:
    """Minimal line-delimited JSON-RPC 2.0 client over a plain TCP stream."""

    async def start(self, server, port):
        """Open a TCP connection to the RPC server."""
        reader, writer = await asyncio.open_connection(server, port)
        self.reader = reader
        self.writer = writer

    async def stop(self):
        """Close the connection and wait for it to be fully torn down."""
        self.writer.close()
        await self.writer.wait_closed()

    async def _make_request(self, method, params):
        """Send a request and block until its single-line JSON response arrives."""
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            # Random id; responses are read synchronously so no matching is done.
            "id": random.randint(0, 2**16),
        }

        message = json.dumps(request) + "\n"
        self.writer.write(message.encode())
        await self.writer.drain()

        data = await self.reader.readline()
        return json.loads(data.decode().strip())

    async def _subscribe(self, method, params):
        """Send a subscription request; notifications are read later by the caller."""
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": random.randint(0, 2**16),
        }

        message = json.dumps(request) + "\n"
        self.writer.write(message.encode())
        await self.writer.drain()

    async def ping(self):
        """Simple liveness check against the node."""
        return await self._make_request("ping", [])

    async def dnet_switch(self, state):
        """Toggle dnet event reporting on the node."""
        return await self._make_request("dnet.switch", [state])

    async def dnet_subscribe_events(self):
        """Subscribe to the node's dnet event stream."""
        return await self._subscribe("dnet.subscribe_events", [])


async def collect_messages(server, port, output_file):
    """Connect to the node, subscribe to dnet events, and append every
    received message to ``output_file`` as a TSV row:
    (message_type, time_ns, peer_addr).

    Runs until cancelled (e.g. by SIGINT via the handler set up in main()).
    """
    rpc = JsonRpc()
    # Keep retrying until the node is reachable; sleep so we don't busy-spin.
    while True:
        try:
            await rpc.start(server, port)
            break
        except OSError:
            await asyncio.sleep(1)

    print(f"Started collecting measurements, saving to {output_file} ...")
    try:
        # Append mode so repeated runs accumulate data in the same file.
        with open(output_file, "a", encoding="utf-8") as fd:
            await rpc.dnet_switch(True)
            await rpc.dnet_subscribe_events()

            file_writer = csv.writer(fd, delimiter="\t")
            count = 0
            while True:
                data = await rpc.reader.readline()
                data = json.loads(data)

                params = data["params"][0]
                # Only received-message events are interesting here.
                if params["event"] != "recv":
                    continue

                info = params["info"]
                file_writer.writerow([info["cmd"], int(info["time"]), info["chan"]["addr"]])
                count += 1
                print(f"Messages collected: {count}", end="\r", flush=True)
    except asyncio.CancelledError:
        print("Stopping message collection.")
    finally:
        # Best effort: switch event reporting back off and close the socket.
        await rpc.dnet_switch(False)
        await rpc.stop()


def analyze_messages(output_file):
    """Analyze collected measurements from ``output_file``.

    Groups rows by (message_type, peer_addr), counts messages per 10-second
    window for each group, then prints summary statistics per message type.

    Returns a mapping of message_type -> list of per-window counts
    (merged across peers) so results can also be consumed programmatically.
    """
    data = []
    with open(output_file, mode="r", encoding="utf-8") as fd:
        tsv_reader = csv.reader(fd, delimiter="\t")
        # Columns: 0 - message_type, 1 - time in nanoseconds, 2 - peer_addr
        for row in tsv_reader:
            row[1] = int(row[1])
            data.append(row)

    print(f"Analyzing the collected measurement data from {output_file} ...")
    # Group by message_type and peer_addr since we want to get the number of
    # messages we received from a particular peer in some time window.
    grouped_data = defaultdict(list)
    for item in data:
        grouped_data[(item[0], item[2])].append(item)

    # Use a 10 second window size, in nanoseconds.
    window_size = 10 * 1_000_000_000
    results = defaultdict(list)

    for key, messages in grouped_data.items():
        messages.sort(key=lambda x: x[1])  # Sort by time

        # A single message carries no rate information; skip the group.
        if len(messages) < 2:
            continue

        # Start with the first item and count messages of this type from this
        # peer per window; a new window starts at the first message that falls
        # outside the current one.
        start_time = messages[0][1]
        window_counts = []
        count = 0

        for message in messages:
            message_time = message[1]
            if message_time < start_time + window_size:
                count += 1
            else:
                window_counts.append(count)
                start_time = message_time
                count = 1

        if count:
            window_counts.append(count)

        message_type = key[0]
        # Merge counts of the same message_type across different peers.
        results[message_type].extend(window_counts)

    for message_type, counts in results.items():
        # statistics.variance() needs at least two data points; a message
        # type can end up with a single window count, so guard against
        # StatisticsError and report 0 in that case.
        variance = statistics.variance(counts) if len(counts) > 1 else 0
        print(f"Message Type: {message_type}")
        print(f"  Count: {len(counts)}")
        print(f"  Mean : {statistics.mean(counts)}")
        print(f"  Median: {statistics.median(counts)}")
        print(f"  Variance: {variance}")
        print(f"  Max: {max(counts)}")
        print(f"  Min: {min(counts)}\n")

    return results


async def main(argv):
    """Parse CLI arguments and either analyze existing data or collect new data."""
    parser = argparse.ArgumentParser(description='Tool to collect and analyze measurement of received messages')
    parser.add_argument('--server', default='127.0.0.1', help='RPC server')
    # type=int so a user-supplied port matches the integer default.
    parser.add_argument('--port', default=26660, type=int, help='Port of the RPC server')
    parser.add_argument('--output-file', default='/tmp/node-metering-data.tsv', help='Location of the file containing the collected data')
    parser.add_argument('--analyze', action='store_true', help='Analyzes existing message from the output file without collecting new ones')

    args = parser.parse_args()
    if args.analyze:
        analyze_messages(args.output_file)
    else:
        collect_task = asyncio.create_task(collect_messages(args.server, args.port, args.output_file))
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated here.
        loop = asyncio.get_running_loop()
        # Ctrl-C cancels the collector, which flushes and cleans up in its finally block.
        loop.add_signal_handler(signal.SIGINT, collect_task.cancel)
        await collect_task


if __name__ == "__main__":
    asyncio.run(main(sys.argv))