# test_node.py
#!/usr/bin/env python3
# Copyright (c) 2017-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Class for bitcoind node under test"""

import contextlib
import decimal
import errno
from enum import Enum
import json
import logging
import os
import pathlib
import platform
import re
import subprocess
import tempfile
import time
import urllib.parse
import collections
import shlex
import shutil
import sys
from collections.abc import Iterable
from pathlib import Path

from .authproxy import (
    JSONRPCException,
    serialization_fallback,
)
from .messages import NODE_P2P_V2
from .p2p import P2P_SERVICES, P2P_SUBVERSION
from .util import (
    MAX_NODES,
    assert_equal,
    assert_not_equal,
    append_config,
    delete_cookie_file,
    get_auth_cookie,
    get_rpc_proxy,
    rpc_url,
    wait_until_helper_internal,
    p2p_port,
    tor_port,
)

# Default number of seconds wait_until_stopped() waits for the process to exit.
BITCOIND_PROC_WAIT_TIMEOUT = 60
# The size of the blocks xor key
# from InitBlocksdirXorKey::xor_key.size()
NUM_XOR_BYTES = 8
# Many systems have a 128kB limit for a command size. Depending on the
# platform, this limit may be larger or smaller. Moreover, when using the
# 'bitcoin' command, it may internally insert more args, which must be
# accounted for. There is no need to pick the largest possible value here
# anyway and it should be fine to set it to 1kB in tests.
57 TEST_CLI_MAX_ARG_SIZE = 1024 58 59 # The null blocks key (all 0s) 60 NULL_BLK_XOR_KEY = bytes([0] * NUM_XOR_BYTES) 61 BITCOIN_PID_FILENAME_DEFAULT = "bitcoind.pid" 62 63 if sys.platform.startswith("linux"): 64 UNIX_PATH_MAX = 108 # includes the trailing NUL 65 elif sys.platform.startswith(("darwin", "freebsd", "netbsd", "openbsd")): 66 UNIX_PATH_MAX = 104 67 else: # safest portable value 68 UNIX_PATH_MAX = 92 69 70 71 class FailedToStartError(Exception): 72 """Raised when a node fails to start correctly.""" 73 74 75 class ErrorMatch(Enum): 76 FULL_TEXT = 1 77 FULL_REGEX = 2 78 PARTIAL_REGEX = 3 79 80 81 class TestNode(): 82 """A class for representing a bitcoind node under test. 83 84 This class contains: 85 86 - state about the node (whether it's running, etc) 87 - a Python subprocess.Popen object representing the running process 88 - an RPC connection to the node 89 - one or more P2P connections to the node 90 91 92 To make things easier for the test writer, any unrecognised messages will 93 be dispatched to the RPC connection.""" 94 95 def __init__( 96 self, 97 i, 98 datadir_path, 99 *, 100 chain, 101 rpchost, 102 timewait, 103 timeout_factor, 104 binaries, 105 coverage_dir, 106 cwd, 107 extra_conf=None, 108 extra_args=None, 109 use_cli=False, 110 start_perf=False, 111 version=None, 112 v2transport=False, 113 uses_wallet=False, 114 ipcbind=False, 115 ): 116 """ 117 Kwargs: 118 start_perf (bool): If True, begin profiling the node with `perf` as soon as 119 the node starts. 
120 """ 121 122 self.index = i 123 self.p2p_conn_index = 1 124 self.datadir_path = datadir_path 125 self.bitcoinconf = self.datadir_path / "bitcoin.conf" 126 self.stdout_dir = self.datadir_path / "stdout" 127 self.stderr_dir = self.datadir_path / "stderr" 128 self.chain = chain 129 self.rpchost = rpchost 130 self.rpc_timeout = timewait # Already multiplied by timeout_factor 131 self.timeout_factor = timeout_factor 132 self.binaries = binaries 133 self.coverage_dir = coverage_dir 134 self.cwd = cwd 135 self.has_explicit_bind = False 136 if extra_conf is not None: 137 append_config(self.datadir_path, extra_conf) 138 # Remember if there is bind=... in the config file. 139 self.has_explicit_bind = any(e.startswith("bind=") for e in extra_conf) 140 # Most callers will just need to add extra args to the standard list below. 141 # For those callers that need more flexibility, they can just set the args property directly. 142 # Note that common args are set in the config file (see initialize_datadir) 143 self.extra_args = extra_args 144 self.version = version 145 # Configuration for logging is set as command-line args rather than in the bitcoin.conf file. 146 # This means that starting a bitcoind using the temp dir to debug a failed test won't 147 # spam debug.log. 148 self.args = self.binaries.node_argv(need_ipc=ipcbind) + [ 149 f"-datadir={self.datadir_path}", 150 "-logtimemicros", 151 "-debug", 152 "-debugexclude=libevent", 153 "-debugexclude=leveldb", 154 "-debugexclude=rand", 155 "-uacomment=testnode%d" % i, # required for subversion uniqueness across peers 156 ] 157 if uses_wallet is not None and not uses_wallet: 158 self.args.append("-disablewallet") 159 160 self.ipc_tmp_dir = None 161 if ipcbind: 162 self.ipc_socket_path = self.chain_path / "node.sock" 163 if len(os.fsencode(self.ipc_socket_path)) < UNIX_PATH_MAX: 164 self.args.append("-ipcbind=unix") 165 else: 166 # Work around default CI path exceeding maximum socket path length. 
167 self.ipc_tmp_dir = pathlib.Path(tempfile.mkdtemp(prefix="test-ipc-")) 168 self.ipc_socket_path = self.ipc_tmp_dir / "node.sock" 169 self.args.append(f"-ipcbind=unix:{self.ipc_socket_path}") 170 171 if self.version_is_at_least(190000): 172 self.args.append("-logthreadnames") 173 if self.version_is_at_least(219900): 174 self.args.append("-logsourcelocations") 175 if self.version_is_at_least(239000): 176 self.args.append("-loglevel=trace") 177 if self.version_is_at_least(290100): 178 self.args.append("-nologratelimit") 179 180 # Default behavior from global -v2transport flag is added to args to persist it over restarts. 181 # May be overwritten in individual tests, using extra_args. 182 self.default_to_v2 = v2transport 183 if self.version_is_at_least(260000): 184 # 26.0 and later support v2transport 185 if v2transport: 186 self.args.append("-v2transport=1") 187 else: 188 self.args.append("-v2transport=0") 189 # if v2transport is requested via global flag but not supported for node version, ignore it 190 191 self.cli = TestNodeCLI( 192 binaries, 193 self.datadir_path, 194 self.rpc_timeout // 2, # timeout identical to the one used in self._rpc 195 ) 196 self.use_cli = use_cli 197 self.start_perf = start_perf 198 199 self.running = False 200 self.process = None 201 self.rpc_connected = False 202 self._rpc = None # Should usually not be accessed directly in tests to allow for --usecli mode 203 self.reuse_http_connections = True # Must be set before calling get_rpc_proxy() i.e. before restarting node 204 self.url = None 205 self.log = logging.getLogger('TestFramework.node%d' % i) 206 # Cache perf subprocesses here by their data output filename. 
207 self.perf_subprocesses = {} 208 209 self.p2ps = [] 210 211 self.mocktime = None 212 213 AddressKeyPair = collections.namedtuple('AddressKeyPair', ['address', 'key']) 214 PRIV_KEYS = [ 215 # address , privkey 216 AddressKeyPair('mjTkW3DjgyZck4KbiRusZsqTgaYTxdSz6z', 'cVpF924EspNh8KjYsfhgY96mmxvT6DgdWiTYMtMjuM74hJaU5psW'), 217 AddressKeyPair('msX6jQXvxiNhx3Q62PKeLPrhrqZQdSimTg', 'cUxsWyKyZ9MAQTaAhUQWJmBbSvHMwSmuv59KgxQV7oZQU3PXN3KE'), 218 AddressKeyPair('mnonCMyH9TmAsSj3M59DsbH8H63U3RKoFP', 'cTrh7dkEAeJd6b3MRX9bZK8eRmNqVCMH3LSUkE3dSFDyzjU38QxK'), 219 AddressKeyPair('mqJupas8Dt2uestQDvV2NH3RU8uZh2dqQR', 'cVuKKa7gbehEQvVq717hYcbE9Dqmq7KEBKqWgWrYBa2CKKrhtRim'), 220 AddressKeyPair('msYac7Rvd5ywm6pEmkjyxhbCDKqWsVeYws', 'cQDCBuKcjanpXDpCqacNSjYfxeQj8G6CAtH1Dsk3cXyqLNC4RPuh'), 221 AddressKeyPair('n2rnuUnwLgXqf9kk2kjvVm8R5BZK1yxQBi', 'cQakmfPSLSqKHyMFGwAqKHgWUiofJCagVGhiB4KCainaeCSxeyYq'), 222 AddressKeyPair('myzuPxRwsf3vvGzEuzPfK9Nf2RfwauwYe6', 'cQMpDLJwA8DBe9NcQbdoSb1BhmFxVjWD5gRyrLZCtpuF9Zi3a9RK'), 223 AddressKeyPair('mumwTaMtbxEPUswmLBBN3vM9oGRtGBrys8', 'cSXmRKXVcoouhNNVpcNKFfxsTsToY5pvB9DVsFksF1ENunTzRKsy'), 224 AddressKeyPair('mpV7aGShMkJCZgbW7F6iZgrvuPHjZjH9qg', 'cSoXt6tm3pqy43UMabY6eUTmR3eSUYFtB2iNQDGgb3VUnRsQys2k'), 225 AddressKeyPair('mq4fBNdckGtvY2mijd9am7DRsbRB4KjUkf', 'cN55daf1HotwBAgAKWVgDcoppmUNDtQSfb7XLutTLeAgVc3u8hik'), 226 AddressKeyPair('mpFAHDjX7KregM3rVotdXzQmkbwtbQEnZ6', 'cT7qK7g1wkYEMvKowd2ZrX1E5f6JQ7TM246UfqbCiyF7kZhorpX3'), 227 AddressKeyPair('mzRe8QZMfGi58KyWCse2exxEFry2sfF2Y7', 'cPiRWE8KMjTRxH1MWkPerhfoHFn5iHPWVK5aPqjW8NxmdwenFinJ'), 228 ] 229 230 def get_deterministic_priv_key(self): 231 """Return a deterministic priv key in base58, that only depends on the node's index""" 232 assert len(self.PRIV_KEYS) == MAX_NODES 233 return self.PRIV_KEYS[self.index] 234 235 def _node_msg(self, msg: str) -> str: 236 """Return a modified msg that identifies this node by its index as a debugging aid.""" 237 return "[node %d] %s" % (self.index, msg) 238 239 def 
_raise_assertion_error(self, msg: str): 240 """Raise an AssertionError with msg modified to identify this node.""" 241 raise AssertionError(self._node_msg(msg)) 242 243 def __del__(self): 244 # Ensure that we don't leave any bitcoind processes lying around after 245 # the test ends 246 if self.process: 247 # Should only happen on test failure 248 # Avoid using logger, as that may have already been shutdown when 249 # this destructor is called. 250 print(self._node_msg("Cleaning up leftover process"), file=sys.stderr) 251 self.process.kill() 252 if self.ipc_tmp_dir: 253 print(self._node_msg(f"Cleaning up ipc directory {str(self.ipc_tmp_dir)!r}")) 254 shutil.rmtree(self.ipc_tmp_dir) 255 256 def __getattr__(self, name): 257 """Dispatches any unrecognised messages to the RPC connection or a CLI instance.""" 258 if self.use_cli: 259 return getattr(self.cli, name) 260 else: 261 assert self.rpc_connected and self._rpc is not None, self._node_msg("Error: no RPC connection") 262 return getattr(self._rpc, name) 263 264 def start(self, extra_args=None, *, cwd=None, stdout=None, stderr=None, env=None, **kwargs): 265 """Start the node.""" 266 if extra_args is None: 267 extra_args = self.extra_args 268 269 # If listening and no -bind is given, then bitcoind would bind P2P ports on 270 # 0.0.0.0:P and 127.0.0.1:P+1 (for incoming Tor connections), where P is 271 # a unique port chosen by the test framework and configured as port=P in 272 # bitcoin.conf. To avoid collisions, change it to 127.0.0.1:tor_port(). 
273 will_listen = all(e != "-nolisten" and e != "-listen=0" for e in extra_args) 274 has_explicit_bind = self.has_explicit_bind or any(e.startswith("-bind=") for e in extra_args) 275 if will_listen and not has_explicit_bind: 276 extra_args.append(f"-bind=0.0.0.0:{p2p_port(self.index)}") 277 extra_args.append(f"-bind=127.0.0.1:{tor_port(self.index)}=onion") 278 279 self.use_v2transport = "-v2transport=1" in extra_args or (self.default_to_v2 and "-v2transport=0" not in extra_args) 280 281 # Add a new stdout and stderr file each time bitcoind is started 282 if stderr is None: 283 stderr = tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False) 284 if stdout is None: 285 stdout = tempfile.NamedTemporaryFile(dir=self.stdout_dir, delete=False) 286 self.stderr = stderr 287 self.stdout = stdout 288 289 if cwd is None: 290 cwd = self.cwd 291 292 # Delete any existing cookie file -- if such a file exists (eg due to 293 # unclean shutdown), it will get overwritten anyway by bitcoind, and 294 # potentially interfere with our attempt to authenticate 295 delete_cookie_file(self.datadir_path, self.chain) 296 297 # add environment variable LIBC_FATAL_STDERR_=1 so that libc errors are written to stderr and not the terminal 298 subp_env = dict(os.environ, LIBC_FATAL_STDERR_="1") 299 if env is not None: 300 subp_env.update(env) 301 302 self.process = subprocess.Popen(self.args + extra_args, env=subp_env, stdout=stdout, stderr=stderr, cwd=cwd, **kwargs) 303 304 self.running = True 305 self.log.debug("bitcoind started, waiting for RPC to come up") 306 307 if self.start_perf: 308 self._start_perf() 309 310 def wait_for_rpc_connection(self, *, wait_for_import=True): 311 """Sets up an RPC connection to the bitcoind process. 
Returns False if unable to connect.""" 312 # Poll at a rate of four times per second 313 poll_per_s = 4 314 315 suppressed_errors = collections.defaultdict(int) 316 latest_error = None 317 def suppress_error(category: str, e: Exception): 318 suppressed_errors[category] += 1 319 return (category, repr(e)) 320 321 for _ in range(poll_per_s * self.rpc_timeout): 322 if self.process.poll() is not None: 323 # Attach abrupt shutdown error/s to the exception message 324 self.stderr.seek(0) 325 str_error = ''.join(line.decode('utf-8') for line in self.stderr) 326 str_error += "************************\n" if str_error else '' 327 328 raise FailedToStartError(self._node_msg( 329 f'bitcoind exited with status {self.process.returncode} during initialization. {str_error}')) 330 try: 331 rpc = get_rpc_proxy( 332 rpc_url(self.datadir_path, self.index, self.chain, self.rpchost), 333 self.index, 334 timeout=self.rpc_timeout // 2, # Shorter timeout to allow for one retry in case of ETIMEDOUT 335 coveragedir=self.coverage_dir, 336 ) 337 rpc.auth_service_proxy_instance.reuse_http_connections = self.reuse_http_connections 338 rpc.getblockcount() 339 # If the call to getblockcount() succeeds then the RPC connection is up 340 if self.version_is_at_least(190000) and wait_for_import: 341 # getmempoolinfo.loaded is available since commit 342 # bb8ae2c (version 0.19.0) 343 self.wait_until(lambda: rpc.getmempoolinfo()['loaded']) 344 # Wait for the node to finish reindex, block import, and 345 # loading the mempool. Usually importing happens fast or 346 # even "immediate" when the node is started. However, there 347 # is no guarantee and sometimes ImportBlocks might finish 348 # later. This is going to cause intermittent test failures, 349 # because generally the tests assume the node is fully 350 # ready after being started. 
351 # 352 # For example, the node will reject block messages from p2p 353 # when it is still importing with the error "Unexpected 354 # block message received" 355 # 356 # The wait is done here to make tests as robust as possible 357 # and prevent racy tests and intermittent failures as much 358 # as possible. Some tests might not need this, but the 359 # overhead is trivial, and the added guarantees are worth 360 # the minimal performance cost. 361 self.log.debug("RPC successfully started") 362 # Set rpc_connected even if we are in use_cli mode so that we know we can call self.stop() if needed. 363 self.rpc_connected = True 364 if self.use_cli: 365 return 366 self._rpc = rpc 367 self.url = self._rpc.rpc_url 368 return 369 except JSONRPCException as e: 370 # Suppress these as they are expected during initialization. 371 # -28 RPC in warmup 372 # -342 Service unavailable, could be starting up or shutting down 373 if e.error['code'] not in [-28, -342]: 374 raise # unknown JSON RPC exception 375 latest_error = suppress_error(f"JSONRPCException {e.error['code']}", e) 376 except OSError as e: 377 error_num = e.errno 378 if error_num is None: 379 # Work around issue where socket timeouts don't have errno set. 380 # https://github.com/python/cpython/issues/109601 381 if isinstance(e, TimeoutError): 382 error_num = errno.ETIMEDOUT 383 # http.client.RemoteDisconnected inherits from this type and 384 # doesn't specify errno. 385 elif isinstance(e, ConnectionResetError): 386 error_num = errno.ECONNRESET 387 388 # Suppress similarly to the above JSONRPCException errors. 389 if error_num not in [ 390 errno.ECONNRESET, # This might happen when the RPC server is in warmup, 391 # but shut down before the call to getblockcount succeeds. 392 errno.ETIMEDOUT, # Treat identical to ECONNRESET 393 errno.ECONNREFUSED # Port not yet open? 
394 ]: 395 raise # unknown OS error 396 latest_error = suppress_error(f"OSError {errno.errorcode[error_num]}", e) 397 except ValueError as e: 398 # Suppress if cookie file isn't generated yet and no rpcuser or rpcpassword; bitcoind may be starting. 399 if "No RPC credentials" not in str(e): 400 raise 401 latest_error = suppress_error("missing_credentials", e) 402 time.sleep(1.0 / poll_per_s) 403 self._raise_assertion_error(f"Unable to connect to bitcoind after {self.rpc_timeout}s (ignored errors: {dict(suppressed_errors)!s}{'' if latest_error is None else f', latest: {latest_error[0]!r}/{latest_error[1]}'})") 404 405 def wait_for_cookie_credentials(self): 406 """Ensures auth cookie credentials can be read, e.g. for testing CLI with -rpcwait before RPC connection is up.""" 407 self.log.debug("Waiting for cookie credentials") 408 # Poll at a rate of four times per second. 409 poll_per_s = 4 410 for _ in range(poll_per_s * self.rpc_timeout): 411 try: 412 get_auth_cookie(self.datadir_path, self.chain) 413 self.log.debug("Cookie credentials successfully retrieved") 414 return 415 except ValueError: # cookie file not found and no rpcuser or rpcpassword; bitcoind is still starting 416 pass # so we continue polling until RPC credentials are retrieved 417 time.sleep(1.0 / poll_per_s) 418 self._raise_assertion_error("Unable to retrieve cookie credentials after {}s".format(self.rpc_timeout)) 419 420 def generate(self, nblocks, maxtries=1000000, **kwargs): 421 self.log.debug("TestNode.generate() dispatches `generate` call to `generatetoaddress`") 422 return self.generatetoaddress(nblocks=nblocks, address=self.get_deterministic_priv_key().address, maxtries=maxtries, **kwargs) 423 424 def generateblock(self, *args, called_by_framework, **kwargs): 425 assert called_by_framework, "Direct call of this mining RPC is discouraged. Please use one of the self.generate* methods on the test framework, which sync the nodes to avoid intermittent test issues. 
You may use sync_fun=self.no_op to disable the sync explicitly." 426 return self.__getattr__('generateblock')(*args, **kwargs) 427 428 def generatetoaddress(self, *args, called_by_framework, **kwargs): 429 assert called_by_framework, "Direct call of this mining RPC is discouraged. Please use one of the self.generate* methods on the test framework, which sync the nodes to avoid intermittent test issues. You may use sync_fun=self.no_op to disable the sync explicitly." 430 return self.__getattr__('generatetoaddress')(*args, **kwargs) 431 432 def generatetodescriptor(self, *args, called_by_framework, **kwargs): 433 assert called_by_framework, "Direct call of this mining RPC is discouraged. Please use one of the self.generate* methods on the test framework, which sync the nodes to avoid intermittent test issues. You may use sync_fun=self.no_op to disable the sync explicitly." 434 return self.__getattr__('generatetodescriptor')(*args, **kwargs) 435 436 def setmocktime(self, timestamp): 437 """Wrapper for setmocktime RPC, sets self.mocktime""" 438 if timestamp == 0: 439 # setmocktime(0) resets to system time. 440 self.mocktime = None 441 else: 442 self.mocktime = timestamp 443 return self.__getattr__('setmocktime')(timestamp) 444 445 def get_wallet_rpc(self, wallet_name): 446 if self.use_cli: 447 return self.cli("-rpcwallet={}".format(wallet_name)) 448 else: 449 assert self.rpc_connected and self._rpc, self._node_msg("RPC not connected") 450 wallet_path = "wallet/{}".format(urllib.parse.quote(wallet_name)) 451 return self._rpc / wallet_path 452 453 def version_is_at_least(self, ver): 454 return self.version is None or self.version >= ver 455 456 def stop_node(self, expected_stderr='', *, wait=0, wait_until_stopped=True): 457 """Stop the node.""" 458 if not self.running: 459 return 460 assert self.rpc_connected, self._node_msg( 461 "Should only call stop_node() on a running node after wait_for_rpc_connection() succeeded. 
" 462 f"Did you forget to call the latter after start()? Not connected to process: {self.process.pid}") 463 self.log.debug("Stopping node") 464 # Do not use wait argument when testing older nodes, e.g. in wallet_backwards_compatibility.py 465 if self.version_is_at_least(180000): 466 self.stop(wait=wait) 467 else: 468 self.stop() 469 470 # If there are any running perf processes, stop them. 471 for profile_name in tuple(self.perf_subprocesses.keys()): 472 self._stop_perf(profile_name) 473 474 del self.p2ps[:] 475 476 assert (not expected_stderr) or wait_until_stopped # Must wait to check stderr 477 if wait_until_stopped: 478 self.wait_until_stopped(expected_stderr=expected_stderr) 479 480 def is_node_stopped(self, *, expected_stderr="", expected_ret_code=0): 481 """Checks whether the node has stopped. 482 483 Returns True if the node has stopped. False otherwise. 484 485 If the process has exited, asserts that the exit code matches 486 `expected_ret_code` (which may be a single value or an iterable of values), 487 and that stderr matches `expected_stderr` exactly or, if a regex pattern is 488 provided, contains the pattern. 489 490 This method is responsible for freeing resources (self.process).""" 491 if not self.running: 492 return True 493 return_code = self.process.poll() 494 if return_code is None: 495 return False 496 497 # process has stopped. Assert that it didn't return an error code. 
498 if not isinstance(expected_ret_code, Iterable): 499 expected_ret_code = (expected_ret_code,) 500 assert return_code in expected_ret_code, self._node_msg( 501 f"Node returned unexpected exit code ({return_code}) vs ({expected_ret_code}) when stopping") 502 # Check that stderr is as expected 503 self.stderr.seek(0) 504 stderr = self.stderr.read().decode('utf-8').strip() 505 if isinstance(expected_stderr, re.Pattern): 506 if not expected_stderr.search(stderr): 507 raise AssertionError(f"Unexpected stderr {stderr!r} does not contain {expected_stderr.pattern!r}") 508 elif stderr != expected_stderr: 509 raise AssertionError("Unexpected stderr {} != {}".format(stderr, expected_stderr)) 510 511 self.stdout.close() 512 self.stderr.close() 513 514 self.running = False 515 self.process = None 516 self.rpc_connected = False 517 self._rpc = None 518 self.log.debug("Node stopped") 519 return True 520 521 def wait_until_stopped(self, *, timeout=BITCOIND_PROC_WAIT_TIMEOUT, expect_error=False, **kwargs): 522 if "expected_ret_code" not in kwargs: 523 kwargs["expected_ret_code"] = 1 if expect_error else 0 # Whether node shutdown return EXIT_FAILURE or EXIT_SUCCESS 524 self.wait_until(lambda: self.is_node_stopped(**kwargs), timeout=timeout) 525 526 def kill_process(self): 527 self.process.kill() 528 self.wait_until_stopped(expected_ret_code=1 if platform.system() == "Windows" else -9) 529 assert self.is_node_stopped() 530 531 def replace_in_config(self, replacements): 532 """ 533 Perform replacements in the configuration file. 534 The substitutions are passed as a list of search-replace-tuples, e.g. 535 [("old", "new"), ("foo", "bar"), ...] 
536 """ 537 with open(self.bitcoinconf, 'r') as conf: 538 conf_data = conf.read() 539 for replacement in replacements: 540 assert_equal(len(replacement), 2) 541 old, new = replacement[0], replacement[1] 542 conf_data = conf_data.replace(old, new) 543 with open(self.bitcoinconf, 'w') as conf: 544 conf.write(conf_data) 545 546 @property 547 def chain_path(self) -> Path: 548 return self.datadir_path / self.chain 549 550 @property 551 def debug_log_path(self) -> Path: 552 return self.chain_path / 'debug.log' 553 554 @property 555 def blocks_path(self) -> Path: 556 return self.chain_path / "blocks" 557 558 @property 559 def blocks_key_path(self) -> Path: 560 return self.blocks_path / "xor.dat" 561 562 def read_xor_key(self) -> bytes: 563 with open(self.blocks_key_path, "rb") as xor_f: 564 return xor_f.read(NUM_XOR_BYTES) 565 566 @property 567 def wallets_path(self) -> Path: 568 return self.chain_path / "wallets" 569 570 def debug_log_size(self, **kwargs) -> int: 571 with open(self.debug_log_path, **kwargs) as dl: 572 dl.seek(0, 2) 573 return dl.tell() 574 575 @contextlib.contextmanager 576 def assert_debug_log(self, expected_msgs, unexpected_msgs=None, *, timeout=0): 577 if unexpected_msgs is None: 578 unexpected_msgs = [] 579 assert_equal(type(expected_msgs), list) 580 assert_equal(type(unexpected_msgs), list) 581 remaining_expected = list(expected_msgs) 582 583 time_end = time.time() + timeout * self.timeout_factor 584 prev_size = self.debug_log_size(encoding="utf-8") # Must use same encoding that is used to read() below 585 586 def join_log(log): 587 return " - " + "\n - ".join(log.splitlines()) 588 589 yield 590 591 while True: 592 with open(self.debug_log_path, encoding="utf-8", errors="replace") as dl: 593 dl.seek(prev_size) 594 log = dl.read() 595 for unexpected_msg in unexpected_msgs: 596 if unexpected_msg in log: 597 self._raise_assertion_error(f'Unexpected message "{unexpected_msg}" ' 598 f'found in log:\n\n{join_log(log)}\n\n') 599 while remaining_expected 
and remaining_expected[-1] in log: 600 remaining_expected.pop() 601 if not remaining_expected: 602 return 603 if time.time() >= time_end: 604 break 605 time.sleep(0.05) 606 remaining_expected = [e for e in remaining_expected if e not in log] 607 self._raise_assertion_error(f'Expected message(s) {remaining_expected!s} ' 608 f'not found in log:\n\n{join_log(log)}\n\n') 609 610 @contextlib.contextmanager 611 def busy_wait_for_debug_log(self, expected_msgs, timeout=60): 612 """ 613 Block until we see a particular debug log message fragment or until we exceed the timeout. 614 """ 615 time_end = time.time() + timeout * self.timeout_factor 616 prev_size = self.debug_log_size(mode="rb") # Must use same mode that is used to read() below 617 remaining_expected = list(expected_msgs) 618 619 yield 620 621 while True: 622 with open(self.debug_log_path, "rb") as dl: 623 dl.seek(prev_size) 624 log = dl.read() 625 626 while remaining_expected and remaining_expected[-1] in log: 627 remaining_expected.pop() 628 if not remaining_expected: 629 return 630 631 if time.time() >= time_end: 632 print_log = " - " + "\n - ".join(log.decode("utf8", errors="replace").splitlines()) 633 break 634 635 # No sleep here because we want to detect the message fragment as fast as 636 # possible. 637 638 remaining_expected = [e for e in remaining_expected if e not in log] 639 self._raise_assertion_error(f'Expected message(s) {remaining_expected!s} ' 640 f'not found in log:\n\n{print_log}\n\n') 641 642 @contextlib.contextmanager 643 def wait_for_new_peer(self, timeout=5): 644 """ 645 Wait until the node is connected to at least one new peer. We detect this 646 by watching for an increased highest peer id, using the `getpeerinfo` RPC call. 647 Note that the simpler approach of only accounting for the number of peers 648 suffers from race conditions, as disconnects from unrelated previous peers 649 could happen anytime in-between. 
650 """ 651 def get_highest_peer_id(): 652 peer_info = self.getpeerinfo() 653 return peer_info[-1]["id"] if peer_info else -1 654 655 initial_peer_id = get_highest_peer_id() 656 yield 657 self.wait_until(lambda: get_highest_peer_id() > initial_peer_id, timeout=timeout) 658 659 @contextlib.contextmanager 660 def profile_with_perf(self, profile_name: str): 661 """ 662 Context manager that allows easy profiling of node activity using `perf`. 663 664 See `test/functional/README.md` for details on perf usage. 665 666 Args: 667 profile_name: This string will be appended to the 668 profile data filename generated by perf. 669 """ 670 subp = self._start_perf(profile_name) 671 672 yield 673 674 if subp: 675 self._stop_perf(profile_name) 676 677 def _start_perf(self, profile_name=None): 678 """Start a perf process to profile this node. 679 680 Returns the subprocess running perf.""" 681 subp = None 682 683 def test_success(cmd): 684 return subprocess.call( 685 # shell=True required for pipe use below 686 cmd, shell=True, 687 stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) == 0 688 689 if platform.system() != 'Linux': 690 self.log.warning("Can't profile with perf; only available on Linux platforms") 691 return None 692 693 if not test_success('which perf'): 694 self.log.warning("Can't profile with perf; must install perf-tools") 695 return None 696 697 if not test_success('readelf -S {} | grep .debug_str'.format(shlex.quote(self.binary))): 698 self.log.warning( 699 "perf output won't be very useful without debug symbols compiled into bitcoind") 700 701 output_path = tempfile.NamedTemporaryFile( 702 dir=self.datadir_path, 703 prefix="{}.perf.data.".format(profile_name or 'test'), 704 delete=False, 705 ).name 706 707 cmd = [ 708 'perf', 'record', 709 '-g', # Record the callgraph. 710 '--call-graph', 'dwarf', # Compatibility for gcc's --fomit-frame-pointer. 711 '-F', '101', # Sampling frequency in Hz. 
712 '-p', str(self.process.pid), 713 '-o', output_path, 714 ] 715 subp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 716 self.perf_subprocesses[profile_name] = subp 717 718 return subp 719 720 def _stop_perf(self, profile_name): 721 """Stop (and pop) a perf subprocess.""" 722 subp = self.perf_subprocesses.pop(profile_name) 723 output_path = subp.args[subp.args.index('-o') + 1] 724 725 subp.terminate() 726 subp.wait(timeout=10) 727 728 stderr = subp.stderr.read().decode() 729 if 'Consider tweaking /proc/sys/kernel/perf_event_paranoid' in stderr: 730 self.log.warning( 731 "perf couldn't collect data! Try " 732 "'sudo sysctl -w kernel.perf_event_paranoid=-1'") 733 else: 734 report_cmd = "perf report -i {}".format(output_path) 735 self.log.info("See perf output by running '{}'".format(report_cmd)) 736 737 def assert_start_raises_init_error(self, extra_args=None, expected_msg=None, match=ErrorMatch.FULL_TEXT, *args, **kwargs): 738 """Attempt to start the node and expect it to raise an error. 739 740 extra_args: extra arguments to pass through to bitcoind 741 expected_msg: regex that stderr should match when bitcoind fails 742 743 Will raise if bitcoind starts without an error. 
744 Will raise if an expected_msg is provided and it does not match bitcoind's stdout.""" 745 assert not self.running 746 with tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False) as log_stderr, \ 747 tempfile.NamedTemporaryFile(dir=self.stdout_dir, delete=False) as log_stdout: 748 assert_msg = None 749 try: 750 self.start(extra_args, stdout=log_stdout, stderr=log_stderr, *args, **kwargs) 751 ret = self.process.wait(timeout=self.rpc_timeout) 752 self.log.debug(self._node_msg(f'bitcoind exited with status {ret} during initialization')) 753 assert_not_equal(ret, 0) # Exit code must indicate failure 754 self.running = False 755 self.process = None 756 # Check stderr for expected message 757 if expected_msg is not None: 758 log_stderr.seek(0) 759 stderr = log_stderr.read().decode('utf-8').strip() 760 if match == ErrorMatch.PARTIAL_REGEX: 761 if re.search(expected_msg, stderr, flags=re.MULTILINE) is None: 762 self._raise_assertion_error( 763 'Expected message "{}" does not partially match stderr:\n"{}"'.format(expected_msg, stderr)) 764 elif match == ErrorMatch.FULL_REGEX: 765 if re.fullmatch(expected_msg, stderr) is None: 766 self._raise_assertion_error( 767 'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr)) 768 elif match == ErrorMatch.FULL_TEXT: 769 if expected_msg != stderr: 770 self._raise_assertion_error( 771 'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr)) 772 except subprocess.TimeoutExpired as e: 773 self.process.kill() 774 self.running = False 775 self.process = None 776 assert_msg = f'bitcoind should have exited within {self.rpc_timeout}s ' 777 if expected_msg is None: 778 assert_msg += "with an error" 779 else: 780 assert_msg += "with expected error " + expected_msg 781 assert_msg += f" (cmd: {e.cmd})" 782 783 # Raise assertion outside of except-block above in order for it not to be treated as a knock-on exception. 
784 if assert_msg: 785 self._raise_assertion_error(assert_msg) 786 787 def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, send_version=True, supports_v2_p2p=None, wait_for_v2_handshake=True, expect_success=True, **kwargs): 788 """Add an inbound p2p connection to the node. 789 790 This method adds the p2p connection to the self.p2ps list and also 791 returns the connection to the caller. 792 793 When self.use_v2transport is True, TestNode advertises NODE_P2P_V2 service flag 794 795 An inbound connection is made from TestNode <------ P2PConnection 796 - if TestNode doesn't advertise NODE_P2P_V2 service, P2PConnection sends version message and v1 P2P is followed 797 - if TestNode advertises NODE_P2P_V2 service, (and if P2PConnections supports v2 P2P) 798 P2PConnection sends ellswift bytes and v2 P2P is followed 799 """ 800 if 'dstport' not in kwargs: 801 kwargs['dstport'] = p2p_port(self.index) 802 if 'dstaddr' not in kwargs: 803 kwargs['dstaddr'] = '127.0.0.1' 804 if supports_v2_p2p is None: 805 supports_v2_p2p = self.use_v2transport 806 807 if self.use_v2transport: 808 kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2 809 supports_v2_p2p = self.use_v2transport and supports_v2_p2p 810 p2p_conn.peer_connect(**kwargs, send_version=send_version, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p)() 811 812 self.p2ps.append(p2p_conn) 813 if not expect_success: 814 return p2p_conn 815 p2p_conn.wait_until(lambda: p2p_conn.is_connected, check_connected=False) 816 if supports_v2_p2p and wait_for_v2_handshake: 817 p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake) 818 if send_version: 819 p2p_conn.wait_until(lambda: not p2p_conn.on_connection_send_msg) 820 if wait_for_verack: 821 # Wait for the node to send us the version and verack 822 p2p_conn.wait_for_verack() 823 # At this point we have sent our version message and received the version and verack, however the full node 824 # has not yet 
received the verack from us (in reply to their version). So, the connection is not yet fully 825 # established (fSuccessfullyConnected). 826 # 827 # This shouldn't lead to any issues when sending messages, since the verack will be in-flight before the 828 # message we send. However, it might lead to races where we are expecting to receive a message. E.g. a 829 # transaction that will be added to the mempool as soon as we return here. 830 # 831 # So syncing here is redundant when we only want to send a message, but the cost is low (a few milliseconds) 832 # in comparison to the upside of making tests less fragile and unexpected intermittent errors less likely. 833 p2p_conn.sync_with_ping() 834 835 # Consistency check that the node received our user agent string. 836 # Find our connection in getpeerinfo by our address:port and theirs, as this combination is unique. 837 sockname = p2p_conn._transport.get_extra_info("socket").getsockname() 838 our_addr_and_port = f"{sockname[0]}:{sockname[1]}" 839 dst_addr_and_port = f"{p2p_conn.dstaddr}:{p2p_conn.dstport}" 840 info = [peer for peer in self.getpeerinfo() if peer["addr"] == our_addr_and_port and peer["addrbind"] == dst_addr_and_port] 841 assert_equal(len(info), 1) 842 assert_equal(info[0]["subver"], P2P_SUBVERSION) 843 844 return p2p_conn 845 846 def add_outbound_p2p_connection(self, p2p_conn, *, wait_for_verack=True, wait_for_disconnect=False, p2p_idx, connection_type="outbound-full-relay", supports_v2_p2p=None, advertise_v2_p2p=None, **kwargs): 847 """Add an outbound p2p connection from node. Must be an 848 "outbound-full-relay", "block-relay-only", "addr-fetch" or "feeler" connection. 849 850 This method adds the p2p connection to the self.p2ps list and returns 851 the connection to the caller. 852 853 p2p_idx must be different for simultaneously connected peers. 
When reusing it for the next peer 854 after disconnecting the previous one, it is necessary to wait for the disconnect to finish to avoid 855 a race condition. 856 857 Parameters: 858 supports_v2_p2p: whether p2p_conn supports v2 P2P or not 859 advertise_v2_p2p: whether p2p_conn is advertised to support v2 P2P or not 860 861 An outbound connection is made from TestNode -------> P2PConnection 862 - if P2PConnection doesn't advertise_v2_p2p, TestNode sends version message and v1 P2P is followed 863 - if P2PConnection both supports_v2_p2p and advertise_v2_p2p, TestNode sends ellswift bytes and v2 P2P is followed 864 - if P2PConnection doesn't supports_v2_p2p but advertise_v2_p2p, 865 TestNode sends ellswift bytes and P2PConnection disconnects, 866 TestNode reconnects by sending version message and v1 P2P is followed 867 """ 868 869 def addconnection_callback(address, port): 870 self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type)) 871 self.addconnection('%s:%d' % (address, port), connection_type, advertise_v2_p2p) 872 873 if supports_v2_p2p is None: 874 supports_v2_p2p = self.use_v2transport 875 if advertise_v2_p2p is None: 876 advertise_v2_p2p = self.use_v2transport 877 878 if advertise_v2_p2p: 879 kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2 880 assert self.use_v2transport # only a v2 TestNode could make a v2 outbound connection 881 882 # if P2PConnection is advertised to support v2 P2P when it doesn't actually support v2 P2P, 883 # reconnection needs to be attempted using v1 P2P by sending version message 884 reconnect = advertise_v2_p2p and not supports_v2_p2p 885 # P2PConnection needs to be advertised to support v2 P2P so that ellswift bytes are sent instead of msg_version 886 supports_v2_p2p = supports_v2_p2p and advertise_v2_p2p 887 p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p, 
reconnect=reconnect, **kwargs)() 888 889 if reconnect: 890 p2p_conn.wait_for_reconnect() 891 892 if connection_type == "feeler" or wait_for_disconnect: 893 # feeler connections are closed as soon as the node receives a `version` message 894 p2p_conn.wait_until(lambda: p2p_conn.message_count["version"] == 1, check_connected=False) 895 p2p_conn.wait_until(lambda: not p2p_conn.is_connected, check_connected=False) 896 else: 897 p2p_conn.wait_for_connect() 898 self.p2ps.append(p2p_conn) 899 900 if supports_v2_p2p: 901 p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake) 902 p2p_conn.wait_until(lambda: not p2p_conn.on_connection_send_msg) 903 if wait_for_verack: 904 p2p_conn.wait_for_verack() 905 p2p_conn.sync_with_ping() 906 907 return p2p_conn 908 909 def num_test_p2p_connections(self): 910 """Return number of test framework p2p connections to the node.""" 911 return len([peer for peer in self.getpeerinfo() if peer['subver'] == P2P_SUBVERSION]) 912 913 def disconnect_p2ps(self): 914 """Close all p2p connections to the node. 915 The state of the peers (such as txrequests) may not be fully cleared 916 yet, even after this method returns.""" 917 for p in self.p2ps: 918 p.peer_disconnect() 919 del self.p2ps[:] 920 921 self.wait_until(lambda: self.num_test_p2p_connections() == 0) 922 923 def bumpmocktime(self, seconds): 924 """Fast forward using setmocktime to self.mocktime + seconds. 
Requires setmocktime to have 925 been called at some point in the past.""" 926 assert self.mocktime 927 self.mocktime += seconds 928 self.setmocktime(self.mocktime) 929 930 def wait_until(self, test_function, timeout=60, check_interval=0.05): 931 return wait_until_helper_internal(test_function, timeout=timeout, timeout_factor=self.timeout_factor, check_interval=check_interval) 932 933 class TestNodeCLIAttr: 934 def __init__(self, cli, command): 935 self.cli = cli 936 self.command = command 937 938 def __call__(self, *args, **kwargs): 939 return self.cli.send_cli(self.command, *args, **kwargs) 940 941 def get_request(self, *args, **kwargs): 942 return lambda: self(*args, **kwargs) 943 944 945 def arg_to_cli(arg): 946 if isinstance(arg, bool): 947 return str(arg).lower() 948 elif arg is None: 949 return 'null' 950 elif isinstance(arg, dict) or isinstance(arg, list) or isinstance(arg, tuple): 951 return json.dumps(arg, default=serialization_fallback) 952 else: 953 return str(arg) 954 955 956 class TestNodeCLI(): 957 """Interface to bitcoin-cli for an individual node""" 958 def __init__(self, binaries, datadir, rpc_timeout): 959 self.options = [] 960 self.binaries = binaries 961 self.datadir = datadir 962 self.rpc_timeout = rpc_timeout 963 self.input = None 964 self.log = logging.getLogger('TestFramework.bitcoincli') 965 966 def __call__(self, *options, input=None): 967 # TestNodeCLI is callable with bitcoin-cli command-line options 968 cli = TestNodeCLI(self.binaries, self.datadir, self.rpc_timeout) 969 cli.options = [str(o) for o in options] 970 cli.input = input 971 return cli 972 973 def __getattr__(self, command): 974 return TestNodeCLIAttr(self, command) 975 976 def batch(self, requests): 977 results = [] 978 for request in requests: 979 try: 980 results.append(dict(result=request())) 981 except JSONRPCException as e: 982 results.append(dict(error=e)) 983 return results 984 985 def send_cli(self, clicommand=None, *args, **kwargs): 986 """Run bitcoin-cli command. 
Deserializes returned string as python object.""" 987 pos_args = [arg_to_cli(arg) for arg in args] 988 named_args = [key + "=" + arg_to_cli(value) for (key, value) in kwargs.items() if value is not None] 989 p_args = self.binaries.rpc_argv() + [ 990 f"-datadir={self.datadir}", 991 f"-rpcclienttimeout={int(self.rpc_timeout)}", 992 ] + self.options 993 if named_args: 994 p_args += ["-named"] 995 base_arg_pos = len(p_args) 996 if clicommand is not None: 997 p_args += [clicommand] 998 p_args += pos_args + named_args 999 1000 # TEST_CLI_MAX_ARG_SIZE is set low enough that checking the string 1001 # length is enough and encoding to bytes is not needed before 1002 # calculating the sum. 1003 sum_arg_size = sum(len(arg) for arg in p_args) 1004 stdin_data = self.input 1005 if sum_arg_size >= TEST_CLI_MAX_ARG_SIZE: 1006 self.log.debug(f"Cli: Command size {sum_arg_size} too large, using stdin") 1007 rpc_args = "\n".join([arg for arg in p_args[base_arg_pos:]]) 1008 if stdin_data is not None: 1009 stdin_data += "\n" + rpc_args 1010 else: 1011 stdin_data = rpc_args 1012 p_args = p_args[:base_arg_pos] + ['-stdin'] 1013 1014 self.log.debug("Running bitcoin-cli {}".format(p_args[2:])) 1015 process = subprocess.Popen(p_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 1016 cli_stdout, cli_stderr = process.communicate(input=stdin_data) 1017 returncode = process.poll() 1018 if returncode: 1019 match = re.match(r'error code: ([-0-9]+)\nerror message:\n(.*)', cli_stderr) 1020 if match: 1021 code, message = match.groups() 1022 raise JSONRPCException(dict(code=int(code), message=message)) 1023 # Ignore cli_stdout, raise with cli_stderr 1024 raise subprocess.CalledProcessError(returncode, p_args, output=cli_stderr) 1025 try: 1026 if not cli_stdout.strip(): 1027 return None 1028 return json.loads(cli_stdout, parse_float=decimal.Decimal) 1029 except (json.JSONDecodeError, decimal.InvalidOperation): 1030 return cli_stdout.rstrip("\n")