/ test / functional / test_framework / test_node.py
test_node.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2017-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Class for bitcoind node under test"""
  6  
  7  import contextlib
  8  import decimal
  9  import errno
 10  from enum import Enum
 11  import json
 12  import logging
 13  import os
 14  import pathlib
 15  import platform
 16  import re
 17  import subprocess
 18  import tempfile
 19  import time
 20  import urllib.parse
 21  import collections
 22  import shlex
 23  import shutil
 24  import sys
 25  from pathlib import Path
 26  
 27  from .authproxy import (
 28      JSONRPCException,
 29      serialization_fallback,
 30  )
 31  from .messages import NODE_P2P_V2
 32  from .p2p import P2P_SERVICES, P2P_SUBVERSION
 33  from .util import (
 34      MAX_NODES,
 35      assert_equal,
 36      assert_not_equal,
 37      append_config,
 38      delete_cookie_file,
 39      get_auth_cookie,
 40      get_rpc_proxy,
 41      rpc_url,
 42      wait_until_helper_internal,
 43      p2p_port,
 44      tor_port,
 45  )
 46  
# Seconds to wait for a bitcoind process to exit after shutdown is requested.
BITCOIND_PROC_WAIT_TIMEOUT = 60
# The size of the blocks xor key
# from InitBlocksdirXorKey::xor_key.size()
NUM_XOR_BYTES = 8
# Many systems have a 128kB limit for a command size. Depending on the
# platform, this limit may be larger or smaller. Moreover, when using the
# 'bitcoin' command, it may internally insert more args, which must be
# accounted for. There is no need to pick the largest possible value here
# anyway and it should be fine to set it to 1kB in tests.
TEST_CLI_MAX_ARG_SIZE = 1024

# The null blocks key (all 0s)
NULL_BLK_XOR_KEY = bytes([0] * NUM_XOR_BYTES)
# Default name of the PID file bitcoind writes into its datadir.
BITCOIN_PID_FILENAME_DEFAULT = "bitcoind.pid"

# Platform-dependent maximum length of a unix domain socket path, used to
# decide whether the datadir path is short enough for -ipcbind (see __init__).
if sys.platform.startswith("linux"):
    UNIX_PATH_MAX = 108          # includes the trailing NUL
elif sys.platform.startswith(("darwin", "freebsd", "netbsd", "openbsd")):
    UNIX_PATH_MAX = 104
else:                            # safest portable value
    UNIX_PATH_MAX = 92
 68  
 69  
class FailedToStartError(Exception):
    """Raised when a node fails to start correctly (e.g. bitcoind exits during initialization)."""
 72  
 73  
class ErrorMatch(Enum):
    """Matching mode for expected error output when asserting on node errors.

    FULL_TEXT compares the whole message as literal text; FULL_REGEX and
    PARTIAL_REGEX treat the expectation as a regular expression matching the
    full message or any part of it, respectively.
    """
    FULL_TEXT = 1
    FULL_REGEX = 2
    PARTIAL_REGEX = 3
 78  
 79  
 80  class TestNode():
 81      """A class for representing a bitcoind node under test.
 82  
 83      This class contains:
 84  
 85      - state about the node (whether it's running, etc)
 86      - a Python subprocess.Popen object representing the running process
 87      - an RPC connection to the node
 88      - one or more P2P connections to the node
 89  
 90  
 91      To make things easier for the test writer, any unrecognised messages will
 92      be dispatched to the RPC connection."""
 93  
    def __init__(self, i, datadir_path, *, chain, rpchost, timewait, timeout_factor, binaries, coverage_dir, cwd, extra_conf=None, extra_args=None, use_cli=False, start_perf=False, use_valgrind=False, version=None, v2transport=False, uses_wallet=False, ipcbind=False):
        """
        Kwargs:
            start_perf (bool): If True, begin profiling the node with `perf` as soon as
                the node starts.
        """

        self.index = i
        self.p2p_conn_index = 1
        self.datadir_path = datadir_path
        self.bitcoinconf = self.datadir_path / "bitcoin.conf"
        self.stdout_dir = self.datadir_path / "stdout"
        self.stderr_dir = self.datadir_path / "stderr"
        self.chain = chain
        self.rpchost = rpchost
        self.rpc_timeout = timewait  # Already multiplied by timeout_factor
        self.timeout_factor = timeout_factor
        self.binaries = binaries
        self.coverage_dir = coverage_dir
        self.cwd = cwd
        self.has_explicit_bind = False
        if extra_conf is not None:
            append_config(self.datadir_path, extra_conf)
            # Remember if there is bind=... in the config file.
            self.has_explicit_bind = any(e.startswith("bind=") for e in extra_conf)
        # Most callers will just need to add extra args to the standard list below.
        # For those callers that need more flexibility, they can just set the args property directly.
        # Note that common args are set in the config file (see initialize_datadir)
        self.extra_args = extra_args
        self.version = version
        # Configuration for logging is set as command-line args rather than in the bitcoin.conf file.
        # This means that starting a bitcoind using the temp dir to debug a failed test won't
        # spam debug.log.
        self.args = self.binaries.node_argv(need_ipc=ipcbind) + [
            f"-datadir={self.datadir_path}",
            "-logtimemicros",
            "-debug",
            "-debugexclude=libevent",
            "-debugexclude=leveldb",
            "-debugexclude=rand",
            "-uacomment=testnode%d" % i,  # required for subversion uniqueness across peers
        ]
        if uses_wallet is not None and not uses_wallet:
            self.args.append("-disablewallet")

        # Temporary directory holding the IPC socket, only created when the
        # default socket path would exceed the platform limit (see below).
        self.ipc_tmp_dir = None
        if ipcbind:
            self.ipc_socket_path = self.chain_path / "node.sock"
            if len(os.fsencode(self.ipc_socket_path)) < UNIX_PATH_MAX:
                self.args.append("-ipcbind=unix")
            else:
                # Work around default CI path exceeding maximum socket path length.
                self.ipc_tmp_dir = pathlib.Path(tempfile.mkdtemp(prefix="test-ipc-"))
                self.ipc_socket_path = self.ipc_tmp_dir / "node.sock"
                self.args.append(f"-ipcbind=unix:{self.ipc_socket_path}")

        # Use valgrind, except for previous release binaries
        if use_valgrind and version is None:
            default_suppressions_file = Path(__file__).parents[3] / "contrib" / "valgrind.supp"
            suppressions_file = os.getenv("VALGRIND_SUPPRESSIONS_FILE",
                                          default_suppressions_file)
            self.args = ["valgrind", "--suppressions={}".format(suppressions_file),
                         "--gen-suppressions=all", "--exit-on-first-error=yes",
                         "--error-exitcode=1", "--quiet"] + self.args

        # Only pass logging options that the node's version understands.
        if self.version_is_at_least(190000):
            self.args.append("-logthreadnames")
        if self.version_is_at_least(219900):
            self.args.append("-logsourcelocations")
        if self.version_is_at_least(239000):
            self.args.append("-loglevel=trace")
        if self.version_is_at_least(290100):
            self.args.append("-nologratelimit")

        # Default behavior from global -v2transport flag is added to args to persist it over restarts.
        # May be overwritten in individual tests, using extra_args.
        self.default_to_v2 = v2transport
        if self.version_is_at_least(260000):
            # 26.0 and later support v2transport
            if v2transport:
                self.args.append("-v2transport=1")
            else:
                self.args.append("-v2transport=0")
        # if v2transport is requested via global flag but not supported for node version, ignore it

        self.cli = TestNodeCLI(
            binaries,
            self.datadir_path,
            self.rpc_timeout // 2,  # timeout identical to the one used in self._rpc
        )
        self.use_cli = use_cli
        self.start_perf = start_perf

        self.running = False
        self.process = None
        self.rpc_connected = False
        self._rpc = None # Should usually not be accessed directly in tests to allow for --usecli mode
        self.reuse_http_connections = True # Must be set before calling get_rpc_proxy() i.e. before restarting node
        self.url = None
        self.log = logging.getLogger('TestFramework.node%d' % i)
        # Cache perf subprocesses here by their data output filename.
        self.perf_subprocesses = {}

        # P2P connection objects attached to this node (cleared on stop_node()).
        self.p2ps = []

        # Currently active mock time; None means system time (see setmocktime()).
        self.mocktime = None
200  
    # (address, privkey) record type for the deterministic key list below.
    AddressKeyPair = collections.namedtuple('AddressKeyPair', ['address', 'key'])
    # One deterministic key pair per possible node index; the list length must
    # equal MAX_NODES (asserted in get_deterministic_priv_key()).
    PRIV_KEYS = [
            # address , privkey
            AddressKeyPair('mjTkW3DjgyZck4KbiRusZsqTgaYTxdSz6z', 'cVpF924EspNh8KjYsfhgY96mmxvT6DgdWiTYMtMjuM74hJaU5psW'),
            AddressKeyPair('msX6jQXvxiNhx3Q62PKeLPrhrqZQdSimTg', 'cUxsWyKyZ9MAQTaAhUQWJmBbSvHMwSmuv59KgxQV7oZQU3PXN3KE'),
            AddressKeyPair('mnonCMyH9TmAsSj3M59DsbH8H63U3RKoFP', 'cTrh7dkEAeJd6b3MRX9bZK8eRmNqVCMH3LSUkE3dSFDyzjU38QxK'),
            AddressKeyPair('mqJupas8Dt2uestQDvV2NH3RU8uZh2dqQR', 'cVuKKa7gbehEQvVq717hYcbE9Dqmq7KEBKqWgWrYBa2CKKrhtRim'),
            AddressKeyPair('msYac7Rvd5ywm6pEmkjyxhbCDKqWsVeYws', 'cQDCBuKcjanpXDpCqacNSjYfxeQj8G6CAtH1Dsk3cXyqLNC4RPuh'),
            AddressKeyPair('n2rnuUnwLgXqf9kk2kjvVm8R5BZK1yxQBi', 'cQakmfPSLSqKHyMFGwAqKHgWUiofJCagVGhiB4KCainaeCSxeyYq'),
            AddressKeyPair('myzuPxRwsf3vvGzEuzPfK9Nf2RfwauwYe6', 'cQMpDLJwA8DBe9NcQbdoSb1BhmFxVjWD5gRyrLZCtpuF9Zi3a9RK'),
            AddressKeyPair('mumwTaMtbxEPUswmLBBN3vM9oGRtGBrys8', 'cSXmRKXVcoouhNNVpcNKFfxsTsToY5pvB9DVsFksF1ENunTzRKsy'),
            AddressKeyPair('mpV7aGShMkJCZgbW7F6iZgrvuPHjZjH9qg', 'cSoXt6tm3pqy43UMabY6eUTmR3eSUYFtB2iNQDGgb3VUnRsQys2k'),
            AddressKeyPair('mq4fBNdckGtvY2mijd9am7DRsbRB4KjUkf', 'cN55daf1HotwBAgAKWVgDcoppmUNDtQSfb7XLutTLeAgVc3u8hik'),
            AddressKeyPair('mpFAHDjX7KregM3rVotdXzQmkbwtbQEnZ6', 'cT7qK7g1wkYEMvKowd2ZrX1E5f6JQ7TM246UfqbCiyF7kZhorpX3'),
            AddressKeyPair('mzRe8QZMfGi58KyWCse2exxEFry2sfF2Y7', 'cPiRWE8KMjTRxH1MWkPerhfoHFn5iHPWVK5aPqjW8NxmdwenFinJ'),
    ]
217  
218      def get_deterministic_priv_key(self):
219          """Return a deterministic priv key in base58, that only depends on the node's index"""
220          assert len(self.PRIV_KEYS) == MAX_NODES
221          return self.PRIV_KEYS[self.index]
222  
223      def _node_msg(self, msg: str) -> str:
224          """Return a modified msg that identifies this node by its index as a debugging aid."""
225          return "[node %d] %s" % (self.index, msg)
226  
    def _raise_assertion_error(self, msg: str):
        """Raise an AssertionError with msg modified to identify this node.

        Used by helpers below so that failures in multi-node tests show which
        node produced them.
        """
        raise AssertionError(self._node_msg(msg))
230  
    def __del__(self):
        # Ensure that we don't leave any bitcoind processes lying around after
        # the test ends
        if self.process:
            # Should only happen on test failure
            # Avoid using logger, as that may have already been shutdown when
            # this destructor is called.
            print(self._node_msg("Cleaning up leftover process"), file=sys.stderr)
            self.process.kill()
        # Remove the temporary IPC socket directory, if one was created in __init__.
        if self.ipc_tmp_dir:
            print(self._node_msg(f"Cleaning up ipc directory {str(self.ipc_tmp_dir)!r}"))
            shutil.rmtree(self.ipc_tmp_dir)
243  
244      def __getattr__(self, name):
245          """Dispatches any unrecognised messages to the RPC connection or a CLI instance."""
246          if self.use_cli:
247              return getattr(self.cli, name)
248          else:
249              assert self.rpc_connected and self._rpc is not None, self._node_msg("Error: no RPC connection")
250              return getattr(self._rpc, name)
251  
    def start(self, extra_args=None, *, cwd=None, stdout=None, stderr=None, env=None, **kwargs):
        """Start the node.

        Note: when no explicit -bind is configured, this appends -bind options
        to extra_args (which may be self.extra_args), so the chosen binds
        persist for subsequent restarts.
        """
        if extra_args is None:
            extra_args = self.extra_args

        # If listening and no -bind is given, then bitcoind would bind P2P ports on
        # 0.0.0.0:P and 127.0.0.1:P+1 (for incoming Tor connections), where P is
        # a unique port chosen by the test framework and configured as port=P in
        # bitcoin.conf. To avoid collisions, change it to 127.0.0.1:tor_port().
        will_listen = all(e != "-nolisten" and e != "-listen=0" for e in extra_args)
        has_explicit_bind = self.has_explicit_bind or any(e.startswith("-bind=") for e in extra_args)
        if will_listen and not has_explicit_bind:
            extra_args.append(f"-bind=0.0.0.0:{p2p_port(self.index)}")
            extra_args.append(f"-bind=127.0.0.1:{tor_port(self.index)}=onion")

        # Record the transport version in effect for this start; an explicit
        # extra_args setting overrides the global default.
        self.use_v2transport = "-v2transport=1" in extra_args or (self.default_to_v2 and "-v2transport=0" not in extra_args)

        # Add a new stdout and stderr file each time bitcoind is started
        if stderr is None:
            stderr = tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False)
        if stdout is None:
            stdout = tempfile.NamedTemporaryFile(dir=self.stdout_dir, delete=False)
        self.stderr = stderr
        self.stdout = stdout

        if cwd is None:
            cwd = self.cwd

        # Delete any existing cookie file -- if such a file exists (eg due to
        # unclean shutdown), it will get overwritten anyway by bitcoind, and
        # potentially interfere with our attempt to authenticate
        delete_cookie_file(self.datadir_path, self.chain)

        # add environment variable LIBC_FATAL_STDERR_=1 so that libc errors are written to stderr and not the terminal
        subp_env = dict(os.environ, LIBC_FATAL_STDERR_="1")
        if env is not None:
            subp_env.update(env)

        self.process = subprocess.Popen(self.args + extra_args, env=subp_env, stdout=stdout, stderr=stderr, cwd=cwd, **kwargs)

        self.running = True
        self.log.debug("bitcoind started, waiting for RPC to come up")

        if self.start_perf:
            self._start_perf()
