test_node.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2017-2022 The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """Class for bitcoind node under test""" 6 7 import contextlib 8 import decimal 9 import errno 10 from enum import Enum 11 import http.client 12 import json 13 import logging 14 import os 15 import platform 16 import re 17 import subprocess 18 import tempfile 19 import time 20 import urllib.parse 21 import collections 22 import shlex 23 from pathlib import Path 24 25 from .authproxy import ( 26 JSONRPCException, 27 serialization_fallback, 28 ) 29 from .descriptors import descsum_create 30 from .messages import NODE_P2P_V2 31 from .p2p import P2P_SERVICES, P2P_SUBVERSION 32 from .util import ( 33 MAX_NODES, 34 assert_equal, 35 append_config, 36 delete_cookie_file, 37 get_auth_cookie, 38 get_rpc_proxy, 39 rpc_url, 40 wait_until_helper_internal, 41 p2p_port, 42 ) 43 44 BITCOIND_PROC_WAIT_TIMEOUT = 60 45 46 47 class FailedToStartError(Exception): 48 """Raised when a node fails to start correctly.""" 49 50 51 class ErrorMatch(Enum): 52 FULL_TEXT = 1 53 FULL_REGEX = 2 54 PARTIAL_REGEX = 3 55 56 57 class TestNode(): 58 """A class for representing a bitcoind node under test. 59 60 This class contains: 61 62 - state about the node (whether it's running, etc) 63 - a Python subprocess.Popen object representing the running process 64 - an RPC connection to the node 65 - one or more P2P connections to the node 66 67 68 To make things easier for the test writer, any unrecognised messages will 69 be dispatched to the RPC connection.""" 70 71 def __init__(self, i, datadir_path, *, chain, rpchost, timewait, timeout_factor, bitcoind, bitcoin_cli, coverage_dir, cwd, extra_conf=None, extra_args=None, use_cli=False, start_perf=False, use_valgrind=False, version=None, descriptors=False, v2transport=False): 72 """ 73 Kwargs: 74 start_perf (bool): If True, begin profiling the node with `perf` as soon as 75 the node starts. 76 """ 77 78 self.index = i 79 self.p2p_conn_index = 1 80 self.datadir_path = datadir_path 81 self.bitcoinconf = self.datadir_path / "bitcoin.conf" 82 self.stdout_dir = self.datadir_path / "stdout" 83 self.stderr_dir = self.datadir_path / "stderr" 84 self.chain = chain 85 self.rpchost = rpchost 86 self.rpc_timeout = timewait 87 self.binary = bitcoind 88 self.coverage_dir = coverage_dir 89 self.cwd = cwd 90 self.descriptors = descriptors 91 if extra_conf is not None: 92 append_config(self.datadir_path, extra_conf) 93 # Most callers will just need to add extra args to the standard list below. 94 # For those callers that need more flexibility, they can just set the args property directly. 95 # Note that common args are set in the config file (see initialize_datadir) 96 self.extra_args = extra_args 97 self.version = version 98 # Configuration for logging is set as command-line args rather than in the bitcoin.conf file. 99 # This means that starting a bitcoind using the temp dir to debug a failed test won't 100 # spam debug.log. 101 self.args = [ 102 self.binary, 103 f"-datadir={self.datadir_path}", 104 "-logtimemicros", 105 "-debug", 106 "-debugexclude=libevent", 107 "-debugexclude=leveldb", 108 "-debugexclude=rand", 109 "-uacomment=testnode%d" % i, 110 ] 111 if self.descriptors is None: 112 self.args.append("-disablewallet") 113 114 # Use valgrind, expect for previous release binaries 115 if use_valgrind and version is None: 116 default_suppressions_file = Path(__file__).parents[3] / "contrib" / "valgrind.supp" 117 suppressions_file = os.getenv("VALGRIND_SUPPRESSIONS_FILE", 118 default_suppressions_file) 119 self.args = ["valgrind", "--suppressions={}".format(suppressions_file), 120 "--gen-suppressions=all", "--exit-on-first-error=yes", 121 "--error-exitcode=1", "--quiet"] + self.args 122 123 if self.version_is_at_least(190000): 124 self.args.append("-logthreadnames") 125 if self.version_is_at_least(219900): 126 self.args.append("-logsourcelocations") 127 if self.version_is_at_least(239000): 128 self.args.append("-loglevel=trace") 129 130 # Default behavior from global -v2transport flag is added to args to persist it over restarts. 131 # May be overwritten in individual tests, using extra_args. 132 self.default_to_v2 = v2transport 133 if self.version_is_at_least(260000): 134 # 26.0 and later support v2transport 135 if v2transport: 136 self.args.append("-v2transport=1") 137 else: 138 self.args.append("-v2transport=0") 139 # if v2transport is requested via global flag but not supported for node version, ignore it 140 141 self.cli = TestNodeCLI(bitcoin_cli, self.datadir_path) 142 self.use_cli = use_cli 143 self.start_perf = start_perf 144 145 self.running = False 146 self.process = None 147 self.rpc_connected = False 148 self.rpc = None 149 self.url = None 150 self.log = logging.getLogger('TestFramework.node%d' % i) 151 self.cleanup_on_exit = True # Whether to kill the node when this object goes away 152 # Cache perf subprocesses here by their data output filename. 153 self.perf_subprocesses = {} 154 155 self.p2ps = [] 156 self.timeout_factor = timeout_factor 157 158 self.mocktime = None 159 160 AddressKeyPair = collections.namedtuple('AddressKeyPair', ['address', 'key']) 161 PRIV_KEYS = [ 162 # address , privkey 163 AddressKeyPair('mjTkW3DjgyZck4KbiRusZsqTgaYTxdSz6z', 'cVpF924EspNh8KjYsfhgY96mmxvT6DgdWiTYMtMjuM74hJaU5psW'), 164 AddressKeyPair('msX6jQXvxiNhx3Q62PKeLPrhrqZQdSimTg', 'cUxsWyKyZ9MAQTaAhUQWJmBbSvHMwSmuv59KgxQV7oZQU3PXN3KE'), 165 AddressKeyPair('mnonCMyH9TmAsSj3M59DsbH8H63U3RKoFP', 'cTrh7dkEAeJd6b3MRX9bZK8eRmNqVCMH3LSUkE3dSFDyzjU38QxK'), 166 AddressKeyPair('mqJupas8Dt2uestQDvV2NH3RU8uZh2dqQR', 'cVuKKa7gbehEQvVq717hYcbE9Dqmq7KEBKqWgWrYBa2CKKrhtRim'), 167 AddressKeyPair('msYac7Rvd5ywm6pEmkjyxhbCDKqWsVeYws', 'cQDCBuKcjanpXDpCqacNSjYfxeQj8G6CAtH1Dsk3cXyqLNC4RPuh'), 168 AddressKeyPair('n2rnuUnwLgXqf9kk2kjvVm8R5BZK1yxQBi', 'cQakmfPSLSqKHyMFGwAqKHgWUiofJCagVGhiB4KCainaeCSxeyYq'), 169 AddressKeyPair('myzuPxRwsf3vvGzEuzPfK9Nf2RfwauwYe6', 'cQMpDLJwA8DBe9NcQbdoSb1BhmFxVjWD5gRyrLZCtpuF9Zi3a9RK'), 170 AddressKeyPair('mumwTaMtbxEPUswmLBBN3vM9oGRtGBrys8', 'cSXmRKXVcoouhNNVpcNKFfxsTsToY5pvB9DVsFksF1ENunTzRKsy'), 171 AddressKeyPair('mpV7aGShMkJCZgbW7F6iZgrvuPHjZjH9qg', 'cSoXt6tm3pqy43UMabY6eUTmR3eSUYFtB2iNQDGgb3VUnRsQys2k'), 172 AddressKeyPair('mq4fBNdckGtvY2mijd9am7DRsbRB4KjUkf', 'cN55daf1HotwBAgAKWVgDcoppmUNDtQSfb7XLutTLeAgVc3u8hik'), 173 AddressKeyPair('mpFAHDjX7KregM3rVotdXzQmkbwtbQEnZ6', 'cT7qK7g1wkYEMvKowd2ZrX1E5f6JQ7TM246UfqbCiyF7kZhorpX3'), 174 AddressKeyPair('mzRe8QZMfGi58KyWCse2exxEFry2sfF2Y7', 'cPiRWE8KMjTRxH1MWkPerhfoHFn5iHPWVK5aPqjW8NxmdwenFinJ'), 175 ] 176 177 def get_deterministic_priv_key(self): 178 """Return a deterministic priv key in base58, that only depends on the node's index""" 179 assert len(self.PRIV_KEYS) == MAX_NODES 180 return self.PRIV_KEYS[self.index] 181 182 def _node_msg(self, msg: str) -> str: 183 """Return a modified msg that identifies this node by its index as a debugging aid.""" 184 return "[node %d] %s" % (self.index, msg) 185 186 def _raise_assertion_error(self, msg: str): 187 """Raise an AssertionError with msg modified to identify this node.""" 188 raise AssertionError(self._node_msg(msg)) 189 190 def __del__(self): 191 # Ensure that we don't leave any bitcoind processes lying around after 192 # the test ends 193 if self.process and self.cleanup_on_exit: 194 # Should only happen on test failure 195 # Avoid using logger, as that may have already been shutdown when 196 # this destructor is called. 197 print(self._node_msg("Cleaning up leftover process")) 198 self.process.kill() 199 200 def __getattr__(self, name): 201 """Dispatches any unrecognised messages to the RPC connection or a CLI instance.""" 202 if self.use_cli: 203 return getattr(RPCOverloadWrapper(self.cli, True, self.descriptors), name) 204 else: 205 assert self.rpc_connected and self.rpc is not None, self._node_msg("Error: no RPC connection") 206 return getattr(RPCOverloadWrapper(self.rpc, descriptors=self.descriptors), name) 207 208 def start(self, extra_args=None, *, cwd=None, stdout=None, stderr=None, env=None, **kwargs): 209 """Start the node.""" 210 if extra_args is None: 211 extra_args = self.extra_args 212 213 self.use_v2transport = "-v2transport=1" in extra_args or (self.default_to_v2 and "-v2transport=0" not in extra_args) 214 215 # Add a new stdout and stderr file each time bitcoind is started 216 if stderr is None: 217 stderr = tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False) 218 if stdout is None: 219 stdout = tempfile.NamedTemporaryFile(dir=self.stdout_dir, delete=False) 220 self.stderr = stderr 221 self.stdout = stdout 222 223 if cwd is None: 224 cwd = self.cwd 225 226 # Delete any existing cookie file -- if such a file exists (eg due to 227 # unclean shutdown), it will get overwritten anyway by bitcoind, and 228 # potentially interfere with our attempt to authenticate 229 delete_cookie_file(self.datadir_path, self.chain) 230 231 # add environment variable LIBC_FATAL_STDERR_=1 so that libc errors are written to stderr and not the terminal 232 subp_env = dict(os.environ, LIBC_FATAL_STDERR_="1") 233 if env is not None: 234 subp_env.update(env) 235 236 self.process = subprocess.Popen(self.args + extra_args, env=subp_env, stdout=stdout, stderr=stderr, cwd=cwd, **kwargs) 237 238 self.running = True 239 self.log.debug("bitcoind started, waiting for RPC to come up") 240 241 if self.start_perf: 242 self._start_perf() 243 244 def wait_for_rpc_connection(self): 245 """Sets up an RPC connection to the bitcoind process. Returns False if unable to connect.""" 246 # Poll at a rate of four times per second 247 poll_per_s = 4 248 for _ in range(poll_per_s * self.rpc_timeout): 249 if self.process.poll() is not None: 250 # Attach abrupt shutdown error/s to the exception message 251 self.stderr.seek(0) 252 str_error = ''.join(line.decode('utf-8') for line in self.stderr) 253 str_error += "************************\n" if str_error else '' 254 255 raise FailedToStartError(self._node_msg( 256 f'bitcoind exited with status {self.process.returncode} during initialization. {str_error}')) 257 try: 258 rpc = get_rpc_proxy( 259 rpc_url(self.datadir_path, self.index, self.chain, self.rpchost), 260 self.index, 261 timeout=self.rpc_timeout // 2, # Shorter timeout to allow for one retry in case of ETIMEDOUT 262 coveragedir=self.coverage_dir, 263 ) 264 rpc.getblockcount() 265 # If the call to getblockcount() succeeds then the RPC connection is up 266 if self.version_is_at_least(190000): 267 # getmempoolinfo.loaded is available since commit 268 # bb8ae2c (version 0.19.0) 269 self.wait_until(lambda: rpc.getmempoolinfo()['loaded']) 270 # Wait for the node to finish reindex, block import, and 271 # loading the mempool. Usually importing happens fast or 272 # even "immediate" when the node is started. However, there 273 # is no guarantee and sometimes ImportBlocks might finish 274 # later. This is going to cause intermittent test failures, 275 # because generally the tests assume the node is fully 276 # ready after being started. 277 # 278 # For example, the node will reject block messages from p2p 279 # when it is still importing with the error "Unexpected 280 # block message received" 281 # 282 # The wait is done here to make tests as robust as possible 283 # and prevent racy tests and intermittent failures as much 284 # as possible. Some tests might not need this, but the 285 # overhead is trivial, and the added guarantees are worth 286 # the minimal performance cost. 287 self.log.debug("RPC successfully started") 288 if self.use_cli: 289 return 290 self.rpc = rpc 291 self.rpc_connected = True 292 self.url = self.rpc.rpc_url 293 return 294 except JSONRPCException as e: # Initialization phase 295 # -28 RPC in warmup 296 # -342 Service unavailable, RPC server started but is shutting down due to error 297 if e.error['code'] != -28 and e.error['code'] != -342: 298 raise # unknown JSON RPC exception 299 except ConnectionResetError: 300 # This might happen when the RPC server is in warmup, but shut down before the call to getblockcount 301 # succeeds. Try again to properly raise the FailedToStartError 302 pass 303 except OSError as e: 304 if e.errno == errno.ETIMEDOUT: 305 pass # Treat identical to ConnectionResetError 306 elif e.errno == errno.ECONNREFUSED: 307 pass # Port not yet open? 308 else: 309 raise # unknown OS error 310 except ValueError as e: # cookie file not found and no rpcuser or rpcpassword; bitcoind is still starting 311 if "No RPC credentials" not in str(e): 312 raise 313 time.sleep(1.0 / poll_per_s) 314 self._raise_assertion_error("Unable to connect to bitcoind after {}s".format(self.rpc_timeout)) 315 316 def wait_for_cookie_credentials(self): 317 """Ensures auth cookie credentials can be read, e.g. for testing CLI with -rpcwait before RPC connection is up.""" 318 self.log.debug("Waiting for cookie credentials") 319 # Poll at a rate of four times per second. 320 poll_per_s = 4 321 for _ in range(poll_per_s * self.rpc_timeout): 322 try: 323 get_auth_cookie(self.datadir_path, self.chain) 324 self.log.debug("Cookie credentials successfully retrieved") 325 return 326 except ValueError: # cookie file not found and no rpcuser or rpcpassword; bitcoind is still starting 327 pass # so we continue polling until RPC credentials are retrieved 328 time.sleep(1.0 / poll_per_s) 329 self._raise_assertion_error("Unable to retrieve cookie credentials after {}s".format(self.rpc_timeout)) 330 331 def generate(self, nblocks, maxtries=1000000, **kwargs): 332 self.log.debug("TestNode.generate() dispatches `generate` call to `generatetoaddress`") 333 return self.generatetoaddress(nblocks=nblocks, address=self.get_deterministic_priv_key().address, maxtries=maxtries, **kwargs) 334 335 def generateblock(self, *args, invalid_call, **kwargs): 336 assert not invalid_call 337 return self.__getattr__('generateblock')(*args, **kwargs) 338 339 def generatetoaddress(self, *args, invalid_call, **kwargs): 340 assert not invalid_call 341 return self.__getattr__('generatetoaddress')(*args, **kwargs) 342 343 def generatetodescriptor(self, *args, invalid_call, **kwargs): 344 assert not invalid_call 345 return self.__getattr__('generatetodescriptor')(*args, **kwargs) 346 347 def setmocktime(self, timestamp): 348 """Wrapper for setmocktime RPC, sets self.mocktime""" 349 if timestamp == 0: 350 # setmocktime(0) resets to system time. 351 self.mocktime = None 352 else: 353 self.mocktime = timestamp 354 return self.__getattr__('setmocktime')(timestamp) 355 356 def get_wallet_rpc(self, wallet_name): 357 if self.use_cli: 358 return RPCOverloadWrapper(self.cli("-rpcwallet={}".format(wallet_name)), True, self.descriptors) 359 else: 360 assert self.rpc_connected and self.rpc, self._node_msg("RPC not connected") 361 wallet_path = "wallet/{}".format(urllib.parse.quote(wallet_name)) 362 return RPCOverloadWrapper(self.rpc / wallet_path, descriptors=self.descriptors) 363 364 def version_is_at_least(self, ver): 365 return self.version is None or self.version >= ver 366 367 def stop_node(self, expected_stderr='', *, wait=0, wait_until_stopped=True): 368 """Stop the node.""" 369 if not self.running: 370 return 371 self.log.debug("Stopping node") 372 try: 373 # Do not use wait argument when testing older nodes, e.g. in wallet_backwards_compatibility.py 374 if self.version_is_at_least(180000): 375 self.stop(wait=wait) 376 else: 377 self.stop() 378 except http.client.CannotSendRequest: 379 self.log.exception("Unable to stop node.") 380 381 # If there are any running perf processes, stop them. 382 for profile_name in tuple(self.perf_subprocesses.keys()): 383 self._stop_perf(profile_name) 384 385 del self.p2ps[:] 386 387 assert (not expected_stderr) or wait_until_stopped # Must wait to check stderr 388 if wait_until_stopped: 389 self.wait_until_stopped(expected_stderr=expected_stderr) 390 391 def is_node_stopped(self, *, expected_stderr="", expected_ret_code=0): 392 """Checks whether the node has stopped. 393 394 Returns True if the node has stopped. False otherwise. 395 This method is responsible for freeing resources (self.process).""" 396 if not self.running: 397 return True 398 return_code = self.process.poll() 399 if return_code is None: 400 return False 401 402 # process has stopped. Assert that it didn't return an error code. 403 assert return_code == expected_ret_code, self._node_msg( 404 f"Node returned unexpected exit code ({return_code}) vs ({expected_ret_code}) when stopping") 405 # Check that stderr is as expected 406 self.stderr.seek(0) 407 stderr = self.stderr.read().decode('utf-8').strip() 408 if stderr != expected_stderr: 409 raise AssertionError("Unexpected stderr {} != {}".format(stderr, expected_stderr)) 410 411 self.stdout.close() 412 self.stderr.close() 413 414 self.running = False 415 self.process = None 416 self.rpc_connected = False 417 self.rpc = None 418 self.log.debug("Node stopped") 419 return True 420 421 def wait_until_stopped(self, *, timeout=BITCOIND_PROC_WAIT_TIMEOUT, expect_error=False, **kwargs): 422 expected_ret_code = 1 if expect_error else 0 # Whether node shutdown return EXIT_FAILURE or EXIT_SUCCESS 423 self.wait_until(lambda: self.is_node_stopped(expected_ret_code=expected_ret_code, **kwargs), timeout=timeout) 424 425 def replace_in_config(self, replacements): 426 """ 427 Perform replacements in the configuration file. 428 The substitutions are passed as a list of search-replace-tuples, e.g. 429 [("old", "new"), ("foo", "bar"), ...] 430 """ 431 with open(self.bitcoinconf, 'r', encoding='utf8') as conf: 432 conf_data = conf.read() 433 for replacement in replacements: 434 assert_equal(len(replacement), 2) 435 old, new = replacement[0], replacement[1] 436 conf_data = conf_data.replace(old, new) 437 with open(self.bitcoinconf, 'w', encoding='utf8') as conf: 438 conf.write(conf_data) 439 440 @property 441 def chain_path(self) -> Path: 442 return self.datadir_path / self.chain 443 444 @property 445 def debug_log_path(self) -> Path: 446 return self.chain_path / 'debug.log' 447 448 @property 449 def blocks_path(self) -> Path: 450 return self.chain_path / "blocks" 451 452 @property 453 def wallets_path(self) -> Path: 454 return self.chain_path / "wallets" 455 456 def debug_log_size(self, **kwargs) -> int: 457 with open(self.debug_log_path, **kwargs) as dl: 458 dl.seek(0, 2) 459 return dl.tell() 460 461 @contextlib.contextmanager 462 def assert_debug_log(self, expected_msgs, unexpected_msgs=None, timeout=2): 463 if unexpected_msgs is None: 464 unexpected_msgs = [] 465 assert_equal(type(expected_msgs), list) 466 assert_equal(type(unexpected_msgs), list) 467 468 time_end = time.time() + timeout * self.timeout_factor 469 prev_size = self.debug_log_size(encoding="utf-8") # Must use same encoding that is used to read() below 470 471 yield 472 473 while True: 474 found = True 475 with open(self.debug_log_path, encoding="utf-8", errors="replace") as dl: 476 dl.seek(prev_size) 477 log = dl.read() 478 print_log = " - " + "\n - ".join(log.splitlines()) 479 for unexpected_msg in unexpected_msgs: 480 if re.search(re.escape(unexpected_msg), log, flags=re.MULTILINE): 481 self._raise_assertion_error('Unexpected message "{}" partially matches log:\n\n{}\n\n'.format(unexpected_msg, print_log)) 482 for expected_msg in expected_msgs: 483 if re.search(re.escape(expected_msg), log, flags=re.MULTILINE) is None: 484 found = False 485 if found: 486 return 487 if time.time() >= time_end: 488 break 489 time.sleep(0.05) 490 self._raise_assertion_error('Expected messages "{}" does not partially match log:\n\n{}\n\n'.format(str(expected_msgs), print_log)) 491 492 @contextlib.contextmanager 493 def wait_for_debug_log(self, expected_msgs, timeout=60): 494 """ 495 Block until we see a particular debug log message fragment or until we exceed the timeout. 496 Return: 497 the number of log lines we encountered when matching 498 """ 499 time_end = time.time() + timeout * self.timeout_factor 500 prev_size = self.debug_log_size(mode="rb") # Must use same mode that is used to read() below 501 502 yield 503 504 while True: 505 found = True 506 with open(self.debug_log_path, "rb") as dl: 507 dl.seek(prev_size) 508 log = dl.read() 509 510 for expected_msg in expected_msgs: 511 if expected_msg not in log: 512 found = False 513 514 if found: 515 return 516 517 if time.time() >= time_end: 518 print_log = " - " + "\n - ".join(log.decode("utf8", errors="replace").splitlines()) 519 break 520 521 # No sleep here because we want to detect the message fragment as fast as 522 # possible. 523 524 self._raise_assertion_error( 525 'Expected messages "{}" does not partially match log:\n\n{}\n\n'.format( 526 str(expected_msgs), print_log)) 527 528 @contextlib.contextmanager 529 def wait_for_new_peer(self, timeout=5): 530 """ 531 Wait until the node is connected to at least one new peer. We detect this 532 by watching for an increased highest peer id, using the `getpeerinfo` RPC call. 533 Note that the simpler approach of only accounting for the number of peers 534 suffers from race conditions, as disconnects from unrelated previous peers 535 could happen anytime in-between. 536 """ 537 def get_highest_peer_id(): 538 peer_info = self.getpeerinfo() 539 return peer_info[-1]["id"] if peer_info else -1 540 541 initial_peer_id = get_highest_peer_id() 542 yield 543 self.wait_until(lambda: get_highest_peer_id() > initial_peer_id, timeout=timeout) 544 545 @contextlib.contextmanager 546 def profile_with_perf(self, profile_name: str): 547 """ 548 Context manager that allows easy profiling of node activity using `perf`. 549 550 See `test/functional/README.md` for details on perf usage. 551 552 Args: 553 profile_name: This string will be appended to the 554 profile data filename generated by perf. 555 """ 556 subp = self._start_perf(profile_name) 557 558 yield 559 560 if subp: 561 self._stop_perf(profile_name) 562 563 def _start_perf(self, profile_name=None): 564 """Start a perf process to profile this node. 565 566 Returns the subprocess running perf.""" 567 subp = None 568 569 def test_success(cmd): 570 return subprocess.call( 571 # shell=True required for pipe use below 572 cmd, shell=True, 573 stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) == 0 574 575 if platform.system() != 'Linux': 576 self.log.warning("Can't profile with perf; only available on Linux platforms") 577 return None 578 579 if not test_success('which perf'): 580 self.log.warning("Can't profile with perf; must install perf-tools") 581 return None 582 583 if not test_success('readelf -S {} | grep .debug_str'.format(shlex.quote(self.binary))): 584 self.log.warning( 585 "perf output won't be very useful without debug symbols compiled into bitcoind") 586 587 output_path = tempfile.NamedTemporaryFile( 588 dir=self.datadir_path, 589 prefix="{}.perf.data.".format(profile_name or 'test'), 590 delete=False, 591 ).name 592 593 cmd = [ 594 'perf', 'record', 595 '-g', # Record the callgraph. 596 '--call-graph', 'dwarf', # Compatibility for gcc's --fomit-frame-pointer. 597 '-F', '101', # Sampling frequency in Hz. 598 '-p', str(self.process.pid), 599 '-o', output_path, 600 ] 601 subp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 602 self.perf_subprocesses[profile_name] = subp 603 604 return subp 605 606 def _stop_perf(self, profile_name): 607 """Stop (and pop) a perf subprocess.""" 608 subp = self.perf_subprocesses.pop(profile_name) 609 output_path = subp.args[subp.args.index('-o') + 1] 610 611 subp.terminate() 612 subp.wait(timeout=10) 613 614 stderr = subp.stderr.read().decode() 615 if 'Consider tweaking /proc/sys/kernel/perf_event_paranoid' in stderr: 616 self.log.warning( 617 "perf couldn't collect data! Try " 618 "'sudo sysctl -w kernel.perf_event_paranoid=-1'") 619 else: 620 report_cmd = "perf report -i {}".format(output_path) 621 self.log.info("See perf output by running '{}'".format(report_cmd)) 622 623 def assert_start_raises_init_error(self, extra_args=None, expected_msg=None, match=ErrorMatch.FULL_TEXT, *args, **kwargs): 624 """Attempt to start the node and expect it to raise an error. 625 626 extra_args: extra arguments to pass through to bitcoind 627 expected_msg: regex that stderr should match when bitcoind fails 628 629 Will throw if bitcoind starts without an error. 630 Will throw if an expected_msg is provided and it does not match bitcoind's stdout.""" 631 assert not self.running 632 with tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False) as log_stderr, \ 633 tempfile.NamedTemporaryFile(dir=self.stdout_dir, delete=False) as log_stdout: 634 try: 635 self.start(extra_args, stdout=log_stdout, stderr=log_stderr, *args, **kwargs) 636 ret = self.process.wait(timeout=self.rpc_timeout) 637 self.log.debug(self._node_msg(f'bitcoind exited with status {ret} during initialization')) 638 assert ret != 0 # Exit code must indicate failure 639 self.running = False 640 self.process = None 641 # Check stderr for expected message 642 if expected_msg is not None: 643 log_stderr.seek(0) 644 stderr = log_stderr.read().decode('utf-8').strip() 645 if match == ErrorMatch.PARTIAL_REGEX: 646 if re.search(expected_msg, stderr, flags=re.MULTILINE) is None: 647 self._raise_assertion_error( 648 'Expected message "{}" does not partially match stderr:\n"{}"'.format(expected_msg, stderr)) 649 elif match == ErrorMatch.FULL_REGEX: 650 if re.fullmatch(expected_msg, stderr) is None: 651 self._raise_assertion_error( 652 'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr)) 653 elif match == ErrorMatch.FULL_TEXT: 654 if expected_msg != stderr: 655 self._raise_assertion_error( 656 'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr)) 657 except subprocess.TimeoutExpired: 658 self.process.kill() 659 self.running = False 660 self.process = None 661 assert_msg = f'bitcoind should have exited within {self.rpc_timeout}s ' 662 if expected_msg is None: 663 assert_msg += "with an error" 664 else: 665 assert_msg += "with expected error " + expected_msg 666 self._raise_assertion_error(assert_msg) 667 668 def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, send_version=True, supports_v2_p2p=None, wait_for_v2_handshake=True, **kwargs): 669 """Add an inbound p2p connection to the node. 670 671 This method adds the p2p connection to the self.p2ps list and also 672 returns the connection to the caller. 673 674 When self.use_v2transport is True, TestNode advertises NODE_P2P_V2 service flag 675 676 An inbound connection is made from TestNode <------ P2PConnection 677 - if TestNode doesn't advertise NODE_P2P_V2 service, P2PConnection sends version message and v1 P2P is followed 678 - if TestNode advertises NODE_P2P_V2 service, (and if P2PConnections supports v2 P2P) 679 P2PConnection sends ellswift bytes and v2 P2P is followed 680 """ 681 if 'dstport' not in kwargs: 682 kwargs['dstport'] = p2p_port(self.index) 683 if 'dstaddr' not in kwargs: 684 kwargs['dstaddr'] = '127.0.0.1' 685 if supports_v2_p2p is None: 686 supports_v2_p2p = self.use_v2transport 687 688 689 p2p_conn.p2p_connected_to_node = True 690 if self.use_v2transport: 691 kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2 692 supports_v2_p2p = self.use_v2transport and supports_v2_p2p 693 p2p_conn.peer_connect(**kwargs, send_version=send_version, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p)() 694 695 self.p2ps.append(p2p_conn) 696 p2p_conn.wait_until(lambda: p2p_conn.is_connected, check_connected=False) 697 if supports_v2_p2p and wait_for_v2_handshake: 698 p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake) 699 if send_version: 700 p2p_conn.wait_until(lambda: not p2p_conn.on_connection_send_msg) 701 if wait_for_verack: 702 # Wait for the node to send us the version and verack 703 p2p_conn.wait_for_verack() 704 # At this point we have sent our version message and received the version and verack, however the full node 705 # has not yet received the verack from us (in reply to their version). So, the connection is not yet fully 706 # established (fSuccessfullyConnected). 707 # 708 # This shouldn't lead to any issues when sending messages, since the verack will be in-flight before the 709 # message we send. However, it might lead to races where we are expecting to receive a message. E.g. a 710 # transaction that will be added to the mempool as soon as we return here. 711 # 712 # So syncing here is redundant when we only want to send a message, but the cost is low (a few milliseconds) 713 # in comparison to the upside of making tests less fragile and unexpected intermittent errors less likely. 714 p2p_conn.sync_with_ping() 715 716 # Consistency check that the node received our user agent string. 717 # Find our connection in getpeerinfo by our address:port and theirs, as this combination is unique. 718 sockname = p2p_conn._transport.get_extra_info("socket").getsockname() 719 our_addr_and_port = f"{sockname[0]}:{sockname[1]}" 720 dst_addr_and_port = f"{p2p_conn.dstaddr}:{p2p_conn.dstport}" 721 info = [peer for peer in self.getpeerinfo() if peer["addr"] == our_addr_and_port and peer["addrbind"] == dst_addr_and_port] 722 assert_equal(len(info), 1) 723 assert_equal(info[0]["subver"], P2P_SUBVERSION) 724 725 return p2p_conn 726 727 def add_outbound_p2p_connection(self, p2p_conn, *, wait_for_verack=True, wait_for_disconnect=False, p2p_idx, connection_type="outbound-full-relay", supports_v2_p2p=None, advertise_v2_p2p=None, **kwargs): 728 """Add an outbound p2p connection from node. Must be an 729 "outbound-full-relay", "block-relay-only", "addr-fetch" or "feeler" connection. 730 731 This method adds the p2p connection to the self.p2ps list and returns 732 the connection to the caller. 733 734 p2p_idx must be different for simultaneously connected peers. When reusing it for the next peer 735 after disconnecting the previous one, it is necessary to wait for the disconnect to finish to avoid 736 a race condition. 737 738 Parameters: 739 supports_v2_p2p: whether p2p_conn supports v2 P2P or not 740 advertise_v2_p2p: whether p2p_conn is advertised to support v2 P2P or not 741 742 An outbound connection is made from TestNode -------> P2PConnection 743 - if P2PConnection doesn't advertise_v2_p2p, TestNode sends version message and v1 P2P is followed 744 - if P2PConnection both supports_v2_p2p and advertise_v2_p2p, TestNode sends ellswift bytes and v2 P2P is followed 745 - if P2PConnection doesn't supports_v2_p2p but advertise_v2_p2p, 746 TestNode sends ellswift bytes and P2PConnection disconnects, 747 TestNode reconnects by sending version message and v1 P2P is followed 748 """ 749 750 def addconnection_callback(address, port): 751 self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type)) 752 self.addconnection('%s:%d' % (address, port), connection_type, advertise_v2_p2p) 753 754 p2p_conn.p2p_connected_to_node = False 755 if supports_v2_p2p is None: 756 supports_v2_p2p = self.use_v2transport 757 if advertise_v2_p2p is None: 758 advertise_v2_p2p = self.use_v2transport 759 760 if advertise_v2_p2p: 761 kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2 762 assert self.use_v2transport # only a v2 TestNode could make a v2 outbound connection 763 764 # if P2PConnection is advertised to support v2 P2P when it doesn't actually support v2 P2P, 765 # reconnection needs to be attempted using v1 P2P by sending version message 766 reconnect = advertise_v2_p2p and not supports_v2_p2p 767 # P2PConnection needs to be advertised to support v2 P2P so that ellswift bytes are sent instead of msg_version 768 supports_v2_p2p = supports_v2_p2p and advertise_v2_p2p 769 p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p, reconnect=reconnect, **kwargs)() 770 771 if reconnect: 772 p2p_conn.wait_for_reconnect() 773 774 if connection_type == "feeler" or wait_for_disconnect: 775 # feeler connections are closed as soon as the node receives a `version` message 776 p2p_conn.wait_until(lambda: p2p_conn.message_count["version"] == 1, check_connected=False) 777 p2p_conn.wait_until(lambda: not p2p_conn.is_connected, check_connected=False) 778 else: 779 p2p_conn.wait_for_connect() 780 self.p2ps.append(p2p_conn) 781 782 if supports_v2_p2p: 783 p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake) 784 p2p_conn.wait_until(lambda: not p2p_conn.on_connection_send_msg) 785 if wait_for_verack: 786 p2p_conn.wait_for_verack() 787 p2p_conn.sync_with_ping() 788 789 return p2p_conn 790 791 def num_test_p2p_connections(self): 792 """Return number of test framework p2p connections to the node.""" 793 return len([peer for peer in self.getpeerinfo() if peer['subver'] == P2P_SUBVERSION]) 794 795 def disconnect_p2ps(self): 796 """Close all p2p connections to the node. 797 Use only after each p2p has sent a version message to ensure the wait works.""" 798 for p in self.p2ps: 799 p.peer_disconnect() 800 del self.p2ps[:] 801 802 self.wait_until(lambda: self.num_test_p2p_connections() == 0) 803 804 def bumpmocktime(self, seconds): 805 """Fast forward using setmocktime to self.mocktime + seconds. Requires setmocktime to have 806 been called at some point in the past.""" 807 assert self.mocktime 808 self.mocktime += seconds 809 self.setmocktime(self.mocktime) 810 811 def wait_until(self, test_function, timeout=60): 812 return wait_until_helper_internal(test_function, timeout=timeout, timeout_factor=self.timeout_factor) 813 814 815 class TestNodeCLIAttr: 816 def __init__(self, cli, command): 817 self.cli = cli 818 self.command = command 819 820 def __call__(self, *args, **kwargs): 821 return self.cli.send_cli(self.command, *args, **kwargs) 822 823 def get_request(self, *args, **kwargs): 824 return lambda: self(*args, **kwargs) 825 826 827 def arg_to_cli(arg): 828 if isinstance(arg, bool): 829 return str(arg).lower() 830 elif arg is None: 831 return 'null' 832 elif isinstance(arg, dict) or isinstance(arg, list): 833 return json.dumps(arg, default=serialization_fallback) 834 else: 835 return str(arg) 836 837 838 class TestNodeCLI(): 839 """Interface to bitcoin-cli for an individual node""" 840 def __init__(self, binary, datadir): 841 self.options = [] 842 self.binary = binary 843 self.datadir = datadir 844 self.input = None 845 self.log = logging.getLogger('TestFramework.bitcoincli') 846 847 def __call__(self, *options, input=None): 848 # TestNodeCLI is callable with bitcoin-cli command-line options 849 cli = TestNodeCLI(self.binary, self.datadir) 850 cli.options = [str(o) for o in options] 851 cli.input = input 852 return cli 853 854 def __getattr__(self, command): 855 return TestNodeCLIAttr(self, command) 856 857 def batch(self, requests): 858 results = [] 859 for request in requests: 860 try: 861 results.append(dict(result=request())) 862 except JSONRPCException as e: 863 results.append(dict(error=e)) 864 return results 865 866 def send_cli(self, clicommand=None, *args, **kwargs): 867 """Run bitcoin-cli command. Deserializes returned string as python object.""" 868 pos_args = [arg_to_cli(arg) for arg in args] 869 named_args = [str(key) + "=" + arg_to_cli(value) for (key, value) in kwargs.items()] 870 p_args = [self.binary, f"-datadir={self.datadir}"] + self.options 871 if named_args: 872 p_args += ["-named"] 873 if clicommand is not None: 874 p_args += [clicommand] 875 p_args += pos_args + named_args 876 self.log.debug("Running bitcoin-cli {}".format(p_args[2:])) 877 process = subprocess.Popen(p_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 878 cli_stdout, cli_stderr = process.communicate(input=self.input) 879 returncode = process.poll() 880 if returncode: 881 match = re.match(r'error code: ([-0-9]+)\nerror message:\n(.*)', cli_stderr) 882 if match: 883 code, message = match.groups() 884 raise JSONRPCException(dict(code=int(code), message=message)) 885 # Ignore cli_stdout, raise with cli_stderr 886 raise subprocess.CalledProcessError(returncode, self.binary, output=cli_stderr) 887 try: 888 return json.loads(cli_stdout, parse_float=decimal.Decimal) 889 except (json.JSONDecodeError, decimal.InvalidOperation): 890 return cli_stdout.rstrip("\n") 891 892 class RPCOverloadWrapper(): 893 def __init__(self, rpc, cli=False, descriptors=False): 894 self.rpc = rpc 895 self.is_cli = cli 896 self.descriptors = descriptors 897 898 def __getattr__(self, name): 899 return getattr(self.rpc, name) 900 901 def createwallet_passthrough(self, *args, **kwargs): 902 return self.__getattr__("createwallet")(*args, **kwargs) 903 904 def createwallet(self, wallet_name, disable_private_keys=None, blank=None, passphrase='', avoid_reuse=None, descriptors=None, load_on_startup=None, external_signer=None): 905 if descriptors is None: 906 descriptors = self.descriptors 907 return self.__getattr__('createwallet')(wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors, load_on_startup, external_signer) 908 909 def importprivkey(self, privkey, label=None, rescan=None): 910 wallet_info = self.getwalletinfo() 911 if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']): 912 return self.__getattr__('importprivkey')(privkey, label, rescan) 913 desc = descsum_create('combo(' + privkey + ')') 914 req = [{ 915 'desc': desc, 916 'timestamp': 0 if rescan else 'now', 917 'label': label if label else '' 918 }] 919 import_res = self.importdescriptors(req) 920 if not import_res[0]['success']: 921 raise JSONRPCException(import_res[0]['error']) 922 923 def addmultisigaddress(self, nrequired, keys, label=None, address_type=None): 924 wallet_info = self.getwalletinfo() 925 if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']): 926 return self.__getattr__('addmultisigaddress')(nrequired, keys, label, address_type) 927 cms = self.createmultisig(nrequired, keys, address_type) 928 req = [{ 929 'desc': cms['descriptor'], 930 'timestamp': 0, 931 'label': label if label else '' 932 }] 933 import_res = self.importdescriptors(req) 934 if not import_res[0]['success']: 935 raise JSONRPCException(import_res[0]['error']) 936 return cms 937 938 def importpubkey(self, pubkey, label=None, rescan=None): 939 wallet_info = self.getwalletinfo() 940 if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']): 941 return self.__getattr__('importpubkey')(pubkey, label, rescan) 942 desc = descsum_create('combo(' + pubkey + ')') 943 req = [{ 944 'desc': desc, 945 'timestamp': 0 if rescan else 'now', 946 'label': label if label else '' 947 }] 948 import_res = self.importdescriptors(req) 949 if not import_res[0]['success']: 950 raise JSONRPCException(import_res[0]['error']) 951 952 def importaddress(self, address, label=None, rescan=None, p2sh=None): 953 wallet_info = self.getwalletinfo() 954 if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']): 955 return self.__getattr__('importaddress')(address, label, rescan, p2sh) 956 is_hex = False 957 try: 958 int(address ,16) 959 is_hex = True 960 desc = descsum_create('raw(' + address + ')') 961 except Exception: 962 desc = descsum_create('addr(' + address + ')') 963 reqs = [{ 964 'desc': desc, 965 'timestamp': 0 if rescan else 'now', 966 'label': label if label else '' 967 }] 968 if is_hex and p2sh: 969 reqs.append({ 970 'desc': descsum_create('p2sh(raw(' + address + '))'), 971 'timestamp': 0 if rescan else 'now', 972 'label': label if label else '' 973 }) 974 import_res = self.importdescriptors(reqs) 975 for res in import_res: 976 if not res['success']: 977 raise JSONRPCException(res['error'])