# test/functional/test_framework/test_framework.py
   1  #!/usr/bin/env python3
   2  # Copyright (c) 2014-present The Bitcoin Core developers
   3  # Distributed under the MIT software license, see the accompanying
   4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
   5  """Base class for RPC testing."""
   6  
   7  import configparser
   8  from enum import Enum
   9  import argparse
  10  from datetime import datetime, timezone
  11  import logging
  12  import os
  13  import platform
  14  import pdb
  15  import random
  16  import re
  17  import shutil
  18  import subprocess
  19  import sys
  20  import tempfile
  21  import time
  22  
  23  from .address import create_deterministic_address_bcrt1_p2tr_op_true
  24  from .authproxy import JSONRPCException
  25  from . import coverage
  26  from .p2p import NetworkThread
  27  from .test_node import TestNode
  28  from .util import (
  29      Binaries,
  30      MAX_NODES,
  31      PortSeed,
  32      assert_equal,
  33      check_json_precision,
  34      export_env_build_path,
  35      find_vout_for_address,
  36      get_binary_paths,
  37      get_datadir_path,
  38      initialize_datadir,
  39      p2p_port,
  40      wait_until_helper_internal,
  41      wallet_importprivkey,
  42  )
  43  
  44  
class TestStatus(Enum):
    """Outcome of a test run. shutdown() maps each status to one of the
    TEST_EXIT_* process exit codes."""
    PASSED = 1
    FAILED = 2
    SKIPPED = 3
  49  
# Process exit codes returned from shutdown() and passed to sys.exit() in main().
TEST_EXIT_PASSED = 0
TEST_EXIT_FAILED = 1
TEST_EXIT_SKIPPED = 77