297  
    def wait_for_rpc_connection(self, *, wait_for_import=True):
        """Sets up an RPC connection to the bitcoind process. Returns False if unable to connect."""
        # Poll at a rate of four times per second
        poll_per_s = 4

        # Count errors that are expected during startup, so they can be
        # included in the failure message if the connection never comes up.
        suppressed_errors = collections.defaultdict(int)
        latest_error = None
        def suppress_error(category: str, e: Exception):
            suppressed_errors[category] += 1
            return (category, repr(e))

        for _ in range(poll_per_s * self.rpc_timeout):
            if self.process.poll() is not None:
                # Attach abrupt shutdown error/s to the exception message
                self.stderr.seek(0)
                str_error = ''.join(line.decode('utf-8') for line in self.stderr)
                str_error += "************************\n" if str_error else ''

                raise FailedToStartError(self._node_msg(
                    f'bitcoind exited with status {self.process.returncode} during initialization. {str_error}'))
            try:
                rpc = get_rpc_proxy(
                    rpc_url(self.datadir_path, self.index, self.chain, self.rpchost),
                    self.index,
                    timeout=self.rpc_timeout // 2,  # Shorter timeout to allow for one retry in case of ETIMEDOUT
                    coveragedir=self.coverage_dir,
                )
                rpc.auth_service_proxy_instance.reuse_http_connections = self.reuse_http_connections
                rpc.getblockcount()
                # If the call to getblockcount() succeeds then the RPC connection is up
                if self.version_is_at_least(190000) and wait_for_import:
                    # getmempoolinfo.loaded is available since commit
                    # bb8ae2c (version 0.19.0)
                    self.wait_until(lambda: rpc.getmempoolinfo()['loaded'])
                    # Wait for the node to finish reindex, block import, and
                    # loading the mempool. Usually importing happens fast or
                    # even "immediate" when the node is started. However, there
                    # is no guarantee and sometimes ImportBlocks might finish
                    # later. This is going to cause intermittent test failures,
                    # because generally the tests assume the node is fully
                    # ready after being started.
                    #
                    # For example, the node will reject block messages from p2p
                    # when it is still importing with the error "Unexpected
                    # block message received"
                    #
                    # The wait is done here to make tests as robust as possible
                    # and prevent racy tests and intermittent failures as much
                    # as possible. Some tests might not need this, but the
                    # overhead is trivial, and the added guarantees are worth
                    # the minimal performance cost.
                self.log.debug("RPC successfully started")
                # Set rpc_connected even if we are in use_cli mode so that we know we can call self.stop() if needed.
                self.rpc_connected = True
                if self.use_cli:
                    return
                self._rpc = rpc
                self.url = self._rpc.rpc_url
                return
            except JSONRPCException as e:
                # Suppress these as they are expected during initialization.
                # -28 RPC in warmup
                # -342 Service unavailable, could be starting up or shutting down
                if e.error['code'] not in [-28, -342]:
                    raise  # unknown JSON RPC exception
                latest_error = suppress_error(f"JSONRPCException {e.error['code']}", e)
            except OSError as e:
                error_num = e.errno
                # Work around issue where socket timeouts don't have errno set.
                # https://github.com/python/cpython/issues/109601
                if error_num is None and isinstance(e, TimeoutError):
                    error_num = errno.ETIMEDOUT

                # Suppress similarly to the above JSONRPCException errors.
                if error_num not in [
                    errno.ECONNRESET,   # This might happen when the RPC server is in warmup,
                                        # but shut down before the call to getblockcount succeeds.
                    errno.ETIMEDOUT,    # Treat identical to ECONNRESET
                    errno.ECONNREFUSED  # Port not yet open?
                ]:
                    raise  # unknown OS error
                latest_error = suppress_error(f"OSError {errno.errorcode[error_num]}", e)
            except ValueError as e:
                # Suppress if cookie file isn't generated yet and no rpcuser or rpcpassword; bitcoind may be starting.
                if "No RPC credentials" not in str(e):
                    raise
                latest_error = suppress_error("missing_credentials", e)
            time.sleep(1.0 / poll_per_s)
        self._raise_assertion_error(f"Unable to connect to bitcoind after {self.rpc_timeout}s (ignored errors: {dict(suppressed_errors)!s}{'' if latest_error is None else f', latest: {latest_error[0]!r}/{latest_error[1]}'})")
