#!/usr/bin/env python3
# Copyright (c) 2021-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

""" Interactive bitcoind P2P network traffic monitor utilizing USDT and the
    net:inbound_message and net:outbound_message tracepoints. """

# This script demonstrates what USDT for Bitcoin Core can enable. It uses BCC
# (https://github.com/iovisor/bcc) to load a sandboxed eBPF program into the
# Linux kernel (root privileges are required). The eBPF program attaches to two
# statically defined tracepoints. The tracepoint 'net:inbound_message' is called
# when a new P2P message is received, and 'net:outbound_message' is called on
# outbound P2P messages. The eBPF program submits the P2P messages to
# this script via two BPF perf buffers.

import curses
import os
import sys
from collections import deque
from curses import wrapper, panel

from bcc import BPF, USDT

# Number of most recent messages remembered (and displayed) per peer.
MAX_STORED_MESSAGES = 25

# Geometry of the peer details panel. The outermost row/column on each side is
# used by the border, so text has INFO_INNER_WIDTH columns available.
INFO_HEIGHT = 30
INFO_WIDTH = 70
INFO_TOP = 2
INFO_LEFT = 7
INFO_INNER_WIDTH = INFO_WIDTH - 2

# BCC: The C program to be compiled to an eBPF program (by BCC) and loaded into
# a sandboxed Linux kernel VM.
# NOTE(review): the two trace functions are intentionally left as separate,
# near-identical functions. BCC appears to resolve bpf_usdt_readarg() per
# attached function, so confirm against the BCC docs before factoring the
# argument reads into a shared C helper.
program = """
#include <uapi/linux/ptrace.h>

// Tor v3 addresses are 62 chars + 6 chars for the port (':12345').
// I2P addresses are 60 chars + 6 chars for the port (':12345').
#define MAX_PEER_ADDR_LENGTH (62 + 6)
#define MAX_PEER_CONN_TYPE_LENGTH 20
#define MAX_MSG_TYPE_LENGTH 20

struct p2p_message
{
    u64     peer_id;
    char    peer_addr[MAX_PEER_ADDR_LENGTH];
    char    peer_conn_type[MAX_PEER_CONN_TYPE_LENGTH];
    char    msg_type[MAX_MSG_TYPE_LENGTH];
    u64     msg_size;
};


// Two BPF perf buffers for pushing data (here P2P messages) to user space.
BPF_PERF_OUTPUT(inbound_messages);
BPF_PERF_OUTPUT(outbound_messages);

int trace_inbound_message(struct pt_regs *ctx) {
    struct p2p_message msg = {};
    void *paddr = NULL, *pconn_type = NULL, *pmsg_type = NULL;

    bpf_usdt_readarg(1, ctx, &msg.peer_id);
    bpf_usdt_readarg(2, ctx, &paddr);
    bpf_probe_read_user_str(&msg.peer_addr, sizeof(msg.peer_addr), paddr);
    bpf_usdt_readarg(3, ctx, &pconn_type);
    bpf_probe_read_user_str(&msg.peer_conn_type, sizeof(msg.peer_conn_type), pconn_type);
    bpf_usdt_readarg(4, ctx, &pmsg_type);
    bpf_probe_read_user_str(&msg.msg_type, sizeof(msg.msg_type), pmsg_type);
    bpf_usdt_readarg(5, ctx, &msg.msg_size);

    inbound_messages.perf_submit(ctx, &msg, sizeof(msg));
    return 0;
};

int trace_outbound_message(struct pt_regs *ctx) {
    struct p2p_message msg = {};
    void *paddr = NULL, *pconn_type = NULL, *pmsg_type = NULL;

    bpf_usdt_readarg(1, ctx, &msg.peer_id);
    bpf_usdt_readarg(2, ctx, &paddr);
    bpf_probe_read_user_str(&msg.peer_addr, sizeof(msg.peer_addr), paddr);
    bpf_usdt_readarg(3, ctx, &pconn_type);
    bpf_probe_read_user_str(&msg.peer_conn_type, sizeof(msg.peer_conn_type), pconn_type);
    bpf_usdt_readarg(4, ctx, &pmsg_type);
    bpf_probe_read_user_str(&msg.msg_type, sizeof(msg.msg_type), pmsg_type);
    bpf_usdt_readarg(5, ctx, &msg.msg_size);

    outbound_messages.perf_submit(ctx, &msg, sizeof(msg));
    return 0;
};
"""


