/ contrib / tracing / p2p_monitor.py
p2p_monitor.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2021-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  
  6  """ Interactive bitcoind P2P network traffic monitor utilizing USDT and the
  7      net:inbound_message and net:outbound_message tracepoints. """
  8  
  9  # This script demonstrates what USDT for Bitcoin Core can enable. It uses BCC
 10  # (https://github.com/iovisor/bcc) to load a sandboxed eBPF program into the
 11  # Linux kernel (root privileges are required). The eBPF program attaches to two
 12  # statically defined tracepoints. The tracepoint 'net:inbound_message' is called
 13  # when a new P2P message is received, and 'net:outbound_message' is called on
 14  # outbound P2P messages. The eBPF program submits the P2P messages to
 15  # this script via a BPF ring buffer.
 16  
 17  import curses
 18  import os
 19  import sys
 20  from curses import wrapper, panel
 21  from bcc import BPF, USDT
 22  
 23  # BCC: The C program to be compiled to an eBPF program (by BCC) and loaded into
 24  # a sandboxed Linux kernel VM.
 25  program = """
 26  #include <uapi/linux/ptrace.h>
 27  
 28  // Tor v3 addresses are 62 chars + 6 chars for the port (':12345').
 29  // I2P addresses are 60 chars + 6 chars for the port (':12345').
 30  #define MAX_PEER_ADDR_LENGTH 62 + 6
 31  #define MAX_PEER_CONN_TYPE_LENGTH 20
 32  #define MAX_MSG_TYPE_LENGTH 20
 33  
 34  struct p2p_message
 35  {
 36      u64     peer_id;
 37      char    peer_addr[MAX_PEER_ADDR_LENGTH];
 38      char    peer_conn_type[MAX_PEER_CONN_TYPE_LENGTH];
 39      char    msg_type[MAX_MSG_TYPE_LENGTH];
 40      u64     msg_size;
 41  };
 42  
 43  
 44  // Two BPF perf buffers for pushing data (here P2P messages) to user space.
 45  BPF_PERF_OUTPUT(inbound_messages);
 46  BPF_PERF_OUTPUT(outbound_messages);
 47  
 48  int trace_inbound_message(struct pt_regs *ctx) {
 49      struct p2p_message msg = {};
 50      void *paddr = NULL, *pconn_type = NULL, *pmsg_type = NULL;
 51  
 52      bpf_usdt_readarg(1, ctx, &msg.peer_id);
 53      bpf_usdt_readarg(2, ctx, &paddr);
 54      bpf_probe_read_user_str(&msg.peer_addr, sizeof(msg.peer_addr), paddr);
 55      bpf_usdt_readarg(3, ctx, &pconn_type);
 56      bpf_probe_read_user_str(&msg.peer_conn_type, sizeof(msg.peer_conn_type), pconn_type);
 57      bpf_usdt_readarg(4, ctx, &pmsg_type);
 58      bpf_probe_read_user_str(&msg.msg_type, sizeof(msg.msg_type), pmsg_type);
 59      bpf_usdt_readarg(5, ctx, &msg.msg_size);
 60  
 61      inbound_messages.perf_submit(ctx, &msg, sizeof(msg));
 62      return 0;
 63  };
 64  
 65  int trace_outbound_message(struct pt_regs *ctx) {
 66      struct p2p_message msg = {};
 67      void *paddr = NULL, *pconn_type = NULL, *pmsg_type = NULL;
 68  
 69      bpf_usdt_readarg(1, ctx, &msg.peer_id);
 70      bpf_usdt_readarg(2, ctx, &paddr);
 71      bpf_probe_read_user_str(&msg.peer_addr, sizeof(msg.peer_addr), paddr);
 72      bpf_usdt_readarg(3, ctx, &pconn_type);
 73      bpf_probe_read_user_str(&msg.peer_conn_type, sizeof(msg.peer_conn_type), pconn_type);
 74      bpf_usdt_readarg(4, ctx, &pmsg_type);
 75      bpf_probe_read_user_str(&msg.msg_type, sizeof(msg.msg_type), pmsg_type);
 76      bpf_usdt_readarg(5, ctx, &msg.msg_size);
 77  
 78      outbound_messages.perf_submit(ctx, &msg, sizeof(msg));
 79      return 0;
 80  };
 81  """
 82  
 83  
 84  class Message:
 85      """ A P2P network message. """
 86      msg_type = ""
 87      size = 0
 88      data = bytes()
 89      inbound = False
 90  
 91      def __init__(self, msg_type, size, inbound):
 92          self.msg_type = msg_type
 93          self.size = size
 94          self.inbound = inbound
 95  
 96  
 97  class Peer:
 98      """ A P2P network peer. """
 99      id = 0