387  
388      def wait_for_cookie_credentials(self):
389          """Ensures auth cookie credentials can be read, e.g. for testing CLI with -rpcwait before RPC connection is up."""
390          self.log.debug("Waiting for cookie credentials")
391          # Poll at a rate of four times per second.
392          poll_per_s = 4
393          for _ in range(poll_per_s * self.rpc_timeout):
394              try:
395                  get_auth_cookie(self.datadir_path, self.chain)
396                  self.log.debug("Cookie credentials successfully retrieved")
397                  return
398              except ValueError:  # cookie file not found and no rpcuser or rpcpassword; bitcoind is still starting
399                  pass            # so we continue polling until RPC credentials are retrieved
400              time.sleep(1.0 / poll_per_s)
401          self._raise_assertion_error("Unable to retrieve cookie credentials after {}s".format(self.rpc_timeout))
402  
403      def generate(self, nblocks, maxtries=1000000, **kwargs):
404          self.log.debug("TestNode.generate() dispatches `generate` call to `generatetoaddress`")
405          return self.generatetoaddress(nblocks=nblocks, address=self.get_deterministic_priv_key().address, maxtries=maxtries, **kwargs)
406  
407      def generateblock(self, *args, called_by_framework, **kwargs):
408          assert called_by_framework, "Direct call of this mining RPC is discouraged. Please use one of the self.generate* methods on the test framework, which sync the nodes to avoid intermittent test issues. You may use sync_fun=self.no_op to disable the sync explicitly."
409          return self.__getattr__('generateblock')(*args, **kwargs)
410  
411      def generatetoaddress(self, *args, called_by_framework, **kwargs):
412          assert called_by_framework, "Direct call of this mining RPC is discouraged. Please use one of the self.generate* methods on the test framework, which sync the nodes to avoid intermittent test issues. You may use sync_fun=self.no_op to disable the sync explicitly."
413          return self.__getattr__('generatetoaddress')(*args, **kwargs)
414  
415      def generatetodescriptor(self, *args, called_by_framework, **kwargs):
416          assert called_by_framework, "Direct call of this mining RPC is discouraged. Please use one of the self.generate* methods on the test framework, which sync the nodes to avoid intermittent test issues. You may use sync_fun=self.no_op to disable the sync explicitly."
417          return self.__getattr__('generatetodescriptor')(*args, **kwargs)
418  
419      def setmocktime(self, timestamp):
420          """Wrapper for setmocktime RPC, sets self.mocktime"""
421          if timestamp == 0:
422              # setmocktime(0) resets to system time.
423              self.mocktime = None
424          else:
425              self.mocktime = timestamp
426          return self.__getattr__('setmocktime')(timestamp)
427  
428      def get_wallet_rpc(self, wallet_name):
429          if self.use_cli:
430              return self.cli("-rpcwallet={}".format(wallet_name))
431          else:
432              assert self.rpc_connected and self._rpc, self._node_msg("RPC not connected")
433              wallet_path = "wallet/{}".format(urllib.parse.quote(wallet_name))
434              return self._rpc / wallet_path
435  
436      def version_is_at_least(self, ver):
437          return self.version is None or self.version >= ver
438  
    def stop_node(self, expected_stderr='', *, wait=0, wait_until_stopped=True):
        """Stop the node.

        Args:
            expected_stderr: exact stderr output expected on shutdown
                (verified in is_node_stopped()).
            wait: forwarded to the `stop` RPC on nodes new enough to support it.
            wait_until_stopped: block until the process has actually exited.
        """
        if not self.running:
            return
        assert self.rpc_connected, self._node_msg(
            "Should only call stop_node() on a running node after wait_for_rpc_connection() succeeded. "
            f"Did you forget to call the latter after start()? Not connected to process: {self.process.pid}")
        self.log.debug("Stopping node")
        # Do not use wait argument when testing older nodes, e.g. in wallet_backwards_compatibility.py
        if self.version_is_at_least(180000):
            self.stop(wait=wait)
        else:
            self.stop()

        # If there are any running perf processes, stop them.
        for profile_name in tuple(self.perf_subprocesses.keys()):
            self._stop_perf(profile_name)

        # Drop all attached P2P connections.
        del self.p2ps[:]

        assert (not expected_stderr) or wait_until_stopped  # Must wait to check stderr
        if wait_until_stopped:
            self.wait_until_stopped(expected_stderr=expected_stderr)
