netutil.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2014-present The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """Linux network utilities. 6 7 Roughly based on https://web.archive.org/web/20190424172231/http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal 8 """ 9 10 import sys 11 import socket 12 import struct 13 import array 14 import os 15 16 # Easily unreachable address. Attempts to connect to it will stay within the machine. 17 # Used to avoid non-loopback traffic or DNS queries. 18 UNREACHABLE_PROXY_ARG = '-proxy=127.0.0.1:1' 19 20 # STATE_ESTABLISHED = '01' 21 # STATE_SYN_SENT = '02' 22 # STATE_SYN_RECV = '03' 23 # STATE_FIN_WAIT1 = '04' 24 # STATE_FIN_WAIT2 = '05' 25 # STATE_TIME_WAIT = '06' 26 # STATE_CLOSE = '07' 27 # STATE_CLOSE_WAIT = '08' 28 # STATE_LAST_ACK = '09' 29 STATE_LISTEN = '0A' 30 # STATE_CLOSING = '0B' 31 32 # Address manager size constants as defined in addrman_impl.h 33 ADDRMAN_NEW_BUCKET_COUNT = 1 << 10 34 ADDRMAN_TRIED_BUCKET_COUNT = 1 << 8 35 ADDRMAN_BUCKET_SIZE = 1 << 6 36 37 def get_socket_inodes(pid): 38 ''' 39 Get list of socket inodes for process pid. 40 ''' 41 base = '/proc/%i/fd' % pid 42 inodes = [] 43 for item in os.listdir(base): 44 try: 45 target = os.readlink(os.path.join(base, item)) 46 if target.startswith('socket:'): 47 inodes.append(int(target[8:-1])) 48 except FileNotFoundError: 49 pass 50 return inodes 51 52 def _remove_empty(array): 53 return [x for x in array if x !=''] 54 55 def _convert_ip_port(array): 56 host,port = array.split(':') 57 # convert host from mangled-per-four-bytes form as used by kernel 58 host = bytes.fromhex(host) 59 host_out = '' 60 for x in range(0, len(host) // 4): 61 (val,) = struct.unpack('=I', host[x*4:(x+1)*4]) 62 host_out += '%08x' % val 63 64 return host_out,int(port,16) 65 66 def netstat(typ='tcp'): 67 ''' 68 Function to return a list with status of tcp connections at linux systems 69 To get pid of all network process running on system, you must run this script 70 as superuser 71 ''' 72 with open('/proc/net/'+typ,'r') as f: 73 content = f.readlines() 74 content.pop(0) 75 result = [] 76 for line in content: 77 line_array = _remove_empty(line.split(' ')) # Split lines and remove empty spaces. 78 tcp_id = line_array[0] 79 l_addr = _convert_ip_port(line_array[1]) 80 r_addr = _convert_ip_port(line_array[2]) 81 state = line_array[3] 82 inode = int(line_array[9]) # Need the inode to match with process pid. 83 nline = [tcp_id, l_addr, r_addr, state, inode] 84 result.append(nline) 85 return result 86 87 def get_bind_addrs(pid): 88 ''' 89 Get bind addresses as (host,port) tuples for process pid. 90 ''' 91 inodes = get_socket_inodes(pid) 92 bind_addrs = [] 93 for conn in netstat('tcp') + netstat('tcp6'): 94 if conn[3] == STATE_LISTEN and conn[4] in inodes: 95 bind_addrs.append(conn[1]) 96 return bind_addrs 97 98 # from: https://code.activestate.com/recipes/439093/ 99 def all_interfaces(): 100 ''' 101 Return all interfaces that are up 102 ''' 103 import fcntl # Linux only, so only import when required 104 105 is_64bits = sys.maxsize > 2**32 106 struct_size = 40 if is_64bits else 32 107 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 108 max_possible = 8 # initial value 109 while True: 110 bytes = max_possible * struct_size 111 names = array.array('B', b'\0' * bytes) 112 outbytes = struct.unpack('iL', fcntl.ioctl( 113 s.fileno(), 114 0x8912, # SIOCGIFCONF 115 struct.pack('iL', bytes, names.buffer_info()[0]) 116 ))[0] 117 if outbytes == bytes: 118 max_possible *= 2 119 else: 120 break 121 namestr = names.tobytes() 122 return [(namestr[i:i+16].split(b'\0', 1)[0], 123 socket.inet_ntoa(namestr[i+20:i+24])) 124 for i in range(0, outbytes, struct_size)] 125 126 def addr_to_hex(addr): 127 ''' 128 Convert string IPv4 or IPv6 address to binary address as returned by 129 get_bind_addrs. 130 Very naive implementation that certainly doesn't work for all IPv6 variants. 131 ''' 132 if '.' in addr: # IPv4 133 addr = [int(x) for x in addr.split('.')] 134 elif ':' in addr: # IPv6 135 sub = [[], []] # prefix, suffix 136 x = 0 137 addr = addr.split(':') 138 for i,comp in enumerate(addr): 139 if comp == '': 140 if i == 0 or i == (len(addr)-1): # skip empty component at beginning or end 141 continue 142 x += 1 # :: skips to suffix 143 assert x < 2 144 else: # two bytes per component 145 val = int(comp, 16) 146 sub[x].append(val >> 8) 147 sub[x].append(val & 0xff) 148 nullbytes = 16 - len(sub[0]) - len(sub[1]) 149 assert (x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0) 150 addr = sub[0] + ([0] * nullbytes) + sub[1] 151 else: 152 raise ValueError('Could not parse address %s' % addr) 153 return bytearray(addr).hex() 154 155 def test_ipv6_local(): 156 ''' 157 Check for (local) IPv6 support. 158 ''' 159 # By using SOCK_DGRAM this will not actually make a connection, but it will 160 # fail if there is no route to IPv6 localhost. 161 have_ipv6 = True 162 try: 163 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 164 s.connect(('::1', 1)) 165 except socket.error: 166 have_ipv6 = False 167 return have_ipv6 168 169 def test_unix_socket(): 170 '''Return True if UNIX sockets are available on this platform.''' 171 try: 172 socket.AF_UNIX 173 except AttributeError: 174 return False 175 else: 176 return True 177 178 def format_addr_port(addr, port): 179 '''Return either "addr:port" or "[addr]:port" based on whether addr looks like an IPv6 address.''' 180 if ":" in addr: 181 return f"[{addr}]:{port}" 182 else: 183 return f"{addr}:{port}"