/ test / functional / test_framework / netutil.py
netutil.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2014-2022 The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Linux network utilities.
  6  
  7  Roughly based on https://web.archive.org/web/20190424172231/http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
  8  """
  9  
 10  import sys
 11  import socket
 12  import struct
 13  import array
 14  import os
 15  
 16  # STATE_ESTABLISHED = '01'
 17  # STATE_SYN_SENT  = '02'
 18  # STATE_SYN_RECV = '03'
 19  # STATE_FIN_WAIT1 = '04'
 20  # STATE_FIN_WAIT2 = '05'
 21  # STATE_TIME_WAIT = '06'
 22  # STATE_CLOSE = '07'
 23  # STATE_CLOSE_WAIT = '08'
 24  # STATE_LAST_ACK = '09'
 25  STATE_LISTEN = '0A'
 26  # STATE_CLOSING = '0B'
 27  
 28  # Address manager size constants as defined in addrman_impl.h
 29  ADDRMAN_NEW_BUCKET_COUNT = 1 << 10
 30  ADDRMAN_TRIED_BUCKET_COUNT = 1 << 8
 31  ADDRMAN_BUCKET_SIZE = 1 << 6
 32  
 33  def get_socket_inodes(pid):
 34      '''
 35      Get list of socket inodes for process pid.
 36      '''
 37      base = '/proc/%i/fd' % pid
 38      inodes = []
 39      for item in os.listdir(base):
 40          target = os.readlink(os.path.join(base, item))
 41          if target.startswith('socket:'):
 42              inodes.append(int(target[8:-1]))
 43      return inodes
 44  
 45  def _remove_empty(array):
 46      return [x for x in array if x !='']
 47  
 48  def _convert_ip_port(array):
 49      host,port = array.split(':')
 50      # convert host from mangled-per-four-bytes form as used by kernel
 51      host = bytes.fromhex(host)
 52      host_out = ''
 53      for x in range(0, len(host) // 4):
 54          (val,) = struct.unpack('=I', host[x*4:(x+1)*4])
 55          host_out += '%08x' % val
 56  
 57      return host_out,int(port,16)
 58  
 59  def netstat(typ='tcp'):
 60      '''
 61      Function to return a list with status of tcp connections at linux systems
 62      To get pid of all network process running on system, you must run this script
 63      as superuser
 64      '''
 65      with open('/proc/net/'+typ,'r',encoding='utf8') as f:
 66          content = f.readlines()
 67          content.pop(0)
 68      result = []
 69      for line in content:
 70          line_array = _remove_empty(line.split(' '))     # Split lines and remove empty spaces.
 71          tcp_id = line_array[0]
 72          l_addr = _convert_ip_port(line_array[1])
 73          r_addr = _convert_ip_port(line_array[2])
 74          state = line_array[3]
 75          inode = int(line_array[9])                      # Need the inode to match with process pid.
 76          nline = [tcp_id, l_addr, r_addr, state, inode]
 77          result.append(nline)
 78      return result
 79  
 80  def get_bind_addrs(pid):
 81      '''
 82      Get bind addresses as (host,port) tuples for process pid.
 83      '''
 84      inodes = get_socket_inodes(pid)
 85      bind_addrs = []
 86      for conn in netstat('tcp') + netstat('tcp6'):
 87          if conn[3] == STATE_LISTEN and conn[4] in inodes:
 88              bind_addrs.append(conn[1])
 89      return bind_addrs
 90  
 91  # from: https://code.activestate.com/recipes/439093/
 92  def all_interfaces():
 93      '''
 94      Return all interfaces that are up
 95      '''
 96      import fcntl  # Linux only, so only import when required
 97  
 98      is_64bits = sys.maxsize > 2**32
 99      struct_size = 40 if is_64bits else 32
100      s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
101      max_possible = 8 # initial value
102      while True:
103          bytes = max_possible * struct_size
104          names = array.array('B', b'\0' * bytes)
105          outbytes = struct.unpack('iL', fcntl.ioctl(
106              s.fileno(),
107              0x8912,  # SIOCGIFCONF
108              struct.pack('iL', bytes, names.buffer_info()[0])
109          ))[0]
110          if outbytes == bytes:
111              max_possible *= 2
112          else:
113              break
114      namestr = names.tobytes()
115      return [(namestr[i:i+16].split(b'\0', 1)[0],
116               socket.inet_ntoa(namestr[i+20:i+24]))
117              for i in range(0, outbytes, struct_size)]
118  
119  def addr_to_hex(addr):
120      '''
121      Convert string IPv4 or IPv6 address to binary address as returned by
122      get_bind_addrs.
123      Very naive implementation that certainly doesn't work for all IPv6 variants.
124      '''
125      if '.' in addr: # IPv4
126          addr = [int(x) for x in addr.split('.')]
127      elif ':' in addr: # IPv6
128          sub = [[], []] # prefix, suffix
129          x = 0
130          addr = addr.split(':')
131          for i,comp in enumerate(addr):
132              if comp == '':
133                  if i == 0 or i == (len(addr)-1): # skip empty component at beginning or end
134                      continue
135                  x += 1 # :: skips to suffix
136                  assert x < 2
137              else: # two bytes per component
138                  val = int(comp, 16)
139                  sub[x].append(val >> 8)
140                  sub[x].append(val & 0xff)
141          nullbytes = 16 - len(sub[0]) - len(sub[1])
142          assert (x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0)
143          addr = sub[0] + ([0] * nullbytes) + sub[1]
144      else:
145          raise ValueError('Could not parse address %s' % addr)
146      return bytearray(addr).hex()
147  
148  def test_ipv6_local():
149      '''
150      Check for (local) IPv6 support.
151      '''
152      # By using SOCK_DGRAM this will not actually make a connection, but it will
153      # fail if there is no route to IPv6 localhost.
154      have_ipv6 = True
155      try:
156          s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
157          s.connect(('::1', 1))
158      except socket.error:
159          have_ipv6 = False
160      return have_ipv6
161  
162  def test_unix_socket():
163      '''Return True if UNIX sockets are available on this platform.'''
164      try:
165          socket.AF_UNIX
166      except AttributeError:
167          return False
168      else:
169          return True