test_node.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2017-present The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """Class for bitcoind node under test""" 6 7 import contextlib 8 import decimal 9 import errno 10 from enum import Enum 11 import json 12 import logging 13 import os 14 import pathlib 15 import platform 16 import re 17 import subprocess 18 import tempfile 19 import time 20 import urllib.parse 21 import collections 22 import shlex 23 import shutil 24 import sys 25 from pathlib import Path 26 27 from .authproxy import ( 28 JSONRPCException, 29 serialization_fallback, 30 ) 31 from .messages import NODE_P2P_V2 32 from .p2p import P2P_SERVICES, P2P_SUBVERSION 33 from .util import ( 34 MAX_NODES, 35 assert_equal, 36 assert_not_equal, 37 append_config, 38 delete_cookie_file, 39 get_auth_cookie, 40 get_rpc_proxy, 41 rpc_url, 42 wait_until_helper_internal, 43 p2p_port, 44 tor_port, 45 ) 46 47 BITCOIND_PROC_WAIT_TIMEOUT = 60 48 # The size of the blocks xor key 49 # from InitBlocksdirXorKey::xor_key.size() 50 NUM_XOR_BYTES = 8 51 # Many systems have a 128kB limit for a command size. Depending on the 52 # platform, this limit may be larger or smaller. Moreover, when using the 53 # 'bitcoin' command, it may internally insert more args, which must be 54 # accounted for. There is no need to pick the largest possible value here 55 # anyway and it should be fine to set it to 1kB in tests. 56 TEST_CLI_MAX_ARG_SIZE = 1024 57 58 # The null blocks key (all 0s) 59 NULL_BLK_XOR_KEY = bytes([0] * NUM_XOR_BYTES) 60 BITCOIN_PID_FILENAME_DEFAULT = "bitcoind.pid" 61 62 if sys.platform.startswith("linux"): 63 UNIX_PATH_MAX = 108 # includes the trailing NUL 64 elif sys.platform.startswith(("darwin", "freebsd", "netbsd", "openbsd")): 65 UNIX_PATH_MAX = 104 66 else: # safest portable value 67 UNIX_PATH_MAX = 92 68 69 70 class FailedToStartError(Exception): 71 """Raised when a node fails to start correctly.""" 72 73 74 class ErrorMatch(Enum): 75 FULL_TEXT = 1 76 FULL_REGEX = 2 77 PARTIAL_REGEX = 3 78 79 80 class TestNode(): 81 """A class for representing a bitcoind node under test. 82 83 This class contains: 84 85 - state about the node (whether it's running, etc) 86 - a Python subprocess.Popen object representing the running process 87 - an RPC connection to the node 88 - one or more P2P connections to the node 89 90 91 To make things easier for the test writer, any unrecognised messages will 92 be dispatched to the RPC connection.""" 93 94 def __init__(self, i, datadir_path, *, chain, rpchost, timewait, timeout_factor, binaries, coverage_dir, cwd, extra_conf=None, extra_args=None, use_cli=False, start_perf=False, use_valgrind=False, version=None, v2transport=False, uses_wallet=False, ipcbind=False): 95 """ 96 Kwargs: 97 start_perf (bool): If True, begin profiling the node with `perf` as soon as 98 the node starts. 99 """ 100 101 self.index = i 102 self.p2p_conn_index = 1 103 self.datadir_path = datadir_path 104 self.bitcoinconf = self.datadir_path / "bitcoin.conf" 105 self.stdout_dir = self.datadir_path / "stdout" 106 self.stderr_dir = self.datadir_path / "stderr" 107 self.chain = chain 108 self.rpchost = rpchost 109 self.rpc_timeout = timewait # Already multiplied by timeout_factor 110 self.timeout_factor = timeout_factor 111 self.binaries = binaries 112 self.coverage_dir = coverage_dir 113 self.cwd = cwd 114 self.has_explicit_bind = False 115 if extra_conf is not None: 116 append_config(self.datadir_path, extra_conf) 117 # Remember if there is bind=... in the config file. 118 self.has_explicit_bind = any(e.startswith("bind=") for e in extra_conf) 119 # Most callers will just need to add extra args to the standard list below. 120 # For those callers that need more flexibility, they can just set the args property directly. 121 # Note that common args are set in the config file (see initialize_datadir) 122 self.extra_args = extra_args 123 self.version = version 124 # Configuration for logging is set as command-line args rather than in the bitcoin.conf file. 125 # This means that starting a bitcoind using the temp dir to debug a failed test won't 126 # spam debug.log. 127 self.args = self.binaries.node_argv(need_ipc=ipcbind) + [ 128 f"-datadir={self.datadir_path}", 129 "-logtimemicros", 130 "-debug", 131 "-debugexclude=libevent", 132 "-debugexclude=leveldb", 133 "-debugexclude=rand", 134 "-uacomment=testnode%d" % i, # required for subversion uniqueness across peers 135 ] 136 if uses_wallet is not None and not uses_wallet: 137 self.args.append("-disablewallet") 138 139 self.ipc_tmp_dir = None 140 if ipcbind: 141 self.ipc_socket_path = self.chain_path / "node.sock" 142 if len(os.fsencode(self.ipc_socket_path)) < UNIX_PATH_MAX: 143 self.args.append("-ipcbind=unix") 144 else: 145 # Work around default CI path exceeding maximum socket path length. 146 self.ipc_tmp_dir = pathlib.Path(tempfile.mkdtemp(prefix="test-ipc-")) 147 self.ipc_socket_path = self.ipc_tmp_dir / "node.sock" 148 self.args.append(f"-ipcbind=unix:{self.ipc_socket_path}") 149 150 # Use valgrind, expect for previous release binaries 151 if use_valgrind and version is None: 152 default_suppressions_file = Path(__file__).parents[3] / "contrib" / "valgrind.supp" 153 suppressions_file = os.getenv("VALGRIND_SUPPRESSIONS_FILE", 154 default_suppressions_file) 155 self.args = ["valgrind", "--suppressions={}".format(suppressions_file), 156 "--gen-suppressions=all", "--exit-on-first-error=yes", 157 "--error-exitcode=1", "--quiet"] + self.args 158 159 if self.version_is_at_least(190000): 160 self.args.append("-logthreadnames") 161 if self.version_is_at_least(219900): 162 self.args.append("-logsourcelocations") 163 if self.version_is_at_least(239000): 164 self.args.append("-loglevel=trace") 165 if self.version_is_at_least(290100): 166 self.args.append("-nologratelimit") 167 168 # Default behavior from global -v2transport flag is added to args to persist it over restarts. 169 # May be overwritten in individual tests, using extra_args. 170 self.default_to_v2 = v2transport 171 if self.version_is_at_least(260000): 172 # 26.0 and later support v2transport 173 if v2transport: 174 self.args.append("-v2transport=1") 175 else: 176 self.args.append("-v2transport=0") 177 # if v2transport is requested via global flag but not supported for node version, ignore it 178 179 self.cli = TestNodeCLI( 180 binaries, 181 self.datadir_path, 182 self.rpc_timeout // 2, # timeout identical to the one used in self._rpc 183 ) 184 self.use_cli = use_cli 185 self.start_perf = start_perf 186 187 self.running = False 188 self.process = None 189 self.rpc_connected = False 190 self._rpc = None # Should usually not be accessed directly in tests to allow for --usecli mode 191 self.reuse_http_connections = True # Must be set before calling get_rpc_proxy() i.e. before restarting node 192 self.url = None 193 self.log = logging.getLogger('TestFramework.node%d' % i) 194 # Cache perf subprocesses here by their data output filename. 195 self.perf_subprocesses = {} 196 197 self.p2ps = [] 198 199 self.mocktime = None 200 201 AddressKeyPair = collections.namedtuple('AddressKeyPair', ['address', 'key']) 202 PRIV_KEYS = [ 203 # address , privkey 204 AddressKeyPair('mjTkW3DjgyZck4KbiRusZsqTgaYTxdSz6z', 'cVpF924EspNh8KjYsfhgY96mmxvT6DgdWiTYMtMjuM74hJaU5psW'), 205 AddressKeyPair('msX6jQXvxiNhx3Q62PKeLPrhrqZQdSimTg', 'cUxsWyKyZ9MAQTaAhUQWJmBbSvHMwSmuv59KgxQV7oZQU3PXN3KE'), 206 AddressKeyPair('mnonCMyH9TmAsSj3M59DsbH8H63U3RKoFP', 'cTrh7dkEAeJd6b3MRX9bZK8eRmNqVCMH3LSUkE3dSFDyzjU38QxK'), 207 AddressKeyPair('mqJupas8Dt2uestQDvV2NH3RU8uZh2dqQR', 'cVuKKa7gbehEQvVq717hYcbE9Dqmq7KEBKqWgWrYBa2CKKrhtRim'), 208 AddressKeyPair('msYac7Rvd5ywm6pEmkjyxhbCDKqWsVeYws', 'cQDCBuKcjanpXDpCqacNSjYfxeQj8G6CAtH1Dsk3cXyqLNC4RPuh'), 209 AddressKeyPair('n2rnuUnwLgXqf9kk2kjvVm8R5BZK1yxQBi', 'cQakmfPSLSqKHyMFGwAqKHgWUiofJCagVGhiB4KCainaeCSxeyYq'), 210 AddressKeyPair('myzuPxRwsf3vvGzEuzPfK9Nf2RfwauwYe6', 'cQMpDLJwA8DBe9NcQbdoSb1BhmFxVjWD5gRyrLZCtpuF9Zi3a9RK'), 211 AddressKeyPair('mumwTaMtbxEPUswmLBBN3vM9oGRtGBrys8', 'cSXmRKXVcoouhNNVpcNKFfxsTsToY5pvB9DVsFksF1ENunTzRKsy'), 212 AddressKeyPair('mpV7aGShMkJCZgbW7F6iZgrvuPHjZjH9qg', 'cSoXt6tm3pqy43UMabY6eUTmR3eSUYFtB2iNQDGgb3VUnRsQys2k'), 213 AddressKeyPair('mq4fBNdckGtvY2mijd9am7DRsbRB4KjUkf', 'cN55daf1HotwBAgAKWVgDcoppmUNDtQSfb7XLutTLeAgVc3u8hik'), 214 AddressKeyPair('mpFAHDjX7KregM3rVotdXzQmkbwtbQEnZ6', 'cT7qK7g1wkYEMvKowd2ZrX1E5f6JQ7TM246UfqbCiyF7kZhorpX3'), 215 AddressKeyPair('mzRe8QZMfGi58KyWCse2exxEFry2sfF2Y7', 'cPiRWE8KMjTRxH1MWkPerhfoHFn5iHPWVK5aPqjW8NxmdwenFinJ'), 216 ] 217 218 def get_deterministic_priv_key(self): 219 """Return a deterministic priv key in base58, that only depends on the node's index""" 220 assert len(self.PRIV_KEYS) == MAX_NODES 221 return self.PRIV_KEYS[self.index] 222 223 def _node_msg(self, msg: str) -> str: 224 """Return a modified msg that identifies this node by its index as a debugging aid.""" 225 return "[node %d] %s" % (self.index, msg) 226 227 def _raise_assertion_error(self, msg: str): 228 """Raise an AssertionError with msg modified to identify this node.""" 229 raise AssertionError(self._node_msg(msg)) 230 231 def __del__(self): 232 # Ensure that we don't leave any bitcoind processes lying around after 233 # the test ends 234 if self.process: 235 # Should only happen on test failure 236 # Avoid using logger, as that may have already been shutdown when 237 # this destructor is called. 238 print(self._node_msg("Cleaning up leftover process"), file=sys.stderr) 239 self.process.kill() 240 if self.ipc_tmp_dir: 241 print(self._node_msg(f"Cleaning up ipc directory {str(self.ipc_tmp_dir)!r}")) 242 shutil.rmtree(self.ipc_tmp_dir) 243 244 def __getattr__(self, name): 245 """Dispatches any unrecognised messages to the RPC connection or a CLI instance.""" 246 if self.use_cli: 247 return getattr(self.cli, name) 248 else: 249 assert self.rpc_connected and self._rpc is not None, self._node_msg("Error: no RPC connection") 250 return getattr(self._rpc, name) 251 252 def start(self, extra_args=None, *, cwd=None, stdout=None, stderr=None, env=None, **kwargs): 253 """Start the node.""" 254 if extra_args is None: 255 extra_args = self.extra_args 256 257 # If listening and no -bind is given, then bitcoind would bind P2P ports on 258 # 0.0.0.0:P and 127.0.0.1:P+1 (for incoming Tor connections), where P is 259 # a unique port chosen by the test framework and configured as port=P in 260 # bitcoin.conf. To avoid collisions, change it to 127.0.0.1:tor_port(). 261 will_listen = all(e != "-nolisten" and e != "-listen=0" for e in extra_args) 262 has_explicit_bind = self.has_explicit_bind or any(e.startswith("-bind=") for e in extra_args) 263 if will_listen and not has_explicit_bind: 264 extra_args.append(f"-bind=0.0.0.0:{p2p_port(self.index)}") 265 extra_args.append(f"-bind=127.0.0.1:{tor_port(self.index)}=onion") 266 267 self.use_v2transport = "-v2transport=1" in extra_args or (self.default_to_v2 and "-v2transport=0" not in extra_args) 268 269 # Add a new stdout and stderr file each time bitcoind is started 270 if stderr is None: 271 stderr = tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False) 272 if stdout is None: 273 stdout = tempfile.NamedTemporaryFile(dir=self.stdout_dir, delete=False) 274 self.stderr = stderr 275 self.stdout = stdout 276 277 if cwd is None: 278 cwd = self.cwd 279 280 # Delete any existing cookie file -- if such a file exists (eg due to 281 # unclean shutdown), it will get overwritten anyway by bitcoind, and 282 # potentially interfere with our attempt to authenticate 283 delete_cookie_file(self.datadir_path, self.chain) 284 285 # add environment variable LIBC_FATAL_STDERR_=1 so that libc errors are written to stderr and not the terminal 286 subp_env = dict(os.environ, LIBC_FATAL_STDERR_="1") 287 if env is not None: 288 subp_env.update(env) 289 290 self.process = subprocess.Popen(self.args + extra_args, env=subp_env, stdout=stdout, stderr=stderr, cwd=cwd, **kwargs) 291 292 self.running = True 293 self.log.debug("bitcoind started, waiting for RPC to come up") 294 295 if self.start_perf: 296 self._start_perf() 297 298 def wait_for_rpc_connection(self, *, wait_for_import=True): 299 """Sets up an RPC connection to the bitcoind process. Returns False if unable to connect.""" 300 # Poll at a rate of four times per second 301 poll_per_s = 4 302 303 suppressed_errors = collections.defaultdict(int) 304 latest_error = None 305 def suppress_error(category: str, e: Exception): 306 suppressed_errors[category] += 1 307 return (category, repr(e)) 308 309 for _ in range(poll_per_s * self.rpc_timeout): 310 if self.process.poll() is not None: 311 # Attach abrupt shutdown error/s to the exception message 312 self.stderr.seek(0) 313 str_error = ''.join(line.decode('utf-8') for line in self.stderr) 314 str_error += "************************\n" if str_error else '' 315 316 raise FailedToStartError(self._node_msg( 317 f'bitcoind exited with status {self.process.returncode} during initialization. {str_error}')) 318 try: 319 rpc = get_rpc_proxy( 320 rpc_url(self.datadir_path, self.index, self.chain, self.rpchost), 321 self.index, 322 timeout=self.rpc_timeout // 2, # Shorter timeout to allow for one retry in case of ETIMEDOUT 323 coveragedir=self.coverage_dir, 324 ) 325 rpc.auth_service_proxy_instance.reuse_http_connections = self.reuse_http_connections 326 rpc.getblockcount() 327 # If the call to getblockcount() succeeds then the RPC connection is up 328 if self.version_is_at_least(190000) and wait_for_import: 329 # getmempoolinfo.loaded is available since commit 330 # bb8ae2c (version 0.19.0) 331 self.wait_until(lambda: rpc.getmempoolinfo()['loaded']) 332 # Wait for the node to finish reindex, block import, and 333 # loading the mempool. Usually importing happens fast or 334 # even "immediate" when the node is started. However, there 335 # is no guarantee and sometimes ImportBlocks might finish 336 # later. This is going to cause intermittent test failures, 337 # because generally the tests assume the node is fully 338 # ready after being started. 339 # 340 # For example, the node will reject block messages from p2p 341 # when it is still importing with the error "Unexpected 342 # block message received" 343 # 344 # The wait is done here to make tests as robust as possible 345 # and prevent racy tests and intermittent failures as much 346 # as possible. Some tests might not need this, but the 347 # overhead is trivial, and the added guarantees are worth 348 # the minimal performance cost. 349 self.log.debug("RPC successfully started") 350 # Set rpc_connected even if we are in use_cli mode so that we know we can call self.stop() if needed. 351 self.rpc_connected = True 352 if self.use_cli: 353 return 354 self._rpc = rpc 355 self.url = self._rpc.rpc_url 356 return 357 except JSONRPCException as e: 358 # Suppress these as they are expected during initialization. 359 # -28 RPC in warmup 360 # -342 Service unavailable, could be starting up or shutting down 361 if e.error['code'] not in [-28, -342]: 362 raise # unknown JSON RPC exception 363 latest_error = suppress_error(f"JSONRPCException {e.error['code']}", e) 364 except OSError as e: 365 error_num = e.errno 366 # Work around issue where socket timeouts don't have errno set. 367 # https://github.com/python/cpython/issues/109601 368 if error_num is None and isinstance(e, TimeoutError): 369 error_num = errno.ETIMEDOUT 370 371 # Suppress similarly to the above JSONRPCException errors. 372 if error_num not in [ 373 errno.ECONNRESET, # This might happen when the RPC server is in warmup, 374 # but shut down before the call to getblockcount succeeds. 375 errno.ETIMEDOUT, # Treat identical to ECONNRESET 376 errno.ECONNREFUSED # Port not yet open? 377 ]: 378 raise # unknown OS error 379 latest_error = suppress_error(f"OSError {errno.errorcode[error_num]}", e) 380 except ValueError as e: 381 # Suppress if cookie file isn't generated yet and no rpcuser or rpcpassword; bitcoind may be starting. 382 if "No RPC credentials" not in str(e): 383 raise 384 latest_error = suppress_error("missing_credentials", e) 385 time.sleep(1.0 / poll_per_s) 386 self._raise_assertion_error(f"Unable to connect to bitcoind after {self.rpc_timeout}s (ignored errors: {dict(suppressed_errors)!s}{'' if latest_error is None else f', latest: {latest_error[0]!r}/{latest_error[1]}'})") 387 388 def wait_for_cookie_credentials(self): 389 """Ensures auth cookie credentials can be read, e.g. for testing CLI with -rpcwait before RPC connection is up.""" 390 self.log.debug("Waiting for cookie credentials") 391 # Poll at a rate of four times per second. 392 poll_per_s = 4 393 for _ in range(poll_per_s * self.rpc_timeout): 394 try: 395 get_auth_cookie(self.datadir_path, self.chain) 396 self.log.debug("Cookie credentials successfully retrieved") 397 return 398 except ValueError: # cookie file not found and no rpcuser or rpcpassword; bitcoind is still starting 399 pass # so we continue polling until RPC credentials are retrieved 400 time.sleep(1.0 / poll_per_s) 401 self._raise_assertion_error("Unable to retrieve cookie credentials after {}s".format(self.rpc_timeout)) 402 403 def generate(self, nblocks, maxtries=1000000, **kwargs): 404 self.log.debug("TestNode.generate() dispatches `generate` call to `generatetoaddress`") 405 return self.generatetoaddress(nblocks=nblocks, address=self.get_deterministic_priv_key().address, maxtries=maxtries, **kwargs) 406 407 def generateblock(self, *args, called_by_framework, **kwargs): 408 assert called_by_framework, "Direct call of this mining RPC is discouraged. Please use one of the self.generate* methods on the test framework, which sync the nodes to avoid intermittent test issues. You may use sync_fun=self.no_op to disable the sync explicitly." 409 return self.__getattr__('generateblock')(*args, **kwargs) 410 411 def generatetoaddress(self, *args, called_by_framework, **kwargs): 412 assert called_by_framework, "Direct call of this mining RPC is discouraged. Please use one of the self.generate* methods on the test framework, which sync the nodes to avoid intermittent test issues. You may use sync_fun=self.no_op to disable the sync explicitly." 413 return self.__getattr__('generatetoaddress')(*args, **kwargs) 414 415 def generatetodescriptor(self, *args, called_by_framework, **kwargs): 416 assert called_by_framework, "Direct call of this mining RPC is discouraged. Please use one of the self.generate* methods on the test framework, which sync the nodes to avoid intermittent test issues. You may use sync_fun=self.no_op to disable the sync explicitly." 417 return self.__getattr__('generatetodescriptor')(*args, **kwargs) 418 419 def setmocktime(self, timestamp): 420 """Wrapper for setmocktime RPC, sets self.mocktime""" 421 if timestamp == 0: 422 # setmocktime(0) resets to system time. 423 self.mocktime = None 424 else: 425 self.mocktime = timestamp 426 return self.__getattr__('setmocktime')(timestamp) 427 428 def get_wallet_rpc(self, wallet_name): 429 if self.use_cli: 430 return self.cli("-rpcwallet={}".format(wallet_name)) 431 else: 432 assert self.rpc_connected and self._rpc, self._node_msg("RPC not connected") 433 wallet_path = "wallet/{}".format(urllib.parse.quote(wallet_name)) 434 return self._rpc / wallet_path 435 436 def version_is_at_least(self, ver): 437 return self.version is None or self.version >= ver 438 439 def stop_node(self, expected_stderr='', *, wait=0, wait_until_stopped=True): 440 """Stop the node.""" 441 if not self.running: 442 return 443 assert self.rpc_connected, self._node_msg( 444 "Should only call stop_node() on a running node after wait_for_rpc_connection() succeeded. " 445 f"Did you forget to call the latter after start()? Not connected to process: {self.process.pid}") 446 self.log.debug("Stopping node") 447 # Do not use wait argument when testing older nodes, e.g. in wallet_backwards_compatibility.py 448 if self.version_is_at_least(180000): 449 self.stop(wait=wait) 450 else: 451 self.stop() 452 453 # If there are any running perf processes, stop them. 454 for profile_name in tuple(self.perf_subprocesses.keys()): 455 self._stop_perf(profile_name) 456 457 del self.p2ps[:] 458 459 assert (not expected_stderr) or wait_until_stopped # Must wait to check stderr 460 if wait_until_stopped: 461 self.wait_until_stopped(expected_stderr=expected_stderr) 462 463 def is_node_stopped(self, *, expected_stderr="", expected_ret_code=0): 464 """Checks whether the node has stopped. 465 466 Returns True if the node has stopped. False otherwise. 467 This method is responsible for freeing resources (self.process).""" 468 if not self.running: 469 return True 470 return_code = self.process.poll() 471 if return_code is None: 472 return False 473 474 # process has stopped. Assert that it didn't return an error code. 475 assert return_code == expected_ret_code, self._node_msg( 476 f"Node returned unexpected exit code ({return_code}) vs ({expected_ret_code}) when stopping") 477 # Check that stderr is as expected 478 self.stderr.seek(0) 479 stderr = self.stderr.read().decode('utf-8').strip() 480 if stderr != expected_stderr: 481 raise AssertionError("Unexpected stderr {} != {}".format(stderr, expected_stderr)) 482 483 self.stdout.close() 484 self.stderr.close() 485 486 self.running = False 487 self.process = None 488 self.rpc_connected = False 489 self._rpc = None 490 self.log.debug("Node stopped") 491 return True 492 493 def wait_until_stopped(self, *, timeout=BITCOIND_PROC_WAIT_TIMEOUT, expect_error=False, **kwargs): 494 if "expected_ret_code" not in kwargs: 495 kwargs["expected_ret_code"] = 1 if expect_error else 0 # Whether node shutdown return EXIT_FAILURE or EXIT_SUCCESS 496 self.wait_until(lambda: self.is_node_stopped(**kwargs), timeout=timeout) 497 498 def kill_process(self): 499 self.process.kill() 500 self.wait_until_stopped(expected_ret_code=1 if platform.system() == "Windows" else -9) 501 assert self.is_node_stopped() 502 503 def replace_in_config(self, replacements): 504 """ 505 Perform replacements in the configuration file. 506 The substitutions are passed as a list of search-replace-tuples, e.g. 507 [("old", "new"), ("foo", "bar"), ...] 508 """ 509 with open(self.bitcoinconf, 'r') as conf: 510 conf_data = conf.read() 511 for replacement in replacements: 512 assert_equal(len(replacement), 2) 513 old, new = replacement[0], replacement[1] 514 conf_data = conf_data.replace(old, new) 515 with open(self.bitcoinconf, 'w') as conf: 516 conf.write(conf_data) 517 518 @property 519 def chain_path(self) -> Path: 520 return self.datadir_path / self.chain 521 522 @property 523 def debug_log_path(self) -> Path: 524 return self.chain_path / 'debug.log' 525 526 @property 527 def blocks_path(self) -> Path: 528 return self.chain_path / "blocks" 529 530 @property 531 def blocks_key_path(self) -> Path: 532 return self.blocks_path / "xor.dat" 533 534 def read_xor_key(self) -> bytes: 535 with open(self.blocks_key_path, "rb") as xor_f: 536 return xor_f.read(NUM_XOR_BYTES) 537 538 @property 539 def wallets_path(self) -> Path: 540 return self.chain_path / "wallets" 541 542 def debug_log_size(self, **kwargs) -> int: 543 with open(self.debug_log_path, **kwargs) as dl: 544 dl.seek(0, 2) 545 return dl.tell() 546 547 @contextlib.contextmanager 548 def assert_debug_log(self, expected_msgs, unexpected_msgs=None, timeout=2): 549 if unexpected_msgs is None: 550 unexpected_msgs = [] 551 assert_equal(type(expected_msgs), list) 552 assert_equal(type(unexpected_msgs), list) 553 554 time_end = time.time() + timeout * self.timeout_factor 555 prev_size = self.debug_log_size(encoding="utf-8") # Must use same encoding that is used to read() below 556 557 yield 558 559 while True: 560 found = True 561 with open(self.debug_log_path, encoding="utf-8", errors="replace") as dl: 562 dl.seek(prev_size) 563 log = dl.read() 564 print_log = " - " + "\n - ".join(log.splitlines()) 565 for unexpected_msg in unexpected_msgs: 566 if re.search(re.escape(unexpected_msg), log, flags=re.MULTILINE): 567 self._raise_assertion_error('Unexpected message "{}" partially matches log:\n\n{}\n\n'.format(unexpected_msg, print_log)) 568 for expected_msg in expected_msgs: 569 if re.search(re.escape(expected_msg), log, flags=re.MULTILINE) is None: 570 found = False 571 if found: 572 return 573 if time.time() >= time_end: 574 break 575 time.sleep(0.05) 576 self._raise_assertion_error('Expected messages "{}" does not partially match log:\n\n{}\n\n'.format(str(expected_msgs), print_log)) 577 578 @contextlib.contextmanager 579 def busy_wait_for_debug_log(self, expected_msgs, timeout=60): 580 """ 581 Block until we see a particular debug log message fragment or until we exceed the timeout. 582 Return: 583 the number of log lines we encountered when matching 584 """ 585 time_end = time.time() + timeout * self.timeout_factor 586 prev_size = self.debug_log_size(mode="rb") # Must use same mode that is used to read() below 587 588 yield 589 590 while True: 591 found = True 592 with open(self.debug_log_path, "rb") as dl: 593 dl.seek(prev_size) 594 log = dl.read() 595 596 for expected_msg in expected_msgs: 597 if expected_msg not in log: 598 found = False 599 600 if found: 601 return 602 603 if time.time() >= time_end: 604 print_log = " - " + "\n - ".join(log.decode("utf8", errors="replace").splitlines()) 605 break 606 607 # No sleep here because we want to detect the message fragment as fast as 608 # possible. 609 610 self._raise_assertion_error( 611 'Expected messages "{}" does not partially match log:\n\n{}\n\n'.format( 612 str(expected_msgs), print_log)) 613 614 @contextlib.contextmanager 615 def wait_for_new_peer(self, timeout=5): 616 """ 617 Wait until the node is connected to at least one new peer. We detect this 618 by watching for an increased highest peer id, using the `getpeerinfo` RPC call. 619 Note that the simpler approach of only accounting for the number of peers 620 suffers from race conditions, as disconnects from unrelated previous peers 621 could happen anytime in-between. 622 """ 623 def get_highest_peer_id(): 624 peer_info = self.getpeerinfo() 625 return peer_info[-1]["id"] if peer_info else -1 626 627 initial_peer_id = get_highest_peer_id() 628 yield 629 self.wait_until(lambda: get_highest_peer_id() > initial_peer_id, timeout=timeout) 630 631 @contextlib.contextmanager 632 def profile_with_perf(self, profile_name: str): 633 """ 634 Context manager that allows easy profiling of node activity using `perf`. 635 636 See `test/functional/README.md` for details on perf usage. 637 638 Args: 639 profile_name: This string will be appended to the 640 profile data filename generated by perf. 641 """ 642 subp = self._start_perf(profile_name) 643 644 yield 645 646 if subp: 647 self._stop_perf(profile_name) 648 649 def _start_perf(self, profile_name=None): 650 """Start a perf process to profile this node. 651 652 Returns the subprocess running perf.""" 653 subp = None 654 655 def test_success(cmd): 656 return subprocess.call( 657 # shell=True required for pipe use below 658 cmd, shell=True, 659 stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) == 0 660 661 if platform.system() != 'Linux': 662 self.log.warning("Can't profile with perf; only available on Linux platforms") 663 return None 664 665 if not test_success('which perf'): 666 self.log.warning("Can't profile with perf; must install perf-tools") 667 return None 668 669 if not test_success('readelf -S {} | grep .debug_str'.format(shlex.quote(self.binary))): 670 self.log.warning( 671 "perf output won't be very useful without debug symbols compiled into bitcoind") 672 673 output_path = tempfile.NamedTemporaryFile( 674 dir=self.datadir_path, 675 prefix="{}.perf.data.".format(profile_name or 'test'), 676 delete=False, 677 ).name 678 679 cmd = [ 680 'perf', 'record', 681 '-g', # Record the callgraph. 682 '--call-graph', 'dwarf', # Compatibility for gcc's --fomit-frame-pointer. 683 '-F', '101', # Sampling frequency in Hz. 684 '-p', str(self.process.pid), 685 '-o', output_path, 686 ] 687 subp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 688 self.perf_subprocesses[profile_name] = subp 689 690 return subp 691 692 def _stop_perf(self, profile_name): 693 """Stop (and pop) a perf subprocess.""" 694 subp = self.perf_subprocesses.pop(profile_name) 695 output_path = subp.args[subp.args.index('-o') + 1] 696 697 subp.terminate() 698 subp.wait(timeout=10) 699 700 stderr = subp.stderr.read().decode() 701 if 'Consider tweaking /proc/sys/kernel/perf_event_paranoid' in stderr: 702 self.log.warning( 703 "perf couldn't collect data! Try " 704 "'sudo sysctl -w kernel.perf_event_paranoid=-1'") 705 else: 706 report_cmd = "perf report -i {}".format(output_path) 707 self.log.info("See perf output by running '{}'".format(report_cmd)) 708 709 def assert_start_raises_init_error(self, extra_args=None, expected_msg=None, match=ErrorMatch.FULL_TEXT, *args, **kwargs): 710 """Attempt to start the node and expect it to raise an error. 711 712 extra_args: extra arguments to pass through to bitcoind 713 expected_msg: regex that stderr should match when bitcoind fails 714 715 Will throw if bitcoind starts without an error. 716 Will throw if an expected_msg is provided and it does not match bitcoind's stdout.""" 717 assert not self.running 718 with tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False) as log_stderr, \ 719 tempfile.NamedTemporaryFile(dir=self.stdout_dir, delete=False) as log_stdout: 720 try: 721 self.start(extra_args, stdout=log_stdout, stderr=log_stderr, *args, **kwargs) 722 ret = self.process.wait(timeout=self.rpc_timeout) 723 self.log.debug(self._node_msg(f'bitcoind exited with status {ret} during initialization')) 724 assert_not_equal(ret, 0) # Exit code must indicate failure 725 self.running = False 726 self.process = None 727 # Check stderr for expected message 728 if expected_msg is not None: 729 log_stderr.seek(0) 730 stderr = log_stderr.read().decode('utf-8').strip() 731 if match == ErrorMatch.PARTIAL_REGEX: 732 if re.search(expected_msg, stderr, flags=re.MULTILINE) is None: 733 self._raise_assertion_error( 734 'Expected message "{}" does not partially match stderr:\n"{}"'.format(expected_msg, stderr)) 735 elif match == ErrorMatch.FULL_REGEX: 736 if re.fullmatch(expected_msg, stderr) is None: 737 self._raise_assertion_error( 738 'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr)) 739 elif match == ErrorMatch.FULL_TEXT: 740 if expected_msg != stderr: 741 self._raise_assertion_error( 742 'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr)) 743 except subprocess.TimeoutExpired: 744 self.process.kill() 745 self.running = False 746 self.process = None 747 assert_msg = f'bitcoind should have exited within {self.rpc_timeout}s ' 748 if expected_msg is None: 749 assert_msg += "with an error" 750 else: 751 assert_msg += "with expected error " + expected_msg 752 self._raise_assertion_error(assert_msg) 753 754 def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, send_version=True, supports_v2_p2p=None, wait_for_v2_handshake=True, expect_success=True, **kwargs): 755 """Add an inbound p2p connection to the node. 756 757 This method adds the p2p connection to the self.p2ps list and also 758 returns the connection to the caller. 759 760 When self.use_v2transport is True, TestNode advertises NODE_P2P_V2 service flag 761 762 An inbound connection is made from TestNode <------ P2PConnection 763 - if TestNode doesn't advertise NODE_P2P_V2 service, P2PConnection sends version message and v1 P2P is followed 764 - if TestNode advertises NODE_P2P_V2 service, (and if P2PConnections supports v2 P2P) 765 P2PConnection sends ellswift bytes and v2 P2P is followed 766 """ 767 if 'dstport' not in kwargs: 768 kwargs['dstport'] = p2p_port(self.index) 769 if 'dstaddr' not in kwargs: 770 kwargs['dstaddr'] = '127.0.0.1' 771 if supports_v2_p2p is None: 772 supports_v2_p2p = self.use_v2transport 773 774 if self.use_v2transport: 775 kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2 776 supports_v2_p2p = self.use_v2transport and supports_v2_p2p 777 p2p_conn.peer_connect(**kwargs, send_version=send_version, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p)() 778 779 self.p2ps.append(p2p_conn) 780 if not expect_success: 781 return p2p_conn 782 p2p_conn.wait_until(lambda: p2p_conn.is_connected, check_connected=False) 783 if supports_v2_p2p and wait_for_v2_handshake: 784 p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake) 785 if send_version: 786 p2p_conn.wait_until(lambda: not p2p_conn.on_connection_send_msg) 787 if wait_for_verack: 788 # Wait for the node to send us the version and verack 789 p2p_conn.wait_for_verack() 790 # At this point we have sent our version message and received the version and verack, however the full node 791 # has not yet received the verack from us (in reply to their version). So, the connection is not yet fully 792 # established (fSuccessfullyConnected). 793 # 794 # This shouldn't lead to any issues when sending messages, since the verack will be in-flight before the 795 # message we send. However, it might lead to races where we are expecting to receive a message. E.g. a 796 # transaction that will be added to the mempool as soon as we return here. 797 # 798 # So syncing here is redundant when we only want to send a message, but the cost is low (a few milliseconds) 799 # in comparison to the upside of making tests less fragile and unexpected intermittent errors less likely. 800 p2p_conn.sync_with_ping() 801 802 # Consistency check that the node received our user agent string. 803 # Find our connection in getpeerinfo by our address:port and theirs, as this combination is unique. 804 sockname = p2p_conn._transport.get_extra_info("socket").getsockname() 805 our_addr_and_port = f"{sockname[0]}:{sockname[1]}" 806 dst_addr_and_port = f"{p2p_conn.dstaddr}:{p2p_conn.dstport}" 807 info = [peer for peer in self.getpeerinfo() if peer["addr"] == our_addr_and_port and peer["addrbind"] == dst_addr_and_port] 808 assert_equal(len(info), 1) 809 assert_equal(info[0]["subver"], P2P_SUBVERSION) 810 811 return p2p_conn 812 813 def add_outbound_p2p_connection(self, p2p_conn, *, wait_for_verack=True, wait_for_disconnect=False, p2p_idx, connection_type="outbound-full-relay", supports_v2_p2p=None, advertise_v2_p2p=None, **kwargs): 814 """Add an outbound p2p connection from node. Must be an 815 "outbound-full-relay", "block-relay-only", "addr-fetch" or "feeler" connection. 816 817 This method adds the p2p connection to the self.p2ps list and returns 818 the connection to the caller. 819 820 p2p_idx must be different for simultaneously connected peers. When reusing it for the next peer 821 after disconnecting the previous one, it is necessary to wait for the disconnect to finish to avoid 822 a race condition. 823 824 Parameters: 825 supports_v2_p2p: whether p2p_conn supports v2 P2P or not 826 advertise_v2_p2p: whether p2p_conn is advertised to support v2 P2P or not 827 828 An outbound connection is made from TestNode -------> P2PConnection 829 - if P2PConnection doesn't advertise_v2_p2p, TestNode sends version message and v1 P2P is followed 830 - if P2PConnection both supports_v2_p2p and advertise_v2_p2p, TestNode sends ellswift bytes and v2 P2P is followed 831 - if P2PConnection doesn't supports_v2_p2p but advertise_v2_p2p, 832 TestNode sends ellswift bytes and P2PConnection disconnects, 833 TestNode reconnects by sending version message and v1 P2P is followed 834 """ 835 836 def addconnection_callback(address, port): 837 self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type)) 838 self.addconnection('%s:%d' % (address, port), connection_type, advertise_v2_p2p) 839 840 if supports_v2_p2p is None: 841 supports_v2_p2p = self.use_v2transport 842 if advertise_v2_p2p is None: 843 advertise_v2_p2p = self.use_v2transport 844 845 if advertise_v2_p2p: 846 kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2 847 assert self.use_v2transport # only a v2 TestNode could make a v2 outbound connection 848 849 # if P2PConnection is advertised to support v2 P2P when it doesn't actually support v2 P2P, 850 # reconnection needs to be attempted using v1 P2P by sending version message 851 reconnect = advertise_v2_p2p and not supports_v2_p2p 852 # P2PConnection needs to be advertised to support v2 P2P so that ellswift bytes are sent instead of msg_version 853 supports_v2_p2p = supports_v2_p2p and advertise_v2_p2p 854 p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p, reconnect=reconnect, **kwargs)() 855 856 if reconnect: 857 p2p_conn.wait_for_reconnect() 858 859 if connection_type == "feeler" or wait_for_disconnect: 860 # feeler connections are closed as soon as the node receives a `version` message 861 p2p_conn.wait_until(lambda: p2p_conn.message_count["version"] == 1, check_connected=False) 862 p2p_conn.wait_until(lambda: not p2p_conn.is_connected, check_connected=False) 863 else: 864 p2p_conn.wait_for_connect() 865 self.p2ps.append(p2p_conn) 866 867 if supports_v2_p2p: 868 p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake) 869 p2p_conn.wait_until(lambda: not p2p_conn.on_connection_send_msg) 870 if wait_for_verack: 871 p2p_conn.wait_for_verack() 872 p2p_conn.sync_with_ping() 873 874 return p2p_conn 875 876 def num_test_p2p_connections(self): 877 """Return number of test framework p2p connections to the node.""" 878 return len([peer for peer in self.getpeerinfo() if peer['subver'] == P2P_SUBVERSION]) 879 880 def disconnect_p2ps(self): 881 """Close all p2p connections to the node. 882 The state of the peers (such as txrequests) may not be fully cleared 883 yet, even after this method returns.""" 884 for p in self.p2ps: 885 p.peer_disconnect() 886 del self.p2ps[:] 887 888 self.wait_until(lambda: self.num_test_p2p_connections() == 0) 889 890 def bumpmocktime(self, seconds): 891 """Fast forward using setmocktime to self.mocktime + seconds. Requires setmocktime to have 892 been called at some point in the past.""" 893 assert self.mocktime 894 self.mocktime += seconds 895 self.setmocktime(self.mocktime) 896 897 def wait_until(self, test_function, timeout=60, check_interval=0.05): 898 return wait_until_helper_internal(test_function, timeout=timeout, timeout_factor=self.timeout_factor, check_interval=check_interval) 899 900 901 class TestNodeCLIAttr: 902 def __init__(self, cli, command): 903 self.cli = cli 904 self.command = command 905 906 def __call__(self, *args, **kwargs): 907 return self.cli.send_cli(self.command, *args, **kwargs) 908 909 def get_request(self, *args, **kwargs): 910 return lambda: self(*args, **kwargs) 911 912 913 def arg_to_cli(arg): 914 if isinstance(arg, bool): 915 return str(arg).lower() 916 elif arg is None: 917 return 'null' 918 elif isinstance(arg, dict) or isinstance(arg, list) or isinstance(arg, tuple): 919 return json.dumps(arg, default=serialization_fallback) 920 else: 921 return str(arg) 922 923 924 class TestNodeCLI(): 925 """Interface to bitcoin-cli for an individual node""" 926 def __init__(self, binaries, datadir, rpc_timeout): 927 self.options = [] 928 self.binaries = binaries 929 self.datadir = datadir 930 self.rpc_timeout = rpc_timeout 931 self.input = None 932 self.log = logging.getLogger('TestFramework.bitcoincli') 933 934 def __call__(self, *options, input=None): 935 # TestNodeCLI is callable with bitcoin-cli command-line options 936 cli = TestNodeCLI(self.binaries, self.datadir, self.rpc_timeout) 937 cli.options = [str(o) for o in options] 938 cli.input = input 939 return cli 940 941 def __getattr__(self, command): 942 return TestNodeCLIAttr(self, command) 943 944 def batch(self, requests): 945 results = [] 946 for request in requests: 947 try: 948 results.append(dict(result=request())) 949 except JSONRPCException as e: 950 results.append(dict(error=e)) 951 return results 952 953 def send_cli(self, clicommand=None, *args, **kwargs): 954 """Run bitcoin-cli command. Deserializes returned string as python object.""" 955 pos_args = [arg_to_cli(arg) for arg in args] 956 named_args = [key + "=" + arg_to_cli(value) for (key, value) in kwargs.items() if value is not None] 957 p_args = self.binaries.rpc_argv() + [ 958 f"-datadir={self.datadir}", 959 f"-rpcclienttimeout={int(self.rpc_timeout)}", 960 ] + self.options 961 if named_args: 962 p_args += ["-named"] 963 base_arg_pos = len(p_args) 964 if clicommand is not None: 965 p_args += [clicommand] 966 p_args += pos_args + named_args 967 968 # TEST_CLI_MAX_ARG_SIZE is set low enough that checking the string 969 # length is enough and encoding to bytes is not needed before 970 # calculating the sum. 971 sum_arg_size = sum(len(arg) for arg in p_args) 972 stdin_data = self.input 973 if sum_arg_size >= TEST_CLI_MAX_ARG_SIZE: 974 self.log.debug(f"Cli: Command size {sum_arg_size} too large, using stdin") 975 rpc_args = "\n".join([arg for arg in p_args[base_arg_pos:]]) 976 if stdin_data is not None: 977 stdin_data += "\n" + rpc_args 978 else: 979 stdin_data = rpc_args 980 p_args = p_args[:base_arg_pos] + ['-stdin'] 981 982 self.log.debug("Running bitcoin-cli {}".format(p_args[2:])) 983 process = subprocess.Popen(p_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 984 cli_stdout, cli_stderr = process.communicate(input=stdin_data) 985 returncode = process.poll() 986 if returncode: 987 match = re.match(r'error code: ([-0-9]+)\nerror message:\n(.*)', cli_stderr) 988 if match: 989 code, message = match.groups() 990 raise JSONRPCException(dict(code=int(code), message=message)) 991 # Ignore cli_stdout, raise with cli_stderr 992 raise subprocess.CalledProcessError(returncode, p_args, output=cli_stderr) 993 try: 994 if not cli_stdout.strip(): 995 return None 996 return json.loads(cli_stdout, parse_float=decimal.Decimal) 997 except (json.JSONDecodeError, decimal.InvalidOperation): 998 return cli_stdout.rstrip("\n")