netutil.py
#!/usr/bin/env python3
# Copyright (c) 2014-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Linux, macOS, and BSD network utilities.

Roughly based on https://web.archive.org/web/20190424172231/http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
"""

import sys
import socket
import struct
import array
import os

# Easily unreachable address. Attempts to connect to it will stay within the machine.
# Used to avoid non-loopback traffic or DNS queries.
UNREACHABLE_PROXY_ARG = '-proxy=127.0.0.1:1'

# TCP state codes as they appear in the "st" column of /proc/net/tcp.
# Only STATE_LISTEN is used; the others are kept for reference.
# STATE_ESTABLISHED = '01'
# STATE_SYN_SENT = '02'
# STATE_SYN_RECV = '03'
# STATE_FIN_WAIT1 = '04'
# STATE_FIN_WAIT2 = '05'
# STATE_TIME_WAIT = '06'
# STATE_CLOSE = '07'
# STATE_CLOSE_WAIT = '08'
# STATE_LAST_ACK = '09'
STATE_LISTEN = '0A'
# STATE_CLOSING = '0B'

# Address manager size constants as defined in addrman_impl.h
ADDRMAN_NEW_BUCKET_COUNT = 1 << 10
ADDRMAN_TRIED_BUCKET_COUNT = 1 << 8
ADDRMAN_BUCKET_SIZE = 1 << 6


def get_socket_inodes(pid):
    '''
    Get list of socket inodes for process pid.

    Walks /proc/<pid>/fd and collects the inode number of every descriptor
    whose link target has the form "socket:[<inode>]".

    Returns a list of ints. Raises whatever os.listdir raises if the fd
    directory itself cannot be read.
    '''
    base = '/proc/%i/fd' % pid
    inodes = []
    for item in os.listdir(base):
        try:
            target = os.readlink(os.path.join(base, item))
        except FileNotFoundError:
            # The entry disappeared between listdir() and readlink(); skip it.
            continue
        if target.startswith('socket:'):
            # Strip the leading "socket:[" and the trailing "]".
            inodes.append(int(target[8:-1]))
    return inodes


def _remove_empty(array):
    '''Return the items of array that are not the empty string.'''
    return [x for x in array if x != '']


def _convert_ip_port(array):
    '''
    Convert a "HEXADDR:HEXPORT" field from /proc/net/tcp{,6}.

    Returns a (host, port) tuple where host is a lowercase hex string and
    port is an int.
    '''
    host, port = array.split(':')
    # convert host from mangled-per-four-bytes form as used by kernel
    host = bytes.fromhex(host)
    # Reinterpret each 4-byte group as a native-endian 32-bit word and print
    # it as 8 hex digits; trailing bytes beyond a multiple of 4 are ignored.
    host_out = ''.join(
        '%08x' % struct.unpack_from('=I', host, offset)[0]
        for offset in range(0, len(host) // 4 * 4, 4)
    )
    return host_out, int(port, 16)


def netstat(typ='tcp'):
    '''
    Function to return a list with status of tcp connections at linux systems
    To get pid of all network process running on system, you must run this script
    as superuser

    typ selects the table under /proc/net/ to read (e.g. 'tcp' or 'tcp6').

    Returns a list of [id, local_addr, remote_addr, state, inode] lists, where
    the addresses are (hex_host, port) tuples, state is the two-character hex
    state code and inode is an int.
    '''
    with open('/proc/net/' + typ, 'r') as f:
        # The first line is the column header, not a connection.
        content = f.readlines()[1:]
    result = []
    for line in content:
        line_array = _remove_empty(line.split(' '))  # Split lines and remove empty spaces.
        tcp_id = line_array[0]
        l_addr = _convert_ip_port(line_array[1])
        r_addr = _convert_ip_port(line_array[2])
        state = line_array[3]
        inode = int(line_array[9])  # Need the inode to match with process pid.
        result.append([tcp_id, l_addr, r_addr, state, inode])
    return result


def get_bind_addrs(pid):
    '''
    Get bind addresses as (host,port) tuples for process pid.

    host is a hex string (the same encoding addr_to_hex produces) and port is
    an int. On Linux the information comes from /proc; on macOS and the BSDs
    it is parsed from the output of lsof.

    Raises NotImplementedError on any other platform.
    '''
    if sys.platform == 'linux':
        # A set makes the per-connection membership test O(1).
        inodes = set(get_socket_inodes(pid))
        return [
            conn[1]
            for conn in netstat('tcp') + netstat('tcp6')
            if conn[3] == STATE_LISTEN and conn[4] in inodes
        ]
    elif sys.platform.startswith(("darwin", "freebsd", "netbsd", "openbsd")):
        import re
        import subprocess
        output = subprocess.check_output([
            "lsof",
            *(["-Di"] if sys.platform.startswith("freebsd") else []),  # Ignore device cache to avoid stderr warnings.
            "-nP",  # Keep hosts and ports numeric.
            "-a",  # Require all filters to match.
            "-p", str(pid),  # Limit results to the target pid.
            "-iTCP",  # Only inspect TCP sockets.
            "-sTCP:LISTEN",  # Only keep listening sockets.
            "-Ftn",  # Emit machine-readable type and name fields.
        ], text=True)
        # Each match is a "t<type>" line immediately followed by an
        # "n<host>:<port>" line. A "*" host means the wildcard address of the
        # socket's family; IPv6 hosts are wrapped in square brackets.
        return [
            (addr_to_hex(("::" if sock_type == "IPv6" else "0.0.0.0") if host == "*" else host.strip("[]")), int(port))
            for sock_type, host, port in re.findall(r"t(IPv[46])\nn(\*|\[.+?]|[^:]+):(\d+)", output)
        ]
    else:
        raise NotImplementedError(f"get_bind_addrs is not supported on {sys.platform}")


def all_interfaces():
    '''
    Return all IPv4 interfaces that are up.

    The result is a list of (name, address) tuples, where name is the
    interface name as bytes and address is a dotted-quad string. On Linux the
    list is obtained with the SIOCGIFCONF ioctl; on macOS and the BSDs it is
    parsed from `ifconfig -au`.

    Raises NotImplementedError on any other platform.
    '''
    if sys.platform == 'linux':
        import fcntl  # Linux only, so only import when required

        is_64bits = sys.maxsize > 2**32
        struct_size = 40 if is_64bits else 32  # size of one entry in the result buffer
        max_possible = 8  # initial value
        # The socket only serves as a handle for the ioctl; close it when done.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            while True:
                buf_len = max_possible * struct_size
                names = array.array('B', b'\0' * buf_len)
                outbytes = struct.unpack('iL', fcntl.ioctl(
                    s.fileno(),
                    0x8912,  # SIOCGIFCONF
                    struct.pack('iL', buf_len, names.buffer_info()[0])
                ))[0]
                if outbytes == buf_len:
                    # The buffer was filled completely, so entries may have
                    # been cut off: retry with twice the capacity.
                    max_possible *= 2
                else:
                    break
        namestr = names.tobytes()
        # Per entry: NUL-padded name in bytes 0..15, IPv4 address in bytes 20..23.
        return [(namestr[i:i+16].split(b'\0', 1)[0],
                 socket.inet_ntoa(namestr[i+20:i+24]))
                for i in range(0, outbytes, struct_size)]
    elif sys.platform.startswith(("darwin", "freebsd", "netbsd", "openbsd")):
        import re
        import subprocess
        output = subprocess.check_output(["ifconfig", "-au"], text=True)
        # An interface block is a "<name>:" line followed by its indented
        # continuation lines; every "inet <addr>" inside it is one result.
        return [
            (m["iface"].encode(), ip)
            for m in re.finditer(r"(?m)^(?P<iface>\S+):(?P<block>[^\n]*(?:\n[ \t]+[^\n]*)*)", output)
            for ip in re.findall(r"inet (\S+)", m["block"])
        ]
    else:
        raise NotImplementedError(f"all_interfaces is not supported on {sys.platform}")


def addr_to_hex(addr):
    '''
    Convert string IPv4 or IPv6 address to binary address as returned by
    get_bind_addrs.
    Very naive implementation that certainly doesn't work for all IPv6 variants.

    Any address containing a '.' is treated as dotted-quad IPv4; otherwise an
    address containing ':' is treated as IPv6 with at most one "::".

    Returns the address bytes as a lowercase hex string. Raises ValueError if
    the address contains neither '.' nor ':' (or a component is not a valid
    number), and AssertionError if the IPv6 component count is inconsistent.
    '''
    if '.' in addr:  # IPv4
        addr = [int(x) for x in addr.split('.')]
    elif ':' in addr:  # IPv6
        sub = [[], []]  # prefix, suffix
        x = 0
        addr = addr.split(':')
        for i, comp in enumerate(addr):
            if comp == '':
                if i == 0 or i == (len(addr) - 1):  # skip empty component at beginning or end
                    continue
                x += 1  # :: skips to suffix
                assert x < 2
            else:  # two bytes per component
                val = int(comp, 16)
                sub[x].append(val >> 8)
                sub[x].append(val & 0xff)
        # Whatever is missing from 16 bytes is the zero run that "::" stands for.
        nullbytes = 16 - len(sub[0]) - len(sub[1])
        assert (x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0)
        addr = sub[0] + ([0] * nullbytes) + sub[1]
    else:
        raise ValueError('Could not parse address %s' % addr)
    return bytearray(addr).hex()


def test_ipv6_local():
    '''
    Check for (local) IPv6 support.

    Returns True if an IPv6 datagram socket can be created and connected to
    localhost, False otherwise.
    '''
    # By using SOCK_DGRAM this will not actually make a connection, but it will
    # fail if there is no route to IPv6 localhost.
    try:
        # The context manager closes the probe socket on every path.
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as s:
            s.connect(('::1', 1))
    except socket.error:
        return False
    return True


def test_unix_socket():
    '''Return True if UNIX sockets are available on this platform.'''
    return hasattr(socket, 'AF_UNIX')


def format_addr_port(addr, port):
    '''Return either "addr:port" or "[addr]:port" based on whether addr looks like an IPv6 address.'''
    if ":" in addr:
        return f"[{addr}]:{port}"
    return f"{addr}:{port}"


def set_ephemeral_port_range(sock):
    '''On FreeBSD, set socket to use the high ephemeral port range (49152-65535).

    FreeBSD's default ephemeral port range (10000-65535) overlaps with the test
    framework's static port range starting at TEST_RUNNER_PORT_MIN (default=11000).
    Using IP_PORTRANGE_HIGH avoids this overlap when binding to port 0 for dynamic
    port allocation.

    This is a no-op on every other platform.
    '''
    if sys.platform.startswith('freebsd'):
        # Constants from FreeBSD's netinet/in.h and netinet6/in6.h
        IP_PORTRANGE = 19
        IPV6_PORTRANGE = 14
        IP_PORTRANGE_HIGH = 1  # Same value for both IPv4 and IPv6
        if sock.family == socket.AF_INET6:
            sock.setsockopt(socket.IPPROTO_IPV6, IPV6_PORTRANGE, IP_PORTRANGE_HIGH)
        else:
            sock.setsockopt(socket.IPPROTO_IP, IP_PORTRANGE, IP_PORTRANGE_HIGH)