100      address = ""
101      connection_type = ""
102      last_messages = list()
103  
104      total_inbound_msgs = 0
105      total_inbound_bytes = 0
106      total_outbound_msgs = 0
107      total_outbound_bytes = 0
108  
109      def __init__(self, id, address, connection_type):
110          self.id = id
111          self.address = address
112          self.connection_type = connection_type
113          self.last_messages = list()
114  
115      def add_message(self, message):
116          self.last_messages.append(message)
117          if len(self.last_messages) > 25:
118              self.last_messages.pop(0)
119          if message.inbound:
120              self.total_inbound_bytes += message.size
121              self.total_inbound_msgs += 1
122          else:
123              self.total_outbound_bytes += message.size
124              self.total_outbound_msgs += 1
125  
126  
127  def main(pid):
128      peers = dict()
129      print(f"Hooking into bitcoind with pid {pid}")
130      bitcoind_with_usdts = USDT(pid=int(pid))
131  
132      # attaching the trace functions defined in the BPF program to the tracepoints
133      bitcoind_with_usdts.enable_probe(
134          probe="inbound_message", fn_name="trace_inbound_message")
135      bitcoind_with_usdts.enable_probe(
136          probe="outbound_message", fn_name="trace_outbound_message")
137      bpf = BPF(text=program, usdt_contexts=[bitcoind_with_usdts])
138  
139      # BCC: perf buffer handle function for inbound_messages
140      def handle_inbound(_, data, size):
141          """ Inbound message handler.
142  
143          Called each time a message is submitted to the inbound_messages BPF table."""
144          event = bpf["inbound_messages"].event(data)
145          if event.peer_id not in peers:
146              peer = Peer(event.peer_id, event.peer_addr.decode(
147                  "utf-8"), event.peer_conn_type.decode("utf-8"))
148              peers[peer.id] = peer
149          peers[event.peer_id].add_message(
150              Message(event.msg_type.decode("utf-8"), event.msg_size, True))
151  
152      # BCC: perf buffer handle function for outbound_messages
153      def handle_outbound(_, data, size):
154          """ Outbound message handler.
155  
156          Called each time a message is submitted to the outbound_messages BPF table."""
157          event = bpf["outbound_messages"].event(data)
158          if event.peer_id not in peers:
159              peer = Peer(event.peer_id, event.peer_addr.decode(
160                  "utf-8"), event.peer_conn_type.decode("utf-8"))
161              peers[peer.id] = peer
162          peers[event.peer_id].add_message(
163              Message(event.msg_type.decode("utf-8"), event.msg_size, False))
164  
165      # BCC: add handlers to the inbound and outbound perf buffers
166      bpf["inbound_messages"].open_perf_buffer(handle_inbound)
167      bpf["outbound_messages"].open_perf_buffer(handle_outbound)
168  
169      wrapper(loop, bpf, peers)
170  
171  
172  def loop(screen, bpf, peers):
173      screen.nodelay(1)
174      cur_list_pos = 0
175      win = curses.newwin(30, 70, 2, 7)
176      win.erase()
177      win.border(ord("|"), ord("|"), ord("-"), ord("-"),
178                 ord("-"), ord("-"), ord("-"), ord("-"))
179      info_panel = panel.new_panel(win)
180      info_panel.hide()
181  
182      ROWS_AVAILABLE_FOR_LIST = curses.LINES - 5
183      scroll = 0
184  
185      while True:
186          try:
187              # BCC: poll the perf buffers for new events or timeout after 50ms
188              bpf.perf_buffer_poll(timeout=50)
189  
190              ch = screen.getch()
191              if (ch == curses.KEY_DOWN or ch == ord("j")) and cur_list_pos < len(
192                      peers.keys()) -1 and info_panel.hidden():
193                  cur_list_pos += 1
194                  if cur_list_pos >= ROWS_AVAILABLE_FOR_LIST:
195                      scroll += 1
196              if (ch == curses.KEY_UP or ch == ord("k")) and cur_list_pos > 0 and info_panel.hidden():
197                  cur_list_pos -= 1
198                  if scroll > 0:
199                      scroll -= 1
200              if ch == ord('\n') or ch == ord(' '):
201                  if info_panel.hidden():
202                      info_panel.show()
203                  else:
204                      info_panel.hide()
205              screen.erase()
206              render(screen, peers, cur_list_pos, scroll, ROWS_AVAILABLE_FOR_LIST, info_panel)
207              curses.panel.update_panels()
208              screen.refresh()
209          except KeyboardInterrupt:
210              exit()
211  
212  
213  def render(screen, peers, cur_list_pos, scroll, ROWS_AVAILABLE_FOR_LIST, info_panel):
214      """ renders the list of peers and details panel
215  
216      This code is unrelated to USDT, BCC and BPF.
217      """
218      header_format = "%6s  %-20s  %-20s  %-22s  %-67s"
219      row_format = "%6s  %-5d %9d byte  %-5d %9d byte  %-22s  %-67s"
220  
221      screen.addstr(0, 1, (" P2P Message Monitor "), curses.A_REVERSE)
222      screen.addstr(
223          1, 0, (" Navigate with UP/DOWN or J/K and select a peer with ENTER or SPACE to see individual P2P messages"), curses.A_NORMAL)
224      screen.addstr(3, 0,
225                    header_format % ("PEER", "OUTBOUND", "INBOUND", "TYPE", "ADDR"), curses.A_BOLD | curses.A_UNDERLINE)
226      peer_list = sorted(peers.keys())[scroll:ROWS_AVAILABLE_FOR_LIST+scroll]
227      for i, peer_id in enumerate(peer_list):
228          peer = peers[peer_id]
229          screen.addstr(i + 4, 0,
230                        row_format % (peer.id, peer.total_outbound_msgs, peer.total_outbound_bytes,
231                                      peer.total_inbound_msgs, peer.total_inbound_bytes,
232                                      peer.connection_type, peer.address),
233                        curses.A_REVERSE if i + scroll == cur_list_pos else curses.A_NORMAL)
234          if i + scroll == cur_list_pos:
235              info_window = info_panel.window()
236              info_window.erase()
237              info_window.border(
238                  ord("|"), ord("|"), ord("-"), ord("-"),
239                  ord("-"), ord("-"), ord("-"), ord("-"))
240  
241              info_window.addstr(
242                  1, 1, f"PEER {peer.id} ({peer.address})".center(68), curses.A_REVERSE | curses.A_BOLD)
243              info_window.addstr(
244                  2, 1, f" OUR NODE{peer.connection_type:^54}PEER ",
245                  curses.A_BOLD)
246              for i, msg in enumerate(peer.last_messages):
247                  if msg.inbound:
248                      info_window.addstr(
249                          i + 3, 1, "%68s" %
250                          (f"<--- {msg.msg_type} ({msg.size} bytes) "), curses.A_NORMAL)
251                  else:
252                      info_window.addstr(
253                          i + 3, 1, " %s (%d byte) --->" %
254                          (msg.msg_type, msg.size), curses.A_NORMAL)
255  
256  
257  def running_as_root():
258      return os.getuid() == 0
259  
260  if __name__ == "__main__":
261      if len(sys.argv) != 2:
262          print("USAGE:", sys.argv[0], "<pid of bitcoind>")
263          exit()
264      if not running_as_root():
265          print("You might not have the privileges required to hook into the tracepoints!")
266      pid = sys.argv[1]
267      main(pid)