class Message:
    """ A P2P network message.

    Attributes:
        msg_type: message type name as reported by the tracepoint.
        size: message size in bytes as reported by the tracepoint.
        data: placeholder payload; never filled in by this script.
        inbound: True for received messages, False for sent ones.
    """

    def __init__(self, msg_type, size, inbound):
        self.msg_type = msg_type
        self.size = size
        self.data = bytes()
        self.inbound = inbound


class Peer:
    """ A P2P network peer.

    Keeps running message/byte totals per direction and the
    MAX_STORED_MESSAGES most recent messages exchanged with the peer.
    """

    def __init__(self, id, address, connection_type):
        self.id = id
        self.address = address
        self.connection_type = connection_type
        # A bounded deque drops the oldest entry automatically, replacing the
        # manual (O(n)) list.pop(0) trimming.
        self.last_messages = deque(maxlen=MAX_STORED_MESSAGES)

        self.total_inbound_msgs = 0
        self.total_inbound_bytes = 0
        self.total_outbound_msgs = 0
        self.total_outbound_bytes = 0

    def add_message(self, message):
        """ Remembers `message` and adds it to the totals of its direction. """
        self.last_messages.append(message)
        if message.inbound:
            self.total_inbound_bytes += message.size
            self.total_inbound_msgs += 1
        else:
            self.total_outbound_bytes += message.size
            self.total_outbound_msgs += 1


def _record_message(peers, event, inbound):
    """ Stores one perf buffer event in `peers`.

    Creates the Peer entry (keyed by peer id) the first time a peer is seen,
    then appends the message to it. `inbound` tells which direction the
    message travelled.
    """
    peer = peers.get(event.peer_id)
    if peer is None:
        peer = Peer(event.peer_id, event.peer_addr.decode("utf-8"),
                    event.peer_conn_type.decode("utf-8"))
        peers[peer.id] = peer
    peer.add_message(
        Message(event.msg_type.decode("utf-8"), event.msg_size, inbound))


def main(pid):
    """ Attaches to the bitcoind process `pid` and runs the curses UI.

    `pid` may be an int or a numeric string; int() raises ValueError for
    anything else.
    """
    peers = dict()
    print(f"Hooking into bitcoind with pid {pid}")
    bitcoind_with_usdts = USDT(pid=int(pid))

    # attaching the trace functions defined in the BPF program to the tracepoints
    bitcoind_with_usdts.enable_probe(
        probe="inbound_message", fn_name="trace_inbound_message")
    bitcoind_with_usdts.enable_probe(
        probe="outbound_message", fn_name="trace_outbound_message")
    bpf = BPF(text=program, usdt_contexts=[bitcoind_with_usdts])

    # BCC: perf buffer handle function for inbound_messages
    def handle_inbound(_, data, size):
        """ Inbound message handler.

        Called each time a message is submitted to the inbound_messages BPF table."""
        _record_message(peers, bpf["inbound_messages"].event(data), True)

    # BCC: perf buffer handle function for outbound_messages
    def handle_outbound(_, data, size):
        """ Outbound message handler.

        Called each time a message is submitted to the outbound_messages BPF table."""
        _record_message(peers, bpf["outbound_messages"].event(data), False)

    # BCC: add handlers to the inbound and outbound perf buffers
    bpf["inbound_messages"].open_perf_buffer(handle_inbound)
    bpf["outbound_messages"].open_perf_buffer(handle_outbound)

    wrapper(loop, bpf, peers)


def _draw_info_border(window):
    """ Draws the ASCII border of the peer details window. """
    window.border(ord("|"), ord("|"), ord("-"), ord("-"),
                  ord("-"), ord("-"), ord("-"), ord("-"))


