/ script / node-metering.py
node-metering.py
  1  #!/usr/bin/env python
  2  
  3  # This file is part of DarkFi (https://dark.fi)
  4  #
  5  # Copyright (C) 2020-2025 Dyne.org foundation
  6  #
  7  # This program is free software: you can redistribute it and/or modify
  8  # it under the terms of the GNU Affero General Public License as
  9  # published by the Free Software Foundation, either version 3 of the
 10  # License, or (at your option) any later version.
 11  #
 12  # This program is distributed in the hope that it will be useful,
 13  # but WITHOUT ANY WARRANTY; without even the implied warranty of
 14  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 15  # GNU Affero General Public License for more details.
 16  #
 17  # You should have received a copy of the GNU Affero General Public License
 18  # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 19  
 20  import asyncio, json, random, sys, time, argparse, csv, signal, statistics
 21  from collections import defaultdict
 22  
 23  class JsonRpc:
 24  
 25      async def start(self, server, port):
 26          reader, writer = await asyncio.open_connection(server, port)
 27          self.reader = reader
 28          self.writer = writer
 29  
 30      async def stop(self):
 31          self.writer.close()
 32          await self.writer.wait_closed()
 33  
 34      async def _make_request(self, method, params):
 35          ident = random.randint(0, 2**16)
 36          request = {
 37              "jsonrpc": "2.0",
 38              "method": method,
 39              "params": params,
 40              "id": ident,
 41          }
 42  
 43          message = json.dumps(request) + "\n"
 44          self.writer.write(message.encode())
 45          await self.writer.drain()
 46  
 47          data = await self.reader.readline()
 48          message = data.decode().strip()
 49          response = json.loads(message)
 50          return response
 51  
 52      async def _subscribe(self, method, params):
 53          ident = random.randint(0, 2**16)
 54          request = {
 55              "jsonrpc": "2.0",
 56              "method": method,
 57              "params": params,
 58              "id": ident,
 59          }
 60  
 61          message = json.dumps(request) + "\n"
 62          self.writer.write(message.encode())
 63          await self.writer.drain()
 64  
 65      async def ping(self):
 66          return await self._make_request("ping", [])
 67  
 68      async def dnet_switch(self, state):
 69          return await self._make_request("dnet.switch", [state])
 70  
 71      async def dnet_subscribe_events(self):
 72          return await self._subscribe("dnet.subscribe_events", [])
 73  
 74  async def collect_messages(server, port, output_file):
 75      rpc = JsonRpc()
 76      while True:
 77          try:
 78              await rpc.start(server, port)
 79              break
 80          except OSError:
 81              pass
 82  
 83      file = open(output_file, "a", encoding="utf-8")
 84      print(f"Started collecting measurements, saving to {output_file} ...")
 85      try:
 86          await rpc.dnet_switch(True)
 87          await rpc.dnet_subscribe_events()
 88  
 89          file_writer = csv.writer(file, delimiter="\t")
 90          count = 0
 91          while True:
 92              data = await rpc.reader.readline()
 93              data = json.loads(data)
 94  
 95              params = data["params"][0]
 96              ev = params["event"]
 97              if ev != "recv":
 98                  continue
 99  
100              row = [params["info"]["cmd"], int(params["info"]["time"]), params["info"]["chan"]["addr"]]
101              file_writer.writerow(row)
102              count += 1
103              print(f"Messages collected: {count}", end="\r", flush=True)
104      except asyncio.CancelledError:
105          print("Stopping message collection.")
106      finally:
107          await rpc.dnet_switch(False)
108          await rpc.stop()
109          file.close()
110  
def _count_per_window(timestamps, window_size):
    """Count how many timestamps fall into each consecutive time window.

    timestamps must be non-empty and sorted in ascending order. The first
    window opens at the first timestamp; the first timestamp that lies
    window_size or more past the window start closes it and opens the next
    window at that timestamp.
    """
    counts = []
    window_start = timestamps[0]
    count = 0
    for timestamp in timestamps:
        if timestamp < window_start + window_size:
            count += 1
        else:
            counts.append(count)
            window_start = timestamp
            count = 1
    counts.append(count)
    return counts


def analyze_messages(output_file, window_size=10 * 1_000_000_000):
    """Print per-message-type statistics of messages received per window.

    Reads the TSV file written by the collector (columns: message_type,
    time in nano seconds, peer_addr), counts for every (message_type,
    peer_addr) pair how many messages arrived in each time window, and
    prints count, mean, median, variance, max and min of those window
    counts for every message type, combined over all peers.

    Args:
        output_file: Path of the TSV file to analyze.
        window_size: Length of a window in nano seconds (10 seconds by
            default).

    Returns:
        A dict mapping each message type to its list of window counts.
    """
    # Group by message_type and peer_addr since we want to get the number of
    # messages we received from a particular peer in some time window
    grouped_times = defaultdict(list)
    # newline="" leaves the line ending handling to the csv module.
    with open(output_file, mode="r", encoding="utf-8", newline="") as file:
        for row in csv.reader(file, delimiter="\t"):
            if not row:
                # The csv reader yields an empty list for a blank line.
                continue
            grouped_times[(row[0], row[2])].append(int(row[1]))

    print(f"Analyzing the collected measurement data from {output_file} ...")

    # Store counts of the same message_type across different peers
    results = defaultdict(list)
    for (message_type, _peer_addr), times in grouped_times.items():
        # Pairs with fewer than two messages are left out of the statistics.
        if len(times) < 2:
            continue
        times.sort()
        results[message_type].extend(_count_per_window(times, window_size))

    for message_type, counts in results.items():
        # statistics.variance() raises StatisticsError for fewer than two
        # values, which happens whenever a type produced a single window.
        variance = statistics.variance(counts) if len(counts) > 1 else "n/a"
        print(f"Message Type: {message_type}")
        print(f"    Count: {len(counts)}")
        print(f"    Mean : {statistics.mean(counts)}")
        print(f"    Median: {statistics.median(counts)}")
        print(f"    Variance: {variance}")
        print(f"    Max: {max(counts)}")
        print(f"    Min: {min(counts)}\n")

    return dict(results)
169  
async def main(argv):
    """Parse the command line and either analyze or collect measurements.

    With --analyze the existing output file is analyzed; otherwise messages
    are collected until SIGINT (Ctrl-C) cancels the collection task.

    Args:
        argv: Full argument vector with the program name first, as in
            sys.argv.
    """
    parser = argparse.ArgumentParser(description='Tool to collect and analyze measurement of received messages')
    parser.add_argument('--server', default='127.0.0.1', help='RPC server')
    parser.add_argument('--port', default=26660, type=int, help='Port of the RPC server')
    parser.add_argument('--output-file', default='/tmp/node-metering-data.tsv', help='Location of the file containing the collected data')
    parser.add_argument('--analyze', action='store_true', help='Analyzes existing message from the output file without collecting new ones')

    # Parse the vector that was passed in (minus the program name) instead
    # of silently falling back to the global sys.argv.
    args = parser.parse_args(argv[1:])
    if args.analyze:
        analyze_messages(args.output_file)
        return

    collect_task = asyncio.create_task(collect_messages(args.server, args.port, args.output_file))
    # Turn Ctrl-C into a cancellation of the collection task so that it can
    # run its cleanup instead of being torn down by KeyboardInterrupt.
    asyncio.get_running_loop().add_signal_handler(signal.SIGINT, collect_task.cancel)
    await collect_task
185  
186  
# Only start the tool when this file is executed as a script, so that
# importing it does not trigger any network or file activity.
if __name__ == "__main__":
    asyncio.run(main(sys.argv))