462  
    def is_node_stopped(self, *, expected_stderr="", expected_ret_code=0):
        """Checks whether the node has stopped.

        Returns True if the node has stopped. False otherwise.
        This method is responsible for freeing resources (self.process).

        Raises AssertionError if the node exited with an unexpected return
        code or produced unexpected stderr output.
        """
        if not self.running:
            return True
        return_code = self.process.poll()
        if return_code is None:
            # Process is still running.
            return False

        # process has stopped. Assert that it didn't return an error code.
        assert return_code == expected_ret_code, self._node_msg(
            f"Node returned unexpected exit code ({return_code}) vs ({expected_ret_code}) when stopping")
        # Check that stderr is as expected
        self.stderr.seek(0)
        stderr = self.stderr.read().decode('utf-8').strip()
        if stderr != expected_stderr:
            raise AssertionError("Unexpected stderr {} != {}".format(stderr, expected_stderr))

        # Close the captured output files opened in start().
        self.stdout.close()
        self.stderr.close()

        # Reset process/RPC state so the node can be started again.
        self.running = False
        self.process = None
        self.rpc_connected = False
        self._rpc = None
        self.log.debug("Node stopped")
        return True
492  
493      def wait_until_stopped(self, *, timeout=BITCOIND_PROC_WAIT_TIMEOUT, expect_error=False, **kwargs):
494          if "expected_ret_code" not in kwargs:
495              kwargs["expected_ret_code"] = 1 if expect_error else 0  # Whether node shutdown return EXIT_FAILURE or EXIT_SUCCESS
496          self.wait_until(lambda: self.is_node_stopped(**kwargs), timeout=timeout)
497  
498      def kill_process(self):
499          self.process.kill()
500          self.wait_until_stopped(expected_ret_code=1 if platform.system() == "Windows" else -9)
501          assert self.is_node_stopped()
502  
503      def replace_in_config(self, replacements):
504          """
505          Perform replacements in the configuration file.
506          The substitutions are passed as a list of search-replace-tuples, e.g.
507              [("old", "new"), ("foo", "bar"), ...]
508          """
509          with open(self.bitcoinconf, 'r') as conf:
510              conf_data = conf.read()
511          for replacement in replacements:
512              assert_equal(len(replacement), 2)
513              old, new = replacement[0], replacement[1]
514              conf_data = conf_data.replace(old, new)
515          with open(self.bitcoinconf, 'w') as conf:
516              conf.write(conf_data)
517  
    @property
    def chain_path(self) -> Path:
        """Chain-specific subdirectory of the node's datadir (e.g. <datadir>/regtest)."""
        return self.datadir_path / self.chain