def loop(screen, bpf, peers):
    """ Main UI loop: polls the perf buffers, handles keys and redraws.

    Runs until interrupted with Ctrl-C, then exits the process.
    """
    screen.nodelay(1)
    cur_list_pos = 0
    win = curses.newwin(INFO_HEIGHT, INFO_WIDTH, INFO_TOP, INFO_LEFT)
    win.erase()
    _draw_info_border(win)
    info_panel = panel.new_panel(win)
    info_panel.hide()

    ROWS_AVAILABLE_FOR_LIST = curses.LINES - 5
    scroll = 0

    while True:
        try:
            # BCC: poll the perf buffers for new events or timeout after 50ms
            bpf.perf_buffer_poll(timeout=50)

            ch = screen.getch()
            # The list can only be navigated while the details panel is hidden.
            if (ch == curses.KEY_DOWN or ch == ord("j")) and cur_list_pos < len(
                    peers) - 1 and info_panel.hidden():
                cur_list_pos += 1
                if cur_list_pos >= ROWS_AVAILABLE_FOR_LIST:
                    scroll += 1
            if (ch == curses.KEY_UP or ch == ord("k")) and cur_list_pos > 0 and info_panel.hidden():
                cur_list_pos -= 1
                if scroll > 0:
                    scroll -= 1
            if ch == ord('\n') or ch == ord(' '):
                if info_panel.hidden():
                    info_panel.show()
                else:
                    info_panel.hide()
            screen.erase()
            render(screen, peers, cur_list_pos, scroll, ROWS_AVAILABLE_FOR_LIST, info_panel)
            panel.update_panels()
            screen.refresh()
        except KeyboardInterrupt:
            # sys.exit() instead of the site-provided exit() builtin, which is
            # not guaranteed to be available.
            sys.exit()


def _render_peer_details(info_window, peer):
    """ Redraws the details window with the recent messages of `peer`. """
    info_window.erase()
    _draw_info_border(info_window)

    # Clip the title: a long (e.g. Tor v3 or I2P) address would otherwise
    # exceed the inner width, wrap to the next row and overwrite the border.
    title = f"PEER {peer.id} ({peer.address})".center(INFO_INNER_WIDTH)[:INFO_INNER_WIDTH]
    info_window.addstr(1, 1, title, curses.A_REVERSE | curses.A_BOLD)
    info_window.addstr(
        2, 1, f" OUR NODE{peer.connection_type:^54}PEER ",
        curses.A_BOLD)
    for row, msg in enumerate(peer.last_messages, start=3):
        if msg.inbound:
            # inbound messages are right-aligned (the peer's side)
            info_window.addstr(
                row, 1,
                f"<--- {msg.msg_type} ({msg.size} bytes) ".rjust(INFO_INNER_WIDTH),
                curses.A_NORMAL)
        else:
            info_window.addstr(
                row, 1, " %s (%d byte) --->" %
                (msg.msg_type, msg.size), curses.A_NORMAL)


def render(screen, peers, cur_list_pos, scroll, ROWS_AVAILABLE_FOR_LIST, info_panel):
    """ renders the list of peers and details panel

    Shows the `ROWS_AVAILABLE_FOR_LIST` peers starting at offset `scroll`
    (ordered by peer id), highlights the one at `cur_list_pos` and refreshes
    the content of `info_panel` for it.

    This code is unrelated to USDT, BCC and BPF.
    """
    header_format = "%6s  %-20s  %-20s  %-22s  %-67s"
    row_format = "%6s  %-5d %9d byte  %-5d %9d byte  %-22s  %-67s"

    screen.addstr(0, 1, (" P2P Message Monitor "), curses.A_REVERSE)
    screen.addstr(
        1, 0, (" Navigate with UP/DOWN or J/K and select a peer with ENTER or SPACE to see individual P2P messages"), curses.A_NORMAL)
    screen.addstr(3, 0,
                  header_format % ("PEER", "OUTBOUND", "INBOUND", "TYPE", "ADDR"), curses.A_BOLD | curses.A_UNDERLINE)
    peer_list = sorted(peers)[scroll:ROWS_AVAILABLE_FOR_LIST + scroll]
    for i, peer_id in enumerate(peer_list):
        peer = peers[peer_id]
        selected = i + scroll == cur_list_pos
        screen.addstr(i + 4, 0,
                      row_format % (peer.id, peer.total_outbound_msgs, peer.total_outbound_bytes,
                                    peer.total_inbound_msgs, peer.total_inbound_bytes,
                                    peer.connection_type, peer.address),
                      curses.A_REVERSE if selected else curses.A_NORMAL)
        if selected:
            _render_peer_details(info_panel.window(), peer)


def running_as_root():
    """ Returns True if the effective... process user id is 0 (os.getuid()). """
    return os.getuid() == 0


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("USAGE:", sys.argv[0], "<pid of bitcoind>")
        # a usage error is reported with a non-zero exit status
        sys.exit(1)
    if not running_as_root():
        print("You might not have the privileges required to hook into the tracepoints!")
    pid = sys.argv[1]
    main(pid)