# Prefix for the temporary datadir created when --tmpdir is not supplied.
TMPDIR_PREFIX = "bitcoin_func_test_"
  55  
  56  
  57  class SkipTest(Exception):
  58      """This exception is raised to skip a test"""
  59  
  60      def __init__(self, message):
  61          self.message = message
  62  
  63  
  64  class BitcoinTestMetaClass(type):
  65      """Metaclass for BitcoinTestFramework.
  66  
  67      Ensures that any attempt to register a subclass of `BitcoinTestFramework`
  68      adheres to a standard whereby the subclass overrides `set_test_params` and
  69      `run_test` but DOES NOT override either `__init__` or `main`. If any of
  70      those standards are violated, a ``TypeError`` is raised."""
  71  
  72      def __new__(cls, clsname, bases, dct):
  73          if not clsname == 'BitcoinTestFramework':
  74              if not ('run_test' in dct and 'set_test_params' in dct):
  75                  raise TypeError("BitcoinTestFramework subclasses must override "
  76                                  "'run_test' and 'set_test_params'")
  77              if '__init__' in dct or 'main' in dct:
  78                  raise TypeError("BitcoinTestFramework subclasses may not override "
  79                                  "'__init__' or 'main'")
  80  
  81          return super().__new__(cls, clsname, bases, dct)
  82  
  83  
  84  class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
  85      """Base class for a bitcoin test script.
  86  
  87      Individual bitcoin test scripts should subclass this class and override the set_test_params() and run_test() methods.
  88  
  89      Individual tests can also override the following methods to customize the test setup:
  90  
  91      - add_options()
  92      - setup_chain()
  93      - setup_network()
  94      - setup_nodes()
  95  
  96      The __init__() and main() methods should not be overridden.
  97  
  98      This class also contains various public and private helper methods."""
  99  
    def __init__(self, test_file) -> None:
        """Sets test framework defaults. Do not override this method. Instead, override the set_test_params() method"""
        self.chain: str = 'regtest'
        self.setup_clean_chain: bool = False
        self.noban_tx_relay: bool = False
        self.nodes: list[TestNode] = []
        self.extra_args = None
        self.extra_init = None
        self.network_thread = None
        self.rpc_timeout = 60  # Wait for up to 60 seconds for the RPC server to respond
        self.supports_cli = True
        self.bind_to_localhost_only = True
        # Parse the command line and config file; sets self.options. Must run
        # before set_test_params() so tests can inspect self.options there.
        self.parse_args(test_file)
        self.default_wallet_name = "default_wallet"
        self.wallet_data_filename = "wallet.dat"
        # Optional list of wallet names that can be set in set_test_params to
        # create and import keys to. If unset, default is len(nodes) *
        # [default_wallet_name]. If wallet names are None, wallet creation is
        # skipped. If list is truncated, wallet creation is skipped and keys
        # are not imported.
        self.wallet_names = None
        # By default the wallet is not required. Set to true by skip_if_no_wallet().
        # Can also be set to None to indicate that the wallet will be used if available.
        # When False or None, we ignore wallet_names in setup_nodes().
        self.uses_wallet = False
        # Disable ThreadOpenConnections by default, so that adding entries to
        # addrman will not result in automatic connections to them.
        self.disable_autoconnect = True
        # Let the subclass override the defaults above (must set num_nodes;
        # checked in main()).
        self.set_test_params()
        # wallet_names, if provided, may name at most one wallet per node.
        assert self.wallet_names is None or len(self.wallet_names) <= self.num_nodes
        self.rpc_timeout = int(self.rpc_timeout * self.options.timeout_factor) # optionally, increase timeout by a factor
 131  
    def main(self):
        """Main function. This should not be overridden by the subclass test scripts.

        Runs setup(), then run_test() (or the --test_methods subset), maps any
        outcome to a TestStatus, and always exits the process via shutdown()."""

        assert hasattr(self, "num_nodes"), "Test must set self.num_nodes in set_test_params()"

        try:
            self.setup()
            if self.options.test_methods:
                # --test_methods: run only the named methods instead of run_test().
                self.run_test_methods()
            else:
                self.run_test()

        except SkipTest as e:
            self.log.warning("Test Skipped: %s" % e.message)
            self.success = TestStatus.SKIPPED
        except subprocess.CalledProcessError as e:
            self.log.exception(f"Called Process failed with stdout='{e.stdout}'; stderr='{e.stderr}';")
            self.success = TestStatus.FAILED
        except BaseException:
            # The `exception` log will add the exception info to the message.
            # https://docs.python.org/3/library/logging.html#logging.exception
            self.log.exception("Unexpected exception:")
            self.success = TestStatus.FAILED
        finally:
            # shutdown() stops nodes / cleans up and maps self.success to an
            # exit code; the finally block guarantees it runs on every path.
            exit_code = self.shutdown()
            sys.exit(exit_code)
 158  
 159      def run_test_methods(self):
 160          for method_name in self.options.test_methods:
 161              self.log.info(f"Attempting to execute method: {method_name}")
 162              method = getattr(self, method_name)
 163              method()
 164              self.log.info(f"Method '{method_name}' executed successfully.")
 165  
 166      def parse_args(self, test_file):
 167          previous_releases_path = os.getenv("PREVIOUS_RELEASES_DIR") or os.getcwd() + "/releases"
 168          parser = argparse.ArgumentParser(usage="%(prog)s [options]")
 169          parser.add_argument("--nocleanup", dest="nocleanup", default=False, action="store_true",
 170                              help="Leave bitcoinds and test.* datadir on exit or error")
 171          parser.add_argument("--cachedir", dest="cachedir", default=os.path.abspath(os.path.dirname(test_file) + "/../cache"),
 172                              help="Directory for caching pregenerated datadirs (default: %(default)s)")
 173          parser.add_argument("--tmpdir", dest="tmpdir", help="Root directory for datadirs (must not exist)")
 174          parser.add_argument("-l", "--loglevel", dest="loglevel", default="INFO",
 175                              help="log events at this level and higher to the console. Can be set to DEBUG, INFO, WARNING, ERROR or CRITICAL. Passing --loglevel DEBUG will output all logs to console. Note that logs at all levels are always written to the test_framework.log file in the temporary test directory.")
 176          parser.add_argument("--tracerpc", dest="trace_rpc", default=False, action="store_true",
 177                              help="Print out all RPC calls as they are made")
 178          parser.add_argument("--portseed", dest="port_seed", default=os.getpid(), type=int,
 179                              help="The seed to use for assigning port numbers (default: current process id)")
 180          parser.add_argument("--previous-releases", dest="prev_releases", action="store_true",
 181                              default=os.path.isdir(previous_releases_path) and bool(os.listdir(previous_releases_path)),
 182                              help="Force test of previous releases (default: %(default)s). Previous releases binaries can be downloaded via `test/get_previous_releases.py`.")
 183          parser.add_argument("--coveragedir", dest="coveragedir",
 184                              help="Write tested RPC commands into this directory")
 185          parser.add_argument("--configfile", dest="configfile",
 186                              default=os.path.abspath(os.path.dirname(test_file) + "/../config.ini"),
 187                              help="Location of the test framework config file (default: %(default)s)")
 188          parser.add_argument("--pdbonfailure", dest="pdbonfailure", default=False, action="store_true",
 189                              help="Attach a python debugger if test fails")
 190          parser.add_argument("--usecli", dest="usecli", default=False, action="store_true",
 191                              help="use bitcoin-cli instead of RPC for all commands")
 192          parser.add_argument("--perf", dest="perf", default=False, action="store_true",
 193                              help="profile running nodes with perf for the duration of the test")
 194          parser.add_argument("--valgrind", dest="valgrind", default=False, action="store_true",
 195                              help="Run binaries under the valgrind memory error detector: Expect at least a ~10x slowdown. Does not apply to previous release binaries.")
 196          parser.add_argument("--randomseed", type=int,
 197                              help="set a random seed for deterministically reproducing a previous test run")
 198          parser.add_argument("--timeout-factor", dest="timeout_factor", type=float, help="adjust test timeouts by a factor. Setting it to 0 disables all timeouts")
 199          parser.add_argument("--v2transport", dest="v2transport", default=False, action="store_true",
 200                              help="use BIP324 v2 connections between all nodes by default")
 201          parser.add_argument("--v1transport", dest="v1transport", default=False, action="store_true",
 202                              help="Explicitly use v1 transport (can be used to overwrite global --v2transport option)")
 203          parser.add_argument("--test_methods", dest="test_methods", nargs='*',
 204                              help="Run specified test methods sequentially instead of the full test. Use only for methods that do not depend on any context set up in run_test or other methods.")
 205  
 206          self.add_options(parser)
 207          # Running TestShell in a Jupyter notebook causes an additional -f argument
 208          # To keep TestShell from failing with an "unrecognized argument" error, we add a dummy "-f" argument
 209          # source: https://stackoverflow.com/questions/48796169/how-to-fix-ipykernel-launcher-py-error-unrecognized-arguments-in-jupyter/56349168#56349168
 210          parser.add_argument("-f", "--fff", help="a dummy argument to fool ipython", default="1")
 211          self.options = parser.parse_args()
 212          if self.options.timeout_factor == 0:
 213              self.options.timeout_factor = 999
 214          self.options.timeout_factor = self.options.timeout_factor or (4 if self.options.valgrind else 1)
 215          self.options.previous_releases_path = previous_releases_path
 216  
 217          self.config = configparser.ConfigParser()
 218          self.config.read_file(open(self.options.configfile))
 219          self.binary_paths = get_binary_paths(self.config)
 220          if self.options.v1transport:
 221              self.options.v2transport=False
 222  
 223          PortSeed.n = self.options.port_seed
 224  
    def get_binaries(self, bin_dir=None):
        """Return a Binaries helper for the configured binary paths.

        bin_dir, if given, selects a previous-release `bin` directory instead
        of the paths from the config file (see add_nodes)."""
        return Binaries(self.binary_paths, bin_dir, use_valgrind=self.options.valgrind)
 227  
    def setup(self):
        """Call this method to start up the test framework object with options set.

        Creates the temp directory, starts logging, seeds the PRNG, starts the
        P2P network thread, runs skip checks, and sets up chain and network."""

        check_json_precision()
        export_env_build_path(self.config)

        self.options.cachedir = os.path.abspath(self.options.cachedir)

        # Set up temp directory and start logging
        if self.options.tmpdir:
            self.options.tmpdir = os.path.abspath(self.options.tmpdir)
            # exist_ok=False: fail loudly rather than reuse a stale datadir.
            os.makedirs(self.options.tmpdir, exist_ok=False)
        else:
            self.options.tmpdir = tempfile.mkdtemp(prefix=TMPDIR_PREFIX)
        self._start_logging()

        # Seed the PRNG. Note that test runs are reproducible if and only if
        # a single thread accesses the PRNG. For more information, see
        # https://docs.python.org/3/library/random.html#notes-on-reproducibility.
        # The network thread shouldn't access random. If we need to change the
        # network thread to access randomness, it should instantiate its own
        # random.Random object.
        seed = self.options.randomseed

        if seed is None:
            seed = random.randrange(sys.maxsize)
        else:
            self.log.info("User supplied random seed {}".format(seed))

        random.seed(seed)
        self.log.info("PRNG seed is: {}".format(seed))

        self.log.debug('Setting up network thread')
        self.network_thread = NetworkThread()
        self.network_thread.start()
        # Make sure the event loop is actually running before any P2P
        # connections are attempted.
        self.wait_until(lambda: self.network_thread.network_event_loop.is_running())

        if self.options.usecli:
            if not self.supports_cli:
                raise SkipTest("--usecli specified but test does not support using CLI")
            self.skip_if_no_cli()
        self.skip_test_if_missing_module()
        self.setup_chain()
        self.setup_network()

        # Assume success; main() overwrites this on failure or skip.
        self.success = TestStatus.PASSED
 274  
    def shutdown(self):
        """Call this method to shut down the test framework object.

        Stops the network thread and the nodes (unless the test failed),
        optionally removes the temp directory, tears down logging handlers,
        and returns the process exit code for self.success."""

        if self.success == TestStatus.FAILED and self.options.pdbonfailure:
            print("Testcase failed. Attaching python debugger. Enter ? for help")
            pdb.set_trace()

        self.log.debug('Closing down network thread')
        self.network_thread.close(timeout=self.options.timeout_factor * 10)
        if self.success == TestStatus.FAILED:
            # Leave the processes running so their state can be inspected.
            self.log.info("Not stopping nodes as test failed. The dangling processes will be cleaned up later.")
        else:
            self.log.info("Stopping nodes")
            if self.nodes:
                self.stop_nodes()

        # Keep the datadirs around on --nocleanup, on failure (for debugging),
        # or when perf profiles were written into them.
        should_clean_up = (
            not self.options.nocleanup and
            self.success != TestStatus.FAILED and
            not self.options.perf
        )
        if should_clean_up:
            self.log.info("Cleaning up {} on exit".format(self.options.tmpdir))
            cleanup_tree_on_exit = True
        elif self.options.perf:
            self.log.warning("Not cleaning up dir {} due to perf data".format(self.options.tmpdir))
            cleanup_tree_on_exit = False
        else:
            self.log.warning("Not cleaning up dir {}".format(self.options.tmpdir))
            cleanup_tree_on_exit = False

        # Map the test status to the process exit code.
        if self.success == TestStatus.PASSED:
            self.log.info("Tests successful")
            exit_code = TEST_EXIT_PASSED
        elif self.success == TestStatus.SKIPPED:
            self.log.info("Test skipped")
            exit_code = TEST_EXIT_SKIPPED
        else:
            self.log.error("Test failed. Test logging available at %s/test_framework.log", self.options.tmpdir)
            self.log.error("")
            self.log.error("Hint: Call {} '{}' to consolidate all logs".format(os.path.normpath(os.path.dirname(os.path.realpath(__file__)) + "/../combine_logs.py"), self.options.tmpdir))
            self.log.error("")
            self.log.error("If this failure happened unexpectedly or intermittently, please file a bug and provide a link or upload of the combined log.")
            self.log.error(self.config['environment']['CLIENT_BUGREPORT'])
            self.log.error("")
            exit_code = TEST_EXIT_FAILED
        # Logging.shutdown will not remove stream- and filehandlers, so we must
        # do it explicitly. Handlers are removed so the next test run can apply
        # different log handler settings.
        # See: https://docs.python.org/3/library/logging.html#logging.shutdown
        for h in list(self.log.handlers):
            h.flush()
            h.close()
            self.log.removeHandler(h)
        rpc_logger = logging.getLogger("BitcoinRPC")
        for h in list(rpc_logger.handlers):
            h.flush()
            rpc_logger.removeHandler(h)
        if cleanup_tree_on_exit:
            shutil.rmtree(self.options.tmpdir)

        # Drop the TestNode references so a subsequent run starts fresh.
        self.nodes.clear()
        return exit_code
 338  
 339      # Methods to override in subclass test scripts.
    def set_test_params(self):
        """Tests must override this method to change default values for number of nodes, topology, etc.

        At a minimum, self.num_nodes must be set here (checked in main())."""
        raise NotImplementedError
 343  
    def add_options(self, parser):
        """Override this method to add command-line options to the test.

        parser: the argparse.ArgumentParser built in parse_args()."""
        pass
 347  
    def skip_test_if_missing_module(self):
        """Override this method to skip a test if a module is not compiled.

        Called from setup() before the chain and network are set up; raise
        SkipTest to skip."""
        pass
 351  
 352      def setup_chain(self):
 353          """Override this method to customize blockchain setup"""
 354          self.log.info("Initializing test directory " + self.options.tmpdir)
 355          if self.setup_clean_chain:
 356              self._initialize_chain_clean()
 357          else:
 358              self._initialize_chain()
 359  
 360      def setup_network(self):
 361          """Override this method to customize test network topology"""
 362          self.setup_nodes()
 363  
 364          # Connect the nodes as a "chain".  This allows us
 365          # to split the network between nodes 1 and 2 to get
 366          # two halves that can work on competing chains.
 367          #
 368          # Topology looks like this:
 369          # node0 <-- node1 <-- node2 <-- node3
 370          #
 371          # If all nodes are in IBD (clean chain from genesis), node0 is assumed to be the source of blocks (miner). To
 372          # ensure block propagation, all nodes will establish outgoing connections toward node0.
 373          # See fPreferredDownload in net_processing.
 374          #
 375          # If further outbound connections are needed, they can be added at the beginning of the test with e.g.
 376          # self.connect_nodes(1, 2)
 377          for i in range(self.num_nodes - 1):
 378              self.connect_nodes(i + 1, i)
 379          self.sync_all()
 380  
 381      def setup_nodes(self):
 382          """Override this method to customize test node setup"""
 383          self.add_nodes(self.num_nodes, self.extra_args)
 384          self.start_nodes()
 385          if self.uses_wallet:
 386              self.import_deterministic_coinbase_privkeys()
 387          if not self.setup_clean_chain:
 388              for n in self.nodes:
 389                  assert_equal(n.getblockchaininfo()["blocks"], 199)
 390              # To ensure that all nodes are out of IBD, the most recent block
 391              # must have a timestamp not too old (see IsInitialBlockDownload()).
 392              self.log.debug('Generate a block with current time')
 393              block_hash = self.generate(self.nodes[0], 1, sync_fun=self.no_op)[0]
 394              block = self.nodes[0].getblock(blockhash=block_hash, verbosity=0)
 395              for n in self.nodes:
 396                  n.submitblock(block)
 397                  chain_info = n.getblockchaininfo()
 398                  assert_equal(chain_info["blocks"], 200)
 399                  assert_equal(chain_info["initialblockdownload"], False)
 400  
 401      def import_deterministic_coinbase_privkeys(self):
 402          for i in range(self.num_nodes):
 403              self.init_wallet(node=i)
 404  
 405      def init_wallet(self, *, node):
 406          wallet_name = self.default_wallet_name if self.wallet_names is None else self.wallet_names[node] if node < len(self.wallet_names) else False
 407          if wallet_name is not False:
 408              n = self.nodes[node]
 409              if wallet_name is not None:
 410                  n.createwallet(wallet_name=wallet_name, load_on_startup=True)
 411              wallet_importprivkey(n.get_wallet_rpc(wallet_name), n.get_deterministic_priv_key().key, 0, label="coinbase")
 412  
    def run_test(self):
        """Tests must override this method to define test logic.

        Called by main() after setup(); any exception fails the test."""
        raise NotImplementedError
 416  
 417      # Public helper methods. These can be accessed by the subclass test scripts.
 418  
    def add_nodes(self, num_nodes: int, extra_args=None, *, rpchost=None, versions=None):
        """Instantiate TestNode objects.

        Should only be called once after the nodes have been specified in
        set_test_params().

        num_nodes: how many TestNode objects to append to self.nodes.
        extra_args: per-node lists of extra bitcoind command line arguments.
        rpchost: host for the nodes' RPC interfaces.
        versions: per-node client versions; a non-None entry selects a
            previous-release binary directory for that node."""
        def bin_dir_from_version(version):
            # Map an integer client version to the matching previous-release
            # bin/ directory under previous_releases_path; None selects the
            # current build.
            if not version:
                return None
            if version > 219999:
                # Starting at client version 220000 the first two digits represent
                # the major version, e.g. v22.0 instead of v0.22.0.
                version *= 100
            return os.path.join(
                self.options.previous_releases_path,
                re.sub(
                    r'\.0$' if version <= 219999 else r'(\.0){1,2}$',
                    '', # Remove trailing dot for point releases, after 22.0 also remove double trailing dot.
                    'v{}.{}.{}.{}'.format(
                        (version % 100000000) // 1000000,
                        (version % 1000000) // 10000,
                        (version % 10000) // 100,
                        (version % 100) // 1,
                    ),
                ),
                'bin',
            )

        if self.bind_to_localhost_only:
            extra_confs = [["bind=127.0.0.1"]] * num_nodes
        else:
            extra_confs = [[]] * num_nodes
        if extra_args is None:
            extra_args = [[]] * num_nodes
        # Whitelist peers to speed up tx relay / mempool sync. Don't use it if testing tx relay or timing.
        if self.noban_tx_relay:
            for i in range(len(extra_args)):
                # Build a new list rather than appending in place, so a
                # caller-provided list is not mutated.
                extra_args[i] = extra_args[i] + ["-whitelist=noban,in,out@127.0.0.1"]
        if versions is None:
            versions = [None] * num_nodes
        bin_dirs = []
        for v in versions:
            bin_dir = bin_dir_from_version(v)
            bin_dirs.append(bin_dir)

        extra_init = [{}] * num_nodes if self.extra_init is None else self.extra_init # type: ignore[var-annotated]
        # Every per-node parameter list must cover every node exactly.
        assert_equal(len(extra_init), num_nodes)
        assert_equal(len(extra_confs), num_nodes)
        assert_equal(len(extra_args), num_nodes)
        assert_equal(len(versions), num_nodes)
        assert_equal(len(bin_dirs), num_nodes)
        for i in range(num_nodes):
            args = list(extra_args[i])
            init = dict(
                chain=self.chain,
                rpchost=rpchost,
                timewait=self.rpc_timeout,
                timeout_factor=self.options.timeout_factor,
                binaries=self.get_binaries(bin_dirs[i]),
                version=versions[i],
                coverage_dir=self.options.coveragedir,
                cwd=self.options.tmpdir,
                extra_conf=extra_confs[i],
                extra_args=args,
                use_cli=self.options.usecli,
                start_perf=self.options.perf,
                v2transport=self.options.v2transport,
                uses_wallet=self.uses_wallet,
            )
            # Per-node overrides from self.extra_init take precedence over the
            # defaults assembled above.
            init.update(extra_init[i])
            test_node_i = TestNode(
                i,
                get_datadir_path(self.options.tmpdir, i),
                **init)
            self.nodes.append(test_node_i)
            if not test_node_i.version_is_at_least(170000):
                # adjust conf for pre 17
                test_node_i.replace_in_config([('[regtest]', '')])
 496  
 497      def start_node(self, i, *args, **kwargs):
 498          """Start a bitcoind"""
 499  
 500          node = self.nodes[i]
 501  
 502          node.start(*args, **kwargs)
 503          node.wait_for_rpc_connection()
 504  
 505          if self.options.coveragedir is not None:
 506              coverage.write_all_rpc_commands(self.options.coveragedir, node._rpc)
 507  
 508      def start_nodes(self, extra_args=None, *args, **kwargs):
 509          """Start multiple bitcoinds"""
 510  
 511          if extra_args is None:
 512              extra_args = [None] * self.num_nodes
 513          assert_equal(len(extra_args), self.num_nodes)
 514          for i, node in enumerate(self.nodes):
 515              node.start(extra_args[i], *args, **kwargs)
 516          for node in self.nodes:
 517              node.wait_for_rpc_connection()
 518  
 519          if self.options.coveragedir is not None:
 520              for node in self.nodes:
 521                  coverage.write_all_rpc_commands(self.options.coveragedir, node._rpc)
 522  
    def stop_node(self, i, expected_stderr='', wait=0):
        """Stop a bitcoind test node.

        i: index into self.nodes.
        expected_stderr, wait: forwarded to TestNode.stop_node()."""
        self.nodes[i].stop_node(expected_stderr, wait=wait)
 526  
 527      def stop_nodes(self, wait=0):
 528          """Stop multiple bitcoind test nodes"""
 529          for node in self.nodes:
 530              # Issue RPC to stop nodes
 531              node.stop_node(wait=wait, wait_until_stopped=False)
 532  
 533          for node in self.nodes:
 534              # Wait for nodes to stop
 535              node.wait_until_stopped()
 536  
 537      def restart_node(self, i, extra_args=None, clear_addrman=False, *, expected_stderr=''):
 538          """Stop and start a test node"""
 539          self.stop_node(i, expected_stderr=expected_stderr)
 540          if clear_addrman:
 541              peers_dat = self.nodes[i].chain_path / "peers.dat"
 542              os.remove(peers_dat)
 543              with self.nodes[i].assert_debug_log(expected_msgs=[f'Creating peers.dat because the file was not found ("{peers_dat}")']):
 544                  self.start_node(i, extra_args)
 545          else:
 546              self.start_node(i, extra_args)
 547  
    def wait_for_node_exit(self, i, timeout):
        """Block until node i's bitcoind process terminates; raises if it is
        still running after *timeout* seconds."""
        self.nodes[i].process.wait(timeout)
 550  
    def connect_nodes(self, a, b, *, peer_advertises_v2=None, wait_for_connect: bool = True):
        """
        Make node a open an outbound P2P connection to node b.

        Kwargs:
            peer_advertises_v2: if set, force v2 (BIP324) or v1 transport for
                this connection instead of node a's default.
            wait_for_connect: if True, block until the nodes are verified as connected. You might
                want to disable this when using -stopatheight with one of the connected nodes,
                since there will be a race between the actual connection and performing
                the assertions before one node shuts down.
        """
        from_connection = self.nodes[a]
        to_connection = self.nodes[b]
        ip_port = "127.0.0.1:" + str(p2p_port(b))

        if peer_advertises_v2 is None:
            peer_advertises_v2 = from_connection.use_v2transport

        if peer_advertises_v2 != from_connection.use_v2transport:
            from_connection.addnode(node=ip_port, command="onetry", v2transport=peer_advertises_v2)
        else:
            # skip the optional third argument if it matches the default, for
            # compatibility with older clients
            from_connection.addnode(ip_port, "onetry")

        if not wait_for_connect:
            return

        # Use subversion as peer id. Test nodes have their node number appended to the user agent string
        from_connection_subver = from_connection.getnetworkinfo()['subversion']
        to_connection_subver = to_connection.getnetworkinfo()['subversion']

        def find_conn(node, peer_subversion, inbound):
            # The peer entry matching subversion and direction, or None.
            return next(filter(lambda peer: peer['subver'] == peer_subversion and peer['inbound'] == inbound, node.getpeerinfo()), None)

        # Wait until both sides report the connection.
        self.wait_until(lambda: find_conn(from_connection, to_connection_subver, inbound=False) is not None)
        self.wait_until(lambda: find_conn(to_connection, from_connection_subver, inbound=True) is not None)

        def check_bytesrecv(peer, msg_type, min_bytes_recv):
            assert peer is not None, "Error: peer disconnected"
            return peer['bytesrecv_per_msg'].pop(msg_type, 0) >= min_bytes_recv

        # Poll until version handshake (fSuccessfullyConnected) is complete to
        # avoid race conditions, because some message types are blocked from
        # being sent or received before fSuccessfullyConnected.
        #
        # As the flag fSuccessfullyConnected is not exposed, check it by
        # waiting for a pong, which can only happen after the flag was set.
        self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'pong', 29))
        self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'pong', 29))
 598  
 599      def disconnect_nodes(self, a, b):
 600          def disconnect_nodes_helper(node_a, node_b):
 601              def get_peer_ids(from_connection, node_num):
 602                  result = []
 603                  for peer in from_connection.getpeerinfo():
 604                      if "testnode{}".format(node_num) in peer['subver']:
 605                          result.append(peer['id'])
 606                  return result
 607  
 608              peer_ids = get_peer_ids(node_a, node_b.index)
 609              if not peer_ids:
 610                  self.log.warning("disconnect_nodes: {} and {} were not connected".format(
 611                      node_a.index,
 612                      node_b.index,
 613                  ))
 614                  return
 615              for peer_id in peer_ids:
 616                  try:
 617                      node_a.disconnectnode(nodeid=peer_id)
 618                  except JSONRPCException as e:
 619                      # If this node is disconnected between calculating the peer id
 620                      # and issuing the disconnect, don't worry about it.
 621                      # This avoids a race condition if we're mass-disconnecting peers.
 622                      if e.error['code'] != -29:  # RPC_CLIENT_NODE_NOT_CONNECTED
 623                          raise
 624  
 625              # wait to disconnect
 626              self.wait_until(lambda: not get_peer_ids(node_a, node_b.index), timeout=5)
 627              self.wait_until(lambda: not get_peer_ids(node_b, node_a.index), timeout=5)
 628  
 629          disconnect_nodes_helper(self.nodes[a], self.nodes[b])
 630  
 631      def split_network(self):
 632          """
 633          Split the network of four nodes into nodes 0/1 and 2/3.
 634          """
 635          self.disconnect_nodes(1, 2)
 636          self.sync_all(self.nodes[:2])
 637          self.sync_all(self.nodes[2:])
 638  
    def join_network(self):
        """
        Join the (previously split) network halves together.

        Reconnects nodes 1 and 2 and then syncs all nodes.
        """
        self.connect_nodes(1, 2)
        self.sync_all()
 645  
    def no_op(self):
        """Intentionally do nothing (e.g. usable where a sync_fun callback is expected)."""
        pass
 648  
 649      def generate(self, generator, *args, sync_fun=None, **kwargs):
 650          blocks = generator.generate(*args, called_by_framework=True, **kwargs)
 651          sync_fun() if sync_fun else self.sync_all()
 652          return blocks
 653  
 654      def generateblock(self, generator, *args, sync_fun=None, **kwargs):
 655          blocks = generator.generateblock(*args, called_by_framework=True, **kwargs)
 656          sync_fun() if sync_fun else self.sync_all()
 657          return blocks
 658  
 659      def generatetoaddress(self, generator, *args, sync_fun=None, **kwargs):
 660          blocks = generator.generatetoaddress(*args, called_by_framework=True, **kwargs)
 661          sync_fun() if sync_fun else self.sync_all()
 662          return blocks
 663  
 664      def generatetodescriptor(self, generator, *args, sync_fun=None, **kwargs):
 665          blocks = generator.generatetodescriptor(*args, called_by_framework=True, **kwargs)
 666          sync_fun() if sync_fun else self.sync_all()
 667          return blocks
 668  
 669      def create_outpoints(self, node, *, outputs):
 670          """Send funds to a given list of `{address: amount}` targets using the bitcoind
 671          wallet and return the corresponding outpoints as a list of dictionaries
 672          `[{"txid": txid, "vout": vout1}, {"txid": txid, "vout": vout2}, ...]`.
 673          The result can be used to specify inputs for RPCs like `createrawtransaction`,
 674          `createpsbt`, `lockunspent` etc."""
 675          assert all(len(output.keys()) == 1 for output in outputs)
 676          send_res = node.send(outputs)
 677          assert send_res["complete"]
 678          utxos = []
 679          for output in outputs:
 680              address = list(output.keys())[0]
 681              vout = find_vout_for_address(node, send_res["txid"], address)
 682              utxos.append({"txid": send_res["txid"], "vout": vout})
 683          return utxos
 684  
 685      def sync_blocks(self, nodes=None, wait=1, timeout=60):
 686          """
 687          Wait until everybody has the same tip.
 688          sync_blocks needs to be called with an rpc_connections set that has least
 689          one node already synced to the latest, stable tip, otherwise there's a
 690          chance it might return before all nodes are stably synced.
 691          """
 692          rpc_connections = nodes or self.nodes
 693          timeout = int(timeout * self.options.timeout_factor)
 694          stop_time = time.time() + timeout
 695          while time.time() <= stop_time:
 696              best_hash = [x.getbestblockhash() for x in rpc_connections]
 697              if best_hash.count(best_hash[0]) == len(rpc_connections):
 698                  return
 699              # Check that each peer has at least one connection
 700              assert (all([len(x.getpeerinfo()) for x in rpc_connections]))
 701              time.sleep(wait)
 702          raise AssertionError("Block sync timed out after {}s:{}".format(
 703              timeout,
 704              "".join("\n  {!r}".format(b) for b in best_hash),
 705          ))
 706  
 707      def sync_mempools(self, nodes=None, wait=1, timeout=60, flush_scheduler=True):
 708          """
 709          Wait until everybody has the same transactions in their memory
 710          pools
 711          """
 712          rpc_connections = nodes or self.nodes
 713          timeout = int(timeout * self.options.timeout_factor)
 714          stop_time = time.time() + timeout
 715          while time.time() <= stop_time:
 716              pool = [set(r.getrawmempool()) for r in rpc_connections]
 717              if pool.count(pool[0]) == len(rpc_connections):
 718                  if flush_scheduler:
 719                      for r in rpc_connections:
 720                          r.syncwithvalidationinterfacequeue()
 721                  return
 722              # Check that each peer has at least one connection
 723              assert (all([len(x.getpeerinfo()) for x in rpc_connections]))
 724              time.sleep(wait)
 725          raise AssertionError("Mempool sync timed out after {}s:{}".format(
 726              timeout,
 727              "".join("\n  {!r}".format(m) for m in pool),
 728          ))
 729  
 730      def sync_all(self, nodes=None):
 731          self.sync_blocks(nodes)
 732          self.sync_mempools(nodes)
 733  
 734      def wait_until(self, test_function, timeout=60, check_interval=0.05):
 735          return wait_until_helper_internal(test_function, timeout=timeout, timeout_factor=self.options.timeout_factor, check_interval=check_interval)
 736  
 737      # Private helper methods. These should not be accessed by the subclass test scripts.
 738  
    def _start_logging(self):
        """Configure the 'TestFramework' logger: a file handler that records all
        messages and a console handler whose level comes from --loglevel, plus
        optional mirroring of RPC calls when trace_rpc is set."""
        # Add logger and logging handlers
        self.log = logging.getLogger('TestFramework')
        self.log.setLevel(logging.DEBUG)
        # Create file handler to log all messages
        fh = logging.FileHandler(self.options.tmpdir + '/test_framework.log')
        fh.setLevel(logging.DEBUG)
        # Create console handler to log messages to stdout. By default this logs only error messages, but can be configured with --loglevel.
        ch = logging.StreamHandler(sys.stdout)
        # User can provide log level as a number or string (eg DEBUG). loglevel was caught as a string, so try to convert it to an int
        ll = int(self.options.loglevel) if self.options.loglevel.isdigit() else self.options.loglevel.upper()
        ch.setLevel(ll)

        # Format logs the same as bitcoind's debug.log with microprecision (so log files can be concatenated and sorted)
        class MicrosecondFormatter(logging.Formatter):
            def formatTime(self, record, _=None):
                # Render the record's creation time in UTC with microseconds;
                # the trailing 'Z' comes from the fmt string below.
                dt = datetime.fromtimestamp(record.created, timezone.utc)
                return dt.strftime('%Y-%m-%dT%H:%M:%S.%f')

        formatter = MicrosecondFormatter(
            fmt='%(asctime)sZ %(name)s (%(levelname)s): %(message)s',
        )
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)
        # add the handlers to the logger
        self.log.addHandler(fh)
        self.log.addHandler(ch)

        # Mirror every RPC call to stdout when RPC tracing was requested.
        if self.options.trace_rpc:
            rpc_logger = logging.getLogger("BitcoinRPC")
            rpc_logger.setLevel(logging.DEBUG)
            rpc_handler = logging.StreamHandler(sys.stdout)
            rpc_handler.setLevel(logging.DEBUG)
            rpc_logger.addHandler(rpc_handler)
 773  
    def _initialize_chain(self):
        """Initialize a pre-mined blockchain for use by the test.

        Create a cache of a 199-block-long chain
        Afterward, create num_nodes copies from the cache."""

        CACHE_NODE_ID = 0  # Use node 0 to create the cache for all other nodes
        cache_node_dir = get_datadir_path(self.options.cachedir, CACHE_NODE_ID)
        assert self.num_nodes <= MAX_NODES

        # Only build the cache when it is missing; later runs reuse it as-is.
        if not os.path.isdir(cache_node_dir):
            self.log.debug("Creating cache directory {}".format(cache_node_dir))

            initialize_datadir(self.options.cachedir, CACHE_NODE_ID, self.chain, self.disable_autoconnect)
            self.nodes.append(
                TestNode(
                    CACHE_NODE_ID,
                    cache_node_dir,
                    chain=self.chain,
                    extra_conf=["bind=127.0.0.1"],
                    extra_args=[],
                    rpchost=None,
                    timewait=self.rpc_timeout,
                    timeout_factor=self.options.timeout_factor,
                    binaries=self.get_binaries(),
                    coverage_dir=None,
                    cwd=self.options.tmpdir,
                    uses_wallet=self.uses_wallet,
                ))
            self.start_node(CACHE_NODE_ID)
            cache_node = self.nodes[CACHE_NODE_ID]

            # Wait for RPC connections to be ready
            cache_node.wait_for_rpc_connection()

            # Set a time in the past, so that blocks don't end up in the future
            cache_node.setmocktime(cache_node.getblockheader(cache_node.getbestblockhash())['time'])

            # Create a 199-block-long chain; each of the 3 first nodes
            # gets 25 mature blocks and 25 immature.
            # The 4th address gets 25 mature and only 24 immature blocks so that the very last
            # block in the cache does not age too much (have an old tip age).
            # This is needed so that we are out of IBD when the test starts,
            # see the tip age check in IsInitialBlockDownload().
            gen_addresses = [k.address for k in TestNode.PRIV_KEYS][:3] + [create_deterministic_address_bcrt1_p2tr_op_true()[0]]
            assert_equal(len(gen_addresses), 4)
            # 8 rounds over the 4 addresses: 7 * 25 + 24 = 199 blocks total.
            for i in range(8):
                self.generatetoaddress(
                    cache_node,
                    nblocks=25 if i != 7 else 24,
                    address=gen_addresses[i % len(gen_addresses)],
                )

            assert_equal(cache_node.getblockchaininfo()["blocks"], 199)

            # Shut it down, and clean up cache directories:
            self.stop_nodes()
            self.nodes = []

            def cache_path(*paths):
                # Helper to build paths inside the cache node's chain directory.
                return os.path.join(cache_node_dir, self.chain, *paths)

            os.rmdir(cache_path('wallets'))  # Remove empty wallets dir
            # Drop all node-specific state so the cache only carries chain data.
            for entry in os.listdir(cache_path()):
                if entry not in ['chainstate', 'blocks', 'indexes']:  # Only indexes, chainstate and blocks folders
                    os.remove(cache_path(entry))

        # Give every test node its own copy of the cached chain.
        for i in range(self.num_nodes):
            self.log.debug("Copy cache directory {} to node {}".format(cache_node_dir, i))
            to_dir = get_datadir_path(self.options.tmpdir, i)
            shutil.copytree(cache_node_dir, to_dir)
            initialize_datadir(self.options.tmpdir, i, self.chain, self.disable_autoconnect)  # Overwrite port/rpcport in bitcoin.conf
 846  
 847      def _initialize_chain_clean(self):
 848          """Initialize empty blockchain for use by the test.
 849  
 850          Create an empty blockchain and num_nodes wallets.
 851          Useful if a test case wants complete control over initialization."""
 852          for i in range(self.num_nodes):
 853              initialize_datadir(self.options.tmpdir, i, self.chain, self.disable_autoconnect)
 854  
    def skip_if_no_py3_zmq(self):
        """Attempt to import the zmq package and skip the test if the import fails."""
        try:
            # Only the import's success matters; the module itself is unused here.
            import zmq  # noqa
        except ImportError:
            raise SkipTest("python3-zmq module not available.")
 861  
    def skip_if_no_py_sqlite3(self):
        """Attempt to import the sqlite3 package and skip the test if the import fails."""
        try:
            # sqlite3 is stdlib, but can be absent from stripped-down Python builds.
            import sqlite3  # noqa
        except ImportError:
            raise SkipTest("sqlite3 module not available.")
 868  
    def skip_if_no_py_capnp(self):
        """Attempt to import the capnp package and skip the test if the import fails."""
        try:
            # Only the import's success matters; the module itself is unused here.
            import capnp  # type: ignore[import] # noqa: F401
        except ImportError:
            raise SkipTest("capnp module not available.")
 875  
    def skip_if_no_python_bcc(self):
        """Attempt to import the bcc package and skip the tests if the import fails."""
        try:
            # Only the import's success matters; the module itself is unused here.
            import bcc  # type: ignore[import] # noqa: F401
        except ImportError:
            raise SkipTest("bcc python module not available")
 882  
 883      def skip_if_no_bitcoind_tracepoints(self):
 884          """Skip the running test if bitcoind has not been compiled with USDT tracepoint support."""
 885          if not self.is_usdt_compiled():
 886              raise SkipTest("bitcoind has not been built with USDT tracepoints enabled.")
 887  
 888      def skip_if_no_bpf_permissions(self):
 889          """Skip the running test if we don't have permissions to do BPF syscalls and load BPF maps."""
 890          # check for 'root' permissions
 891          if os.geteuid() != 0:
 892              raise SkipTest("no permissions to use BPF (please review the tests carefully before running them with higher privileges)")
 893  
 894      def skip_if_platform_not_linux(self):
 895          """Skip the running test if we are not on a Linux platform"""
 896          if platform.system() != "Linux":
 897              raise SkipTest("not on a Linux system")
 898  
 899      def skip_if_platform_not_posix(self):
 900          """Skip the running test if we are not on a POSIX platform"""
 901          if os.name != 'posix':
 902              raise SkipTest("not on a POSIX system")
 903  
 904      def skip_if_no_bitcoind_zmq(self):
 905          """Skip the running test if bitcoind has not been compiled with zmq support."""
 906          if not self.is_zmq_compiled():
 907              raise SkipTest("bitcoind has not been built with zmq enabled.")
 908  
 909      def skip_if_no_wallet(self):
 910          """Skip the running test if wallet has not been compiled."""
 911          self.uses_wallet = True
 912          if not self.is_wallet_compiled():
 913              raise SkipTest("wallet has not been compiled.")
 914  
 915      def skip_if_no_wallet_tool(self):
 916          """Skip the running test if bitcoin-wallet has not been compiled."""
 917          if not self.is_wallet_tool_compiled():
 918              raise SkipTest("bitcoin-wallet has not been compiled")
 919  
 920      def skip_if_no_bitcoin_tx(self):
 921          """Skip the running test if bitcoin-tx has not been compiled."""
 922          if not self.is_bitcoin_tx_compiled():
 923              raise SkipTest("bitcoin-tx has not been compiled")
 924  
 925      def skip_if_no_bitcoin_util(self):
 926          """Skip the running test if bitcoin-util has not been compiled."""
 927          if not self.is_bitcoin_util_compiled():
 928              raise SkipTest("bitcoin-util has not been compiled")
 929  
 930      def skip_if_no_bitcoin_chainstate(self):
 931          """Skip the running test if bitcoin-chainstate has not been compiled."""
 932          if not self.is_bitcoin_chainstate_compiled():
 933              raise SkipTest("bitcoin-chainstate has not been compiled")
 934  
 935      def skip_if_no_bitcoin_bench(self):
 936          """Skip the running test if bench_bitcoin has not been compiled."""
 937          if not self.is_bench_compiled():
 938              raise SkipTest("bench_bitcoin has not been compiled")
 939  
 940      def skip_if_no_cli(self):
 941          """Skip the running test if bitcoin-cli has not been compiled."""
 942          if not self.is_cli_compiled():
 943              raise SkipTest("bitcoin-cli has not been compiled.")
 944  
 945      def skip_if_no_ipc(self):
 946          """Skip the running test if ipc is not compiled."""
 947          if not self.is_ipc_compiled():
 948              raise SkipTest("ipc has not been compiled.")
 949  
 950      def skip_if_no_previous_releases(self):
 951          """Skip the running test if previous releases are not available."""
 952          if not self.has_previous_releases():
 953              raise SkipTest("previous releases not available or disabled")
 954  
 955      def has_previous_releases(self):
 956          """Checks whether previous releases are present and enabled."""
 957          if not os.path.isdir(self.options.previous_releases_path):
 958              if self.options.prev_releases:
 959                  raise AssertionError(f"Force test of previous releases but releases missing: {self.options.previous_releases_path}\n"
 960                                       "Previous releases binaries can be downloaded via `test/get_previous_releases.py`.")
 961          return self.options.prev_releases
 962  
 963      def skip_if_no_external_signer(self):
 964          """Skip the running test if external signer support has not been compiled."""
 965          if not self.is_external_signer_compiled():
 966              raise SkipTest("external signer support has not been compiled.")
 967  
 968      def skip_if_running_under_valgrind(self):
 969          """Skip the running test if Valgrind is being used."""
 970          if self.options.valgrind:
 971              raise SkipTest("This test is not compatible with Valgrind.")
 972  
 973      def is_bench_compiled(self):
 974          """Checks whether bench_bitcoin was compiled."""
 975          return self.config["components"].getboolean("BUILD_BENCH")
 976  
 977      def is_cli_compiled(self):
 978          """Checks whether bitcoin-cli was compiled."""
 979          return self.config["components"].getboolean("ENABLE_CLI")
 980  
 981      def is_external_signer_compiled(self):
 982          """Checks whether external signer support was compiled."""
 983          return self.config["components"].getboolean("ENABLE_EXTERNAL_SIGNER")
 984  
 985      def is_wallet_compiled(self):
 986          """Checks whether the wallet module was compiled."""
 987          return self.config["components"].getboolean("ENABLE_WALLET")
 988  
 989      def is_wallet_tool_compiled(self):
 990          """Checks whether bitcoin-wallet was compiled."""
 991          return self.config["components"].getboolean("ENABLE_WALLET_TOOL")
 992  
 993      def is_bitcoin_tx_compiled(self):
 994          """Checks whether bitcoin-tx was compiled."""
 995          return self.config["components"].getboolean("BUILD_BITCOIN_TX")
 996  
 997      def is_bitcoin_util_compiled(self):
 998          """Checks whether bitcoin-util was compiled."""
 999          return self.config["components"].getboolean("ENABLE_BITCOIN_UTIL")