521  
    @property
    def debug_log_path(self) -> Path:
        """Path to this node's debug.log file."""
        return self.chain_path / 'debug.log'
525  
    @property
    def blocks_path(self) -> Path:
        """Path to this node's blocks directory."""
        return self.chain_path / "blocks"
529  
    @property
    def blocks_key_path(self) -> Path:
        """Path to the blocksdir XOR obfuscation key file (xor.dat)."""
        return self.blocks_path / "xor.dat"
533  
534      def read_xor_key(self) -> bytes:
535          with open(self.blocks_key_path, "rb") as xor_f:
536              return xor_f.read(NUM_XOR_BYTES)
537  
    @property
    def wallets_path(self) -> Path:
        """Path to this node's wallets directory."""
        return self.chain_path / "wallets"
541  
542      def debug_log_size(self, **kwargs) -> int:
543          with open(self.debug_log_path, **kwargs) as dl:
544              dl.seek(0, 2)
545              return dl.tell()
546  
    @contextlib.contextmanager
    def assert_debug_log(self, expected_msgs, unexpected_msgs=None, timeout=2):
        """Context manager asserting that every expected_msgs fragment (and no
        unexpected_msgs fragment) appears in the debug.log output written while
        the managed block runs. Polls for up to timeout seconds (scaled by
        timeout_factor) after the block exits.
        """
        if unexpected_msgs is None:
            unexpected_msgs = []
        assert_equal(type(expected_msgs), list)
        assert_equal(type(unexpected_msgs), list)

        time_end = time.time() + timeout * self.timeout_factor
        prev_size = self.debug_log_size(encoding="utf-8")  # Must use same encoding that is used to read() below

        yield

        while True:
            found = True
            # Read only the log output produced since the context was entered.
            with open(self.debug_log_path, encoding="utf-8", errors="replace") as dl:
                dl.seek(prev_size)
                log = dl.read()
            print_log = " - " + "\n - ".join(log.splitlines())
            for unexpected_msg in unexpected_msgs:
                # Fragments are matched literally (re.escape), anywhere in the output.
                if re.search(re.escape(unexpected_msg), log, flags=re.MULTILINE):
                    self._raise_assertion_error('Unexpected message "{}" partially matches log:\n\n{}\n\n'.format(unexpected_msg, print_log))
            for expected_msg in expected_msgs:
                if re.search(re.escape(expected_msg), log, flags=re.MULTILINE) is None:
                    found = False
            if found:
                return
            if time.time() >= time_end:
                break
            time.sleep(0.05)
        self._raise_assertion_error('Expected messages "{}" does not partially match log:\n\n{}\n\n'.format(str(expected_msgs), print_log))
577  
    @contextlib.contextmanager
    def busy_wait_for_debug_log(self, expected_msgs, timeout=60):
        """
        Block until we see a particular debug log message fragment or until we exceed the timeout.
        Return:
            the number of log lines we encountered when matching

        Note: expected_msgs must be bytes fragments, since the log is read in
        binary mode here.
        """
        time_end = time.time() + timeout * self.timeout_factor
        prev_size = self.debug_log_size(mode="rb")  # Must use same mode that is used to read() below

        yield

        while True:
            found = True
            # Read only the log output produced since the context was entered.
            with open(self.debug_log_path, "rb") as dl:
                dl.seek(prev_size)
                log = dl.read()

            for expected_msg in expected_msgs:
                if expected_msg not in log:
                    found = False

            if found:
                return

            if time.time() >= time_end:
                print_log = " - " + "\n - ".join(log.decode("utf8", errors="replace").splitlines())
                break

            # No sleep here because we want to detect the message fragment as fast as
            # possible.

        self._raise_assertion_error(
            'Expected messages "{}" does not partially match log:\n\n{}\n\n'.format(
                str(expected_msgs), print_log))
