/ test / functional / test_framework / netutil.py
netutil.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2014-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Linux network utilities.
  6  
  7  Roughly based on https://web.archive.org/web/20190424172231/http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
  8  """
  9  
 10  import sys
 11  import socket
 12  import struct
 13  import array
 14  import os
 15  
# Easily unreachable address. Attempts to connect to it will stay within the machine.
# Used to avoid non-loopback traffic or DNS queries.
UNREACHABLE_PROXY_ARG = '-proxy=127.0.0.1:1'

# Socket state codes, written as the two-digit uppercase hex strings that
# appear in the state column parsed by netstat(). get_bind_addrs() compares
# against STATE_LISTEN; the other codes are kept commented out for reference.
# STATE_ESTABLISHED = '01'
# STATE_SYN_SENT  = '02'
# STATE_SYN_RECV = '03'
# STATE_FIN_WAIT1 = '04'
# STATE_FIN_WAIT2 = '05'
# STATE_TIME_WAIT = '06'
# STATE_CLOSE = '07'
# STATE_CLOSE_WAIT = '08'
# STATE_LAST_ACK = '09'
STATE_LISTEN = '0A'
# STATE_CLOSING = '0B'

# Address manager size constants as defined in addrman_impl.h
ADDRMAN_NEW_BUCKET_COUNT = 1 << 10  # 1024
ADDRMAN_TRIED_BUCKET_COUNT = 1 << 8  # 256
ADDRMAN_BUCKET_SIZE = 1 << 6  # 64
 36  
 37  def get_socket_inodes(pid):
 38      '''
 39      Get list of socket inodes for process pid.
 40      '''
 41      base = '/proc/%i/fd' % pid
 42      inodes = []
 43      for item in os.listdir(base):
 44          try:
 45              target = os.readlink(os.path.join(base, item))
 46              if target.startswith('socket:'):
 47                  inodes.append(int(target[8:-1]))
 48          except FileNotFoundError:
 49              pass
 50      return inodes
 51  
 52  def _remove_empty(array):
 53      return [x for x in array if x !='']
 54  
 55  def _convert_ip_port(array):
 56      host,port = array.split(':')
 57      # convert host from mangled-per-four-bytes form as used by kernel
 58      host = bytes.fromhex(host)
 59      host_out = ''
 60      for x in range(0, len(host) // 4):
 61          (val,) = struct.unpack('=I', host[x*4:(x+1)*4])
 62          host_out += '%08x' % val
 63  
 64      return host_out,int(port,16)
 65  
 66  def netstat(typ='tcp'):
 67      '''
 68      Function to return a list with status of tcp connections at linux systems
 69      To get pid of all network process running on system, you must run this script
 70      as superuser
 71      '''
 72      with open('/proc/net/'+typ,'r') as f:
 73          content = f.readlines()
 74          content.pop(0)
 75      result = []
 76      for line in content:
 77          line_array = _remove_empty(line.split(' '))     # Split lines and remove empty spaces.
 78          tcp_id = line_array[0]
 79          l_addr = _convert_ip_port(line_array[1])
 80          r_addr = _convert_ip_port(line_array[2])
 81          state = line_array[3]
 82          inode = int(line_array[9])                      # Need the inode to match with process pid.
 83          nline = [tcp_id, l_addr, r_addr, state, inode]
 84          result.append(nline)
 85      return result
 86  
 87  def get_bind_addrs(pid):
 88      '''
 89      Get bind addresses as (host,port) tuples for process pid.
 90      '''
 91      inodes = get_socket_inodes(pid)
 92      bind_addrs = []
 93      for conn in netstat('tcp') + netstat('tcp6'):
 94          if conn[3] == STATE_LISTEN and conn[4] in inodes:
 95              bind_addrs.append(conn[1])
 96      return bind_addrs
 97  
 98  # from: https://code.activestate.com/recipes/439093/
 99  def all_interfaces():
100      '''
101      Return all interfaces that are up
102      '''
103      import fcntl  # Linux only, so only import when required
104  
105      is_64bits = sys.maxsize > 2**32
106      struct_size = 40 if is_64bits else 32
107      s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
108      max_possible = 8 # initial value
109      while True:
110          bytes = max_possible * struct_size
111          names = array.array('B', b'\0' * bytes)
112          outbytes = struct.unpack('iL', fcntl.ioctl(
113              s.fileno(),
114              0x8912,  # SIOCGIFCONF
115              struct.pack('iL', bytes, names.buffer_info()[0])
116          ))[0]
117          if outbytes == bytes:
118              max_possible *= 2
119          else:
120              break
121      namestr = names.tobytes()
122      return [(namestr[i:i+16].split(b'\0', 1)[0],
123               socket.inet_ntoa(namestr[i+20:i+24]))
124              for i in range(0, outbytes, struct_size)]
125  
def addr_to_hex(addr):
    '''
    Convert string IPv4 or IPv6 address to binary address as returned by
    get_bind_addrs.

    Parsing is delegated to the standard ipaddress module, so every textual
    form it accepts is supported (compressed "::" notation, IPv4-mapped
    IPv6 such as "::ffff:1.2.3.4", ...).

    Returns the packed address as a lowercase hex string: 8 hex digits for
    IPv4, 32 for IPv6.

    Raises ValueError if addr is not a valid IPv4 or IPv6 address.
    '''
    import ipaddress  # only needed here, so keep the import local

    try:
        packed = ipaddress.ip_address(addr).packed
    except ValueError:
        # Keep the module's own error message for unparseable input.
        raise ValueError('Could not parse address %s' % addr) from None
    return packed.hex()
154  
def test_ipv6_local():
    '''
    Check for (local) IPv6 support.

    Returns True if an IPv6 datagram socket can be created and connected to
    ('::1', 1), and False if either step raises socket.error.
    '''
    # By using SOCK_DGRAM this will not actually make a connection, but it will
    # fail if there is no route to IPv6 localhost.
    try:
        # The context manager closes the probe socket on every path, so no
        # descriptor is leaked.
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as s:
            s.connect(('::1', 1))
    except socket.error:
        return False
    return True
168  
def test_unix_socket():
    '''Tell whether the socket module on this platform exposes AF_UNIX.'''
    return hasattr(socket, 'AF_UNIX')
177  
def format_addr_port(addr, port):
    '''Join addr and port as "addr:port", bracketing addr ("[addr]:port") when it contains a colon (IPv6-looking).'''
    host = f"[{addr}]" if ":" in addr else addr
    return f"{host}:{port}"
184  
185  
def set_ephemeral_port_range(sock):
    '''On FreeBSD, set socket to use the high ephemeral port range (49152-65535).

    FreeBSD's default ephemeral port range (10000-65535) overlaps with the test
    framework's static port range starting at TEST_RUNNER_PORT_MIN (default=11000).
    Using IP_PORTRANGE_HIGH avoids this overlap when binding to port 0 for dynamic
    port allocation.

    On any other platform this is a no-op and sock is left untouched.
    '''
    if not sys.platform.startswith('freebsd'):
        return

    # Constants from FreeBSD's netinet/in.h and netinet6/in6.h
    IP_PORTRANGE = 19
    IPV6_PORTRANGE = 14
    IP_PORTRANGE_HIGH = 1  # Same value for both IPv4 and IPv6

    # Choose the protocol level and option that match the socket family.
    if sock.family == socket.AF_INET6:
        level, option = socket.IPPROTO_IPV6, IPV6_PORTRANGE
    else:
        level, option = socket.IPPROTO_IP, IP_PORTRANGE
    sock.setsockopt(level, option, IP_PORTRANGE_HIGH)
201          else:
202              sock.setsockopt(socket.IPPROTO_IP, IP_PORTRANGE, IP_PORTRANGE_HIGH)