/ test / functional / test_framework / netutil.py
netutil.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2014-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Linux, macOS, and BSD network utilities.
  6  
  7  Roughly based on https://web.archive.org/web/20190424172231/http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
  8  """
  9  
 10  import sys
 11  import socket
 12  import struct
 13  import array
 14  import os
 15  
 16  # Easily unreachable address. Attempts to connect to it will stay within the machine.
 17  # Used to avoid non-loopback traffic or DNS queries.
 18  UNREACHABLE_PROXY_ARG = '-proxy=127.0.0.1:1'
 19  
 20  # STATE_ESTABLISHED = '01'
 21  # STATE_SYN_SENT  = '02'
 22  # STATE_SYN_RECV = '03'
 23  # STATE_FIN_WAIT1 = '04'
 24  # STATE_FIN_WAIT2 = '05'
 25  # STATE_TIME_WAIT = '06'
 26  # STATE_CLOSE = '07'
 27  # STATE_CLOSE_WAIT = '08'
 28  # STATE_LAST_ACK = '09'
 29  STATE_LISTEN = '0A'
 30  # STATE_CLOSING = '0B'
 31  
 32  # Address manager size constants as defined in addrman_impl.h
 33  ADDRMAN_NEW_BUCKET_COUNT = 1 << 10
 34  ADDRMAN_TRIED_BUCKET_COUNT = 1 << 8
 35  ADDRMAN_BUCKET_SIZE = 1 << 6
 36  
 37  def get_socket_inodes(pid):
 38      '''
 39      Get list of socket inodes for process pid.
 40      '''
 41      base = '/proc/%i/fd' % pid
 42      inodes = []
 43      for item in os.listdir(base):
 44          try:
 45              target = os.readlink(os.path.join(base, item))
 46              if target.startswith('socket:'):
 47                  inodes.append(int(target[8:-1]))
 48          except FileNotFoundError:
 49              pass
 50      return inodes
 51  
 52  def _remove_empty(array):
 53      return [x for x in array if x !='']
 54  
 55  def _convert_ip_port(array):
 56      host,port = array.split(':')
 57      # convert host from mangled-per-four-bytes form as used by kernel
 58      host = bytes.fromhex(host)
 59      host_out = ''
 60      for x in range(0, len(host) // 4):
 61          (val,) = struct.unpack('=I', host[x*4:(x+1)*4])
 62          host_out += '%08x' % val
 63  
 64      return host_out,int(port,16)
 65  
 66  def netstat(typ='tcp'):
 67      '''
 68      Function to return a list with status of tcp connections at linux systems
 69      To get pid of all network process running on system, you must run this script
 70      as superuser
 71      '''
 72      with open('/proc/net/'+typ,'r') as f:
 73          content = f.readlines()
 74          content.pop(0)
 75      result = []
 76      for line in content:
 77          line_array = _remove_empty(line.split(' '))     # Split lines and remove empty spaces.
 78          tcp_id = line_array[0]
 79          l_addr = _convert_ip_port(line_array[1])
 80          r_addr = _convert_ip_port(line_array[2])
 81          state = line_array[3]
 82          inode = int(line_array[9])                      # Need the inode to match with process pid.
 83          nline = [tcp_id, l_addr, r_addr, state, inode]
 84          result.append(nline)
 85      return result
 86  
 87  def get_bind_addrs(pid):
 88      '''
 89      Get bind addresses as (host,port) tuples for process pid.
 90      '''
 91      if sys.platform == 'linux':
 92          inodes = get_socket_inodes(pid)
 93          bind_addrs = []
 94          for conn in netstat('tcp') + netstat('tcp6'):
 95              if conn[3] == STATE_LISTEN and conn[4] in inodes:
 96                  bind_addrs.append(conn[1])
 97          return bind_addrs
 98      elif sys.platform.startswith(("darwin", "freebsd", "netbsd", "openbsd")):
 99          import re
100          import subprocess
101          output = subprocess.check_output(["lsof",
102              *(["-Di"] if sys.platform.startswith("freebsd") else []), # Ignore device cache to avoid stderr warnings.
103              "-nP",          # Keep hosts and ports numeric.
104              "-a",           # Require all filters to match.
105              "-p", str(pid), # Limit results to the target pid.
106              "-iTCP",        # Only inspect TCP sockets.
107              "-sTCP:LISTEN", # Only keep listening sockets.
108              "-Ftn",         # Emit machine-readable type and name fields.
109          ], text=True)
110          return [
111              (addr_to_hex(("::" if sock_type == "IPv6" else "0.0.0.0") if host == "*" else host.strip("[]")), int(port))
112              for sock_type, host, port in re.findall(r"t(IPv[46])\nn(\*|\[.+?]|[^:]+):(\d+)", output)
113          ]
114      else:
115          raise NotImplementedError(f"get_bind_addrs is not supported on {sys.platform}")
116  
117  def all_interfaces():
118      '''
119      Return all IPv4 interfaces that are up.
120      '''
121      if sys.platform == 'linux':
122          import fcntl  # Linux only, so only import when required
123  
124          is_64bits = sys.maxsize > 2**32
125          struct_size = 40 if is_64bits else 32
126          s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
127          max_possible = 8 # initial value
128          while True:
129              bytes = max_possible * struct_size
130              names = array.array('B', b'\0' * bytes)
131              outbytes = struct.unpack('iL', fcntl.ioctl(
132                  s.fileno(),
133                  0x8912,  # SIOCGIFCONF
134                  struct.pack('iL', bytes, names.buffer_info()[0])
135              ))[0]
136              if outbytes == bytes:
137                  max_possible *= 2
138              else:
139                  break
140          namestr = names.tobytes()
141          return [(namestr[i:i+16].split(b'\0', 1)[0],
142                   socket.inet_ntoa(namestr[i+20:i+24]))
143                  for i in range(0, outbytes, struct_size)]
144      elif sys.platform.startswith(("darwin", "freebsd", "netbsd", "openbsd")):
145          import re
146          import subprocess
147          output = subprocess.check_output(["ifconfig", "-au"], text=True)
148          return [
149              (m["iface"].encode(), ip)
150              for m in re.finditer(r"(?m)^(?P<iface>\S+):(?P<block>[^\n]*(?:\n[ \t]+[^\n]*)*)", output)
151              for ip in re.findall(r"inet (\S+)", m["block"])
152          ]
153      else:
154          raise NotImplementedError(f"all_interfaces is not supported on {sys.platform}")
155  
156  def addr_to_hex(addr):
157      '''
158      Convert string IPv4 or IPv6 address to binary address as returned by
159      get_bind_addrs.
160      Very naive implementation that certainly doesn't work for all IPv6 variants.
161      '''
162      if '.' in addr: # IPv4
163          addr = [int(x) for x in addr.split('.')]
164      elif ':' in addr: # IPv6
165          sub = [[], []] # prefix, suffix
166          x = 0
167          addr = addr.split(':')
168          for i,comp in enumerate(addr):
169              if comp == '':
170                  if i == 0 or i == (len(addr)-1): # skip empty component at beginning or end
171                      continue
172                  x += 1 # :: skips to suffix
173                  assert x < 2
174              else: # two bytes per component
175                  val = int(comp, 16)
176                  sub[x].append(val >> 8)
177                  sub[x].append(val & 0xff)
178          nullbytes = 16 - len(sub[0]) - len(sub[1])
179          assert (x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0)
180          addr = sub[0] + ([0] * nullbytes) + sub[1]
181      else:
182          raise ValueError('Could not parse address %s' % addr)
183      return bytearray(addr).hex()
184  
185  def test_ipv6_local():
186      '''
187      Check for (local) IPv6 support.
188      '''
189      # By using SOCK_DGRAM this will not actually make a connection, but it will
190      # fail if there is no route to IPv6 localhost.
191      have_ipv6 = True
192      try:
193          s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
194          s.connect(('::1', 1))
195      except socket.error:
196          have_ipv6 = False
197      return have_ipv6
198  
199  def test_unix_socket():
200      '''Return True if UNIX sockets are available on this platform.'''
201      try:
202          socket.AF_UNIX
203      except AttributeError:
204          return False
205      else:
206          return True
207  
208  def format_addr_port(addr, port):
209      '''Return either "addr:port" or "[addr]:port" based on whether addr looks like an IPv6 address.'''
210      if ":" in addr:
211          return f"[{addr}]:{port}"
212      else:
213          return f"{addr}:{port}"
214  
215  
216  def set_ephemeral_port_range(sock):
217      '''On FreeBSD, set socket to use the high ephemeral port range (49152-65535).
218  
219      FreeBSD's default ephemeral port range (10000-65535) overlaps with the test
220      framework's static port range starting at TEST_RUNNER_PORT_MIN (default=11000).
221      Using IP_PORTRANGE_HIGH avoids this overlap when binding to port 0 for dynamic
222      port allocation.
223      '''
224      if sys.platform.startswith('freebsd'):
225          # Constants from FreeBSD's netinet/in.h and netinet6/in6.h
226          IP_PORTRANGE = 19
227          IPV6_PORTRANGE = 14
228          IP_PORTRANGE_HIGH = 1  # Same value for both IPv4 and IPv6
229          if sock.family == socket.AF_INET6:
230              sock.setsockopt(socket.IPPROTO_IPV6, IPV6_PORTRANGE, IP_PORTRANGE_HIGH)
231          else:
232              sock.setsockopt(socket.IPPROTO_IP, IP_PORTRANGE, IP_PORTRANGE_HIGH)