613  
614      @contextlib.contextmanager
615      def wait_for_new_peer(self, timeout=5):
616          """
617          Wait until the node is connected to at least one new peer. We detect this
618          by watching for an increased highest peer id, using the `getpeerinfo` RPC call.
619          Note that the simpler approach of only accounting for the number of peers
620          suffers from race conditions, as disconnects from unrelated previous peers
621          could happen anytime in-between.
622          """
623          def get_highest_peer_id():
624              peer_info = self.getpeerinfo()
625              return peer_info[-1]["id"] if peer_info else -1
626  
627          initial_peer_id = get_highest_peer_id()
628          yield
629          self.wait_until(lambda: get_highest_peer_id() > initial_peer_id, timeout=timeout)
630  
631      @contextlib.contextmanager
632      def profile_with_perf(self, profile_name: str):
633          """
634          Context manager that allows easy profiling of node activity using `perf`.
635  
636          See `test/functional/README.md` for details on perf usage.
637  
638          Args:
639              profile_name: This string will be appended to the
640                  profile data filename generated by perf.
641          """
642          subp = self._start_perf(profile_name)
643  
644          yield
645  
646          if subp:
647              self._stop_perf(profile_name)
648  
649      def _start_perf(self, profile_name=None):
650          """Start a perf process to profile this node.
651  
652          Returns the subprocess running perf."""
653          subp = None
654  
655          def test_success(cmd):
656              return subprocess.call(
657                  # shell=True required for pipe use below
658                  cmd, shell=True,
659                  stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) == 0
660  
661          if platform.system() != 'Linux':
662              self.log.warning("Can't profile with perf; only available on Linux platforms")
663              return None
664  
665          if not test_success('which perf'):
666              self.log.warning("Can't profile with perf; must install perf-tools")
667              return None
668  
669          if not test_success('readelf -S {} | grep .debug_str'.format(shlex.quote(self.binary))):
670              self.log.warning(
671                  "perf output won't be very useful without debug symbols compiled into bitcoind")
672  
673          output_path = tempfile.NamedTemporaryFile(
674              dir=self.datadir_path,
675              prefix="{}.perf.data.".format(profile_name or 'test'),
676              delete=False,
677          ).name
678  
679          cmd = [
680              'perf', 'record',
681              '-g',                     # Record the callgraph.
682              '--call-graph', 'dwarf',  # Compatibility for gcc's --fomit-frame-pointer.
683              '-F', '101',              # Sampling frequency in Hz.
684              '-p', str(self.process.pid),
685              '-o', output_path,
686          ]
687          subp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
688          self.perf_subprocesses[profile_name] = subp
689  
690          return subp
691  
692      def _stop_perf(self, profile_name):
693          """Stop (and pop) a perf subprocess."""
694          subp = self.perf_subprocesses.pop(profile_name)
695          output_path = subp.args[subp.args.index('-o') + 1]
696  
697          subp.terminate()
698          subp.wait(timeout=10)
699  
700          stderr = subp.stderr.read().decode()
701          if 'Consider tweaking /proc/sys/kernel/perf_event_paranoid' in stderr:
702              self.log.warning(
703                  "perf couldn't collect data! Try "
704                  "'sudo sysctl -w kernel.perf_event_paranoid=-1'")
705          else:
706              report_cmd = "perf report -i {}".format(output_path)
707              self.log.info("See perf output by running '{}'".format(report_cmd))
708  
    def assert_start_raises_init_error(self, extra_args=None, expected_msg=None, match=ErrorMatch.FULL_TEXT, *args, **kwargs):
        """Attempt to start the node and expect it to raise an error.

        extra_args: extra arguments to pass through to bitcoind
        expected_msg: regex that stderr should match when bitcoind fails
        match: how expected_msg is compared against stderr (partial regex,
            full regex, or exact text)

        Will throw if bitcoind starts without an error.
        Will throw if an expected_msg is provided and it does not match bitcoind's stderr."""
        assert not self.running
        # Capture the node's stdout/stderr in files; delete=False keeps them
        # on disk for post-mortem debugging.
        with tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False) as log_stderr, \
             tempfile.NamedTemporaryFile(dir=self.stdout_dir, delete=False) as log_stdout:
            try:
                self.start(extra_args, stdout=log_stdout, stderr=log_stderr, *args, **kwargs)
                ret = self.process.wait(timeout=self.rpc_timeout)
                self.log.debug(self._node_msg(f'bitcoind exited with status {ret} during initialization'))
                assert_not_equal(ret, 0) # Exit code must indicate failure
                self.running = False
                self.process = None
                # Check stderr for expected message
                if expected_msg is not None:
                    log_stderr.seek(0)
                    stderr = log_stderr.read().decode('utf-8').strip()
                    if match == ErrorMatch.PARTIAL_REGEX:
                        if re.search(expected_msg, stderr, flags=re.MULTILINE) is None:
                            self._raise_assertion_error(
                                'Expected message "{}" does not partially match stderr:\n"{}"'.format(expected_msg, stderr))
                    elif match == ErrorMatch.FULL_REGEX:
                        if re.fullmatch(expected_msg, stderr) is None:
                            self._raise_assertion_error(
                                'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr))
                    elif match == ErrorMatch.FULL_TEXT:
                        if expected_msg != stderr:
                            self._raise_assertion_error(
                                'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr))
            except subprocess.TimeoutExpired:
                # bitcoind did not exit at all within the timeout: kill it and
                # fail the test.
                self.process.kill()
                self.running = False
                self.process = None
                assert_msg = f'bitcoind should have exited within {self.rpc_timeout}s '
                if expected_msg is None:
                    assert_msg += "with an error"
                else:
                    assert_msg += "with expected error " + expected_msg
                self._raise_assertion_error(assert_msg)
753  
    def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, send_version=True, supports_v2_p2p=None, wait_for_v2_handshake=True, expect_success=True, **kwargs):
        """Add an inbound p2p connection to the node.

        This method adds the p2p connection to the self.p2ps list and also
        returns the connection to the caller.

        When self.use_v2transport is True, TestNode advertises NODE_P2P_V2 service flag

        An inbound connection is made from TestNode <------ P2PConnection
        - if TestNode doesn't advertise NODE_P2P_V2 service, P2PConnection sends version message and v1 P2P is followed
        - if TestNode advertises NODE_P2P_V2 service, (and if P2PConnections supports v2 P2P)
                P2PConnection sends ellswift bytes and v2 P2P is followed
        """
        # Fill in default destination (this node's p2p listen address).
        if 'dstport' not in kwargs:
            kwargs['dstport'] = p2p_port(self.index)
        if 'dstaddr' not in kwargs:
            kwargs['dstaddr'] = '127.0.0.1'
        # Default the peer's v2 capability to the node's own transport setting.
        if supports_v2_p2p is None:
            supports_v2_p2p = self.use_v2transport

        if self.use_v2transport:
            kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2
        # v2 is only attempted when both the node and the peer support it.
        supports_v2_p2p = self.use_v2transport and supports_v2_p2p
        p2p_conn.peer_connect(**kwargs, send_version=send_version, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p)()

        self.p2ps.append(p2p_conn)
        if not expect_success:
            return p2p_conn
        p2p_conn.wait_until(lambda: p2p_conn.is_connected, check_connected=False)
        if supports_v2_p2p and wait_for_v2_handshake:
            p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake)
        if send_version:
            # Wait until our initial version message has actually been sent.
            p2p_conn.wait_until(lambda: not p2p_conn.on_connection_send_msg)
        if wait_for_verack:
            # Wait for the node to send us the version and verack
            p2p_conn.wait_for_verack()
            # At this point we have sent our version message and received the version and verack, however the full node
            # has not yet received the verack from us (in reply to their version). So, the connection is not yet fully
            # established (fSuccessfullyConnected).
            #
            # This shouldn't lead to any issues when sending messages, since the verack will be in-flight before the
            # message we send. However, it might lead to races where we are expecting to receive a message. E.g. a
            # transaction that will be added to the mempool as soon as we return here.
            #
            # So syncing here is redundant when we only want to send a message, but the cost is low (a few milliseconds)
            # in comparison to the upside of making tests less fragile and unexpected intermittent errors less likely.
            p2p_conn.sync_with_ping()

            # Consistency check that the node received our user agent string.
            # Find our connection in getpeerinfo by our address:port and theirs, as this combination is unique.
            sockname = p2p_conn._transport.get_extra_info("socket").getsockname()
            our_addr_and_port = f"{sockname[0]}:{sockname[1]}"
            dst_addr_and_port = f"{p2p_conn.dstaddr}:{p2p_conn.dstport}"
            info = [peer for peer in self.getpeerinfo() if peer["addr"] == our_addr_and_port and peer["addrbind"] == dst_addr_and_port]
            assert_equal(len(info), 1)
            assert_equal(info[0]["subver"], P2P_SUBVERSION)

        return p2p_conn
