netutil.py
#!/usr/bin/env python3
# Copyright (c) 2014-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Linux network utilities.

Roughly based on https://web.archive.org/web/20190424172231/http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
"""

import sys
import socket
import struct
import array
import os

# Connection state codes as they appear in the state column of
# /proc/net/tcp and /proc/net/tcp6. Only STATE_LISTEN is used here.
# STATE_ESTABLISHED = '01'
# STATE_SYN_SENT  = '02'
# STATE_SYN_RECV = '03'
# STATE_FIN_WAIT1 = '04'
# STATE_FIN_WAIT2 = '05'
# STATE_TIME_WAIT = '06'
# STATE_CLOSE = '07'
# STATE_CLOSE_WAIT = '08'
# STATE_LAST_ACK = '09'
STATE_LISTEN = '0A'
# STATE_CLOSING = '0B'

# Address manager size constants as defined in addrman_impl.h
ADDRMAN_NEW_BUCKET_COUNT = 1 << 10
ADDRMAN_TRIED_BUCKET_COUNT = 1 << 8
ADDRMAN_BUCKET_SIZE = 1 << 6


def get_socket_inodes(pid):
    '''
    Get list of socket inodes for process pid.

    Reads the symlinks in /proc/<pid>/fd and returns, as ints, the inode
    numbers of those whose target starts with 'socket:'.
    '''
    base = '/proc/%i/fd' % pid
    inodes = []
    for item in os.listdir(base):
        target = os.readlink(os.path.join(base, item))
        if target.startswith('socket:'):
            # Drop the 8-character prefix and the final character; for a
            # link of the form 'socket:[<inode>]' this leaves the inode.
            inodes.append(int(target[8:-1]))
    return inodes


def _remove_empty(array):
    '''Return a list of the elements of array that are not the empty string.'''
    return [x for x in array if x != '']


def _convert_ip_port(array):
    '''
    Convert a 'HEXHOST:HEXPORT' string into a (host, port) tuple.

    host is returned as a hex string, port as an int.
    '''
    host, port = array.split(':')
    # convert host from mangled-per-four-bytes form as used by kernel
    host = bytes.fromhex(host)
    # Re-read every 4-byte word in native byte order and print it back as
    # eight hex digits. Trailing bytes that do not fill a whole word are
    # ignored.
    words = (
        struct.unpack('=I', host[x * 4:(x + 1) * 4])[0]
        for x in range(len(host) // 4)
    )
    host_out = ''.join('%08x' % val for val in words)
    return host_out, int(port, 16)


def netstat(typ='tcp'):
    '''
    Function to return a list with status of tcp connections at linux systems
    To get pid of all network process running on system, you must run this script
    as superuser

    typ selects the file read under /proc/net/ (for example 'tcp' or 'tcp6').
    Each entry of the result is [tcp_id, l_addr, r_addr, state, inode], where
    l_addr and r_addr are (host, port) tuples from _convert_ip_port.
    '''
    with open('/proc/net/' + typ, 'r', encoding='utf8') as f:
        # The first line is the column header; slicing (rather than pop)
        # also tolerates an empty file.
        content = f.readlines()[1:]
    result = []
    for line in content:
        line_array = _remove_empty(line.split(' '))  # Split lines and remove empty spaces.
        tcp_id = line_array[0]
        l_addr = _convert_ip_port(line_array[1])
        r_addr = _convert_ip_port(line_array[2])
        state = line_array[3]
        inode = int(line_array[9])  # Need the inode to match with process pid.
        result.append([tcp_id, l_addr, r_addr, state, inode])
    return result


def get_bind_addrs(pid):
    '''
    Get bind addresses as (host,port) tuples for process pid.

    Only connections in the listening state whose inode belongs to one of
    the process' sockets are returned.
    '''
    # A set gives O(1) membership tests for every connection checked below.
    inodes = set(get_socket_inodes(pid))
    return [
        conn[1]
        for conn in netstat('tcp') + netstat('tcp6')
        if conn[3] == STATE_LISTEN and conn[4] in inodes
    ]


# from: https://code.activestate.com/recipes/439093/
def all_interfaces():
    '''
    Return all interfaces that are up

    The result is a list of (name, address) tuples, where name is a bytes
    object and address is a dotted-quad IPv4 string.
    '''
    import fcntl  # Linux only, so only import when required

    is_64bits = sys.maxsize > 2**32
    struct_size = 40 if is_64bits else 32
    # The socket is only needed as a handle for the ioctl; close it when done.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        max_possible = 8  # initial value
        while True:
            buf_size = max_possible * struct_size
            names = array.array('B', b'\0' * buf_size)
            outbytes = struct.unpack('iL', fcntl.ioctl(
                s.fileno(),
                0x8912,  # SIOCGIFCONF
                struct.pack('iL', buf_size, names.buffer_info()[0])
            ))[0]
            if outbytes != buf_size:
                break
            # The buffer was filled completely, so retry with twice the room.
            max_possible *= 2
    namestr = names.tobytes()
    # Within each record, the name is the NUL-terminated prefix of the first
    # 16 bytes and the packed IPv4 address is read from bytes 20..23.
    return [(namestr[i:i+16].split(b'\0', 1)[0],
             socket.inet_ntoa(namestr[i+20:i+24]))
            for i in range(0, outbytes, struct_size)]


def addr_to_hex(addr):
    '''
    Convert string IPv4 or IPv6 address to binary address as returned by
    get_bind_addrs.
    Very naive implementation that certainly doesn't work for all IPv6 variants.

    Returns the address bytes as a hex string. Raises ValueError if addr
    contains neither '.' nor ':'.
    '''
    if '.' in addr:  # IPv4
        addr = [int(x) for x in addr.split('.')]
    elif ':' in addr:  # IPv6
        sub = [[], []]  # prefix, suffix
        x = 0
        addr = addr.split(':')
        for i, comp in enumerate(addr):
            if comp == '':
                if i == 0 or i == (len(addr) - 1):  # skip empty component at beginning or end
                    continue
                x += 1  # :: skips to suffix
                assert x < 2
            else:  # two bytes per component
                val = int(comp, 16)
                sub[x].append(val >> 8)
                sub[x].append(val & 0xff)
        # Whatever the prefix and suffix do not cover is filled with zeros.
        nullbytes = 16 - len(sub[0]) - len(sub[1])
        assert (x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0)
        addr = sub[0] + ([0] * nullbytes) + sub[1]
    else:
        raise ValueError('Could not parse address %s' % addr)
    return bytearray(addr).hex()


def test_ipv6_local():
    '''
    Check for (local) IPv6 support.

    Returns True if a UDP socket could be created and connected to ::1,
    False if that raised socket.error.
    '''
    # By using SOCK_DGRAM this will not actually make a connection, but it will
    # fail if there is no route to IPv6 localhost.
    try:
        # The context manager closes the probe socket on every path.
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as s:
            s.connect(('::1', 1))
    except socket.error:
        return False
    return True


def test_unix_socket():
    '''Return True if UNIX sockets are available on this platform.'''
    return hasattr(socket, 'AF_UNIX')