1000  
1001      def is_bitcoin_chainstate_compiled(self):
1002          """Checks whether bitcoin-chainstate was compiled."""
1003          return self.config["components"].getboolean("ENABLE_BITCOIN_CHAINSTATE")
1004  
1005      def is_zmq_compiled(self):
1006          """Checks whether the zmq module was compiled."""
1007          return self.config["components"].getboolean("ENABLE_ZMQ")
1008  
1009      def is_embedded_asmap_compiled(self):
1010          """Checks whether ASMap data was embedded during compilation."""
1011          return self.config["components"].getboolean("ENABLE_EMBEDDED_ASMAP")
1012  
1013      def is_usdt_compiled(self):
1014          """Checks whether the USDT tracepoints were compiled."""
1015          return self.config["components"].getboolean("ENABLE_USDT_TRACEPOINTS")
1016  
1017      def is_ipc_compiled(self):
1018          """Checks whether ipc was compiled."""
1019          return self.config["components"].getboolean("ENABLE_IPC")
1020  
1021      def has_blockfile(self, node, filenum: str):
1022          return (node.blocks_path/ f"blk{filenum}.dat").is_file()
1023  
1024      def inspect_sqlite_db(self, path, fn, *args, **kwargs):
1025          try:
1026              import sqlite3 # type: ignore[import]
1027              conn = sqlite3.connect(path)
1028              with conn:
1029                  result = fn(conn, *args, **kwargs)
1030              conn.close()
1031              return result
1032          except ImportError:
1033              self.log.warning("sqlite3 module not available, skipping tests that inspect the database")