812  
    def add_outbound_p2p_connection(self, p2p_conn, *, wait_for_verack=True, wait_for_disconnect=False, p2p_idx, connection_type="outbound-full-relay", supports_v2_p2p=None, advertise_v2_p2p=None, **kwargs):
        """Add an outbound p2p connection from node. Must be an
        "outbound-full-relay", "block-relay-only", "addr-fetch" or "feeler" connection.

        This method adds the p2p connection to the self.p2ps list and returns
        the connection to the caller.

        p2p_idx must be different for simultaneously connected peers. When reusing it for the next peer
        after disconnecting the previous one, it is necessary to wait for the disconnect to finish to avoid
        a race condition.

        Parameters:
            supports_v2_p2p: whether p2p_conn supports v2 P2P or not
            advertise_v2_p2p: whether p2p_conn is advertised to support v2 P2P or not

        An outbound connection is made from TestNode -------> P2PConnection
            - if P2PConnection doesn't advertise_v2_p2p, TestNode sends version message and v1 P2P is followed
            - if P2PConnection both supports_v2_p2p and advertise_v2_p2p, TestNode sends ellswift bytes and v2 P2P is followed
            - if P2PConnection doesn't supports_v2_p2p but advertise_v2_p2p,
                TestNode sends ellswift bytes and P2PConnection disconnects,
                TestNode reconnects by sending version message and v1 P2P is followed
        """

        def addconnection_callback(address, port):
            # Instruct the node (via the addconnection test-only RPC) to open
            # the outbound connection to the listening P2PConnection.
            self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type))
            self.addconnection('%s:%d' % (address, port), connection_type, advertise_v2_p2p)

        # Default both v2 flags to the node's own transport setting.
        if supports_v2_p2p is None:
            supports_v2_p2p = self.use_v2transport
        if advertise_v2_p2p is None:
            advertise_v2_p2p = self.use_v2transport

        if advertise_v2_p2p:
            kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2
            assert self.use_v2transport  # only a v2 TestNode could make a v2 outbound connection

        # if P2PConnection is advertised to support v2 P2P when it doesn't actually support v2 P2P,
        # reconnection needs to be attempted using v1 P2P by sending version message
        reconnect = advertise_v2_p2p and not supports_v2_p2p
        # P2PConnection needs to be advertised to support v2 P2P so that ellswift bytes are sent instead of msg_version
        supports_v2_p2p = supports_v2_p2p and advertise_v2_p2p
        p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p, reconnect=reconnect, **kwargs)()

        if reconnect:
            p2p_conn.wait_for_reconnect()

        if connection_type == "feeler" or wait_for_disconnect:
            # feeler connections are closed as soon as the node receives a `version` message
            p2p_conn.wait_until(lambda: p2p_conn.message_count["version"] == 1, check_connected=False)
            p2p_conn.wait_until(lambda: not p2p_conn.is_connected, check_connected=False)
        else:
            p2p_conn.wait_for_connect()
            self.p2ps.append(p2p_conn)

            if supports_v2_p2p:
                p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake)
            # Wait until our queued initial message (if any) has been sent.
            p2p_conn.wait_until(lambda: not p2p_conn.on_connection_send_msg)
            if wait_for_verack:
                p2p_conn.wait_for_verack()
                p2p_conn.sync_with_ping()

        return p2p_conn
875  
876      def num_test_p2p_connections(self):
877          """Return number of test framework p2p connections to the node."""
878          return len([peer for peer in self.getpeerinfo() if peer['subver'] == P2P_SUBVERSION])
879  
880      def disconnect_p2ps(self):
881          """Close all p2p connections to the node.
882          The state of the peers (such as txrequests) may not be fully cleared
883          yet, even after this method returns."""
884          for p in self.p2ps:
885              p.peer_disconnect()
886          del self.p2ps[:]
887  
888          self.wait_until(lambda: self.num_test_p2p_connections() == 0)
889  
890      def bumpmocktime(self, seconds):
891          """Fast forward using setmocktime to self.mocktime + seconds. Requires setmocktime to have
892          been called at some point in the past."""
893          assert self.mocktime
894          self.mocktime += seconds
895          self.setmocktime(self.mocktime)
896  
    def wait_until(self, test_function, timeout=60, check_interval=0.05):
        """Poll test_function until it returns a truthy value, scaling the
        timeout by this node's timeout_factor."""
        return wait_until_helper_internal(test_function, timeout=timeout, timeout_factor=self.timeout_factor, check_interval=check_interval)
899  
900  
class TestNodeCLIAttr:
    """Callable proxy for a single bitcoin-cli RPC command."""

    def __init__(self, cli, command):
        self.cli = cli
        self.command = command

    def __call__(self, *args, **kwargs):
        # Forward the invocation to the owning CLI wrapper.
        return self.cli.send_cli(self.command, *args, **kwargs)

    def get_request(self, *args, **kwargs):
        """Return a zero-argument callable that performs this command."""
        return lambda: self(*args, **kwargs)
911  
912  
def arg_to_cli(arg):
    """Convert a python value to its bitcoin-cli command-line string form.

    Booleans become lowercase "true"/"false", None becomes "null",
    containers (dict/list/tuple) are JSON-encoded, and everything else
    is passed through str()."""
    if isinstance(arg, bool):
        # Must come before any numeric handling: bool is a subclass of int.
        return str(arg).lower()
    elif arg is None:
        return 'null'
    elif isinstance(arg, (dict, list, tuple)):
        return json.dumps(arg, default=serialization_fallback)
    else:
        return str(arg)
922  
923  
class TestNodeCLI():
    """Interface to bitcoin-cli for an individual node"""
    def __init__(self, binaries, datadir, rpc_timeout):
        self.binaries = binaries
        self.datadir = datadir
        self.rpc_timeout = rpc_timeout
        # Extra command-line options, filled in when the wrapper is called.
        self.options = []
        # Data to pipe to bitcoin-cli's stdin, if any.
        self.input = None
        self.log = logging.getLogger('TestFramework.bitcoincli')
933  
934      def __call__(self, *options, input=None):
935          # TestNodeCLI is callable with bitcoin-cli command-line options
936          cli = TestNodeCLI(self.binaries, self.datadir, self.rpc_timeout)
937          cli.options = [str(o) for o in options]
938          cli.input = input
939          return cli
940  
    def __getattr__(self, command):
        # Treat any unknown attribute as an RPC command name: return a
        # callable proxy that dispatches through send_cli.
        return TestNodeCLIAttr(self, command)
943  
944      def batch(self, requests):
945          results = []
946          for request in requests:
947              try:
948                  results.append(dict(result=request()))
949              except JSONRPCException as e:
950                  results.append(dict(error=e))
951          return results
952  
    def send_cli(self, clicommand=None, *args, **kwargs):
        """Run bitcoin-cli command. Deserializes returned string as python object.

        clicommand: the CLI/RPC command to run (None runs with options only)
        args: positional RPC arguments
        kwargs: named RPC arguments (sent with -named); None values are dropped

        Raises JSONRPCException when bitcoin-cli reports an RPC error, and
        subprocess.CalledProcessError for any other non-zero exit status."""
        pos_args = [arg_to_cli(arg) for arg in args]
        named_args = [key + "=" + arg_to_cli(value) for (key, value) in kwargs.items() if value is not None]
        p_args = self.binaries.rpc_argv() + [
            f"-datadir={self.datadir}",
            f"-rpcclienttimeout={int(self.rpc_timeout)}",
        ] + self.options
        if named_args:
            p_args += ["-named"]
        # Remember where the command and its arguments start, so they can be
        # shifted to stdin below if the command line would get too long.
        base_arg_pos = len(p_args)
        if clicommand is not None:
            p_args += [clicommand]
        p_args += pos_args + named_args

        # TEST_CLI_MAX_ARG_SIZE is set low enough that checking the string
        # length is enough and encoding to bytes is not needed before
        # calculating the sum.
        sum_arg_size = sum(len(arg) for arg in p_args)
        stdin_data = self.input
        if sum_arg_size >= TEST_CLI_MAX_ARG_SIZE:
            # Pass the command and arguments via stdin (-stdin) to stay under
            # the platform's command-line length limits.
            self.log.debug(f"Cli: Command size {sum_arg_size} too large, using stdin")
            rpc_args = "\n".join([arg for arg in p_args[base_arg_pos:]])
            if stdin_data is not None:
                stdin_data += "\n" + rpc_args
            else:
                stdin_data = rpc_args
            p_args = p_args[:base_arg_pos] + ['-stdin']

        self.log.debug("Running bitcoin-cli {}".format(p_args[2:]))
        process = subprocess.Popen(p_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        cli_stdout, cli_stderr = process.communicate(input=stdin_data)
        returncode = process.poll()
        if returncode:
            # bitcoin-cli formats RPC errors as "error code: N\nerror message:\n...";
            # translate those into JSONRPCException for callers.
            match = re.match(r'error code: ([-0-9]+)\nerror message:\n(.*)', cli_stderr)
            if match:
                code, message = match.groups()
                raise JSONRPCException(dict(code=int(code), message=message))
            # Ignore cli_stdout, raise with cli_stderr
            raise subprocess.CalledProcessError(returncode, p_args, output=cli_stderr)
        try:
            if not cli_stdout.strip():
                return None
            # Parse floats as Decimal to avoid precision loss in amounts.
            return json.loads(cli_stdout, parse_float=decimal.Decimal)
        except (json.JSONDecodeError, decimal.InvalidOperation):
            # Not JSON (e.g. help text): return the raw string.
            return cli_stdout.rstrip("\n")