# test/functional/test_framework/test_node.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2017-2022 The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Class for bitcoind node under test"""
  6  
  7  import contextlib
  8  import decimal
  9  import errno
 10  from enum import Enum
 11  import http.client
 12  import json
 13  import logging
 14  import os
 15  import platform
 16  import re
 17  import subprocess
 18  import tempfile
 19  import time
 20  import urllib.parse
 21  import collections
 22  import shlex
 23  from pathlib import Path
 24  
 25  from .authproxy import (
 26      JSONRPCException,
 27      serialization_fallback,
 28  )
 29  from .descriptors import descsum_create
 30  from .messages import NODE_P2P_V2
 31  from .p2p import P2P_SERVICES, P2P_SUBVERSION
 32  from .util import (
 33      MAX_NODES,
 34      assert_equal,
 35      append_config,
 36      delete_cookie_file,
 37      get_auth_cookie,
 38      get_rpc_proxy,
 39      rpc_url,
 40      wait_until_helper_internal,
 41      p2p_port,
 42  )
 43  
 44  BITCOIND_PROC_WAIT_TIMEOUT = 60
 45  
 46  
 47  class FailedToStartError(Exception):
 48      """Raised when a node fails to start correctly."""
 49  
 50  
class ErrorMatch(Enum):
    """How an expected error message is compared against the node's stderr
    (see assert_start_raises_init_error)."""
    FULL_TEXT = 1      # exact string equality
    FULL_REGEX = 2     # regex must match the entire stderr text
    PARTIAL_REGEX = 3  # regex must match somewhere within stderr
 55  
 56  
 57  class TestNode():
 58      """A class for representing a bitcoind node under test.
 59  
 60      This class contains:
 61  
 62      - state about the node (whether it's running, etc)
 63      - a Python subprocess.Popen object representing the running process
 64      - an RPC connection to the node
 65      - one or more P2P connections to the node
 66  
 67  
 68      To make things easier for the test writer, any unrecognised messages will
 69      be dispatched to the RPC connection."""
 70  
    def __init__(self, i, datadir_path, *, chain, rpchost, timewait, timeout_factor, bitcoind, bitcoin_cli, coverage_dir, cwd, extra_conf=None, extra_args=None, use_cli=False, start_perf=False, use_valgrind=False, version=None, descriptors=False, v2transport=False):
        """
        Kwargs:
            start_perf (bool): If True, begin profiling the node with `perf` as soon as
                the node starts.
        """

        self.index = i
        self.p2p_conn_index = 1  # id handed out to successive P2P connections
        self.datadir_path = datadir_path
        self.bitcoinconf = self.datadir_path / "bitcoin.conf"
        self.stdout_dir = self.datadir_path / "stdout"
        self.stderr_dir = self.datadir_path / "stderr"
        self.chain = chain
        self.rpchost = rpchost
        self.rpc_timeout = timewait
        self.binary = bitcoind
        self.coverage_dir = coverage_dir
        self.cwd = cwd
        self.descriptors = descriptors
        if extra_conf is not None:
            append_config(self.datadir_path, extra_conf)
        # Most callers will just need to add extra args to the standard list below.
        # For those callers that need more flexibility, they can just set the args property directly.
        # Note that common args are set in the config file (see initialize_datadir)
        self.extra_args = extra_args
        # version is None for the binary under test; an int for previous-release binaries.
        self.version = version
        # Configuration for logging is set as command-line args rather than in the bitcoin.conf file.
        # This means that starting a bitcoind using the temp dir to debug a failed test won't
        # spam debug.log.
        self.args = [
            self.binary,
            f"-datadir={self.datadir_path}",
            "-logtimemicros",
            "-debug",
            "-debugexclude=libevent",
            "-debugexclude=leveldb",
            "-debugexclude=rand",
            "-uacomment=testnode%d" % i,
        ]
        # descriptors=None signals that the wallet should be disabled entirely.
        if self.descriptors is None:
            self.args.append("-disablewallet")

        # Use valgrind, except for previous release binaries
        if use_valgrind and version is None:
            default_suppressions_file = Path(__file__).parents[3] / "contrib" / "valgrind.supp"
            suppressions_file = os.getenv("VALGRIND_SUPPRESSIONS_FILE",
                                          default_suppressions_file)
            self.args = ["valgrind", "--suppressions={}".format(suppressions_file),
                         "--gen-suppressions=all", "--exit-on-first-error=yes",
                         "--error-exitcode=1", "--quiet"] + self.args

        # Only pass logging options that the node's version understands.
        if self.version_is_at_least(190000):
            self.args.append("-logthreadnames")
        if self.version_is_at_least(219900):
            self.args.append("-logsourcelocations")
        if self.version_is_at_least(239000):
            self.args.append("-loglevel=trace")

        # Default behavior from global -v2transport flag is added to args to persist it over restarts.
        # May be overwritten in individual tests, using extra_args.
        self.default_to_v2 = v2transport
        if self.version_is_at_least(260000):
            # 26.0 and later support v2transport
            if v2transport:
                self.args.append("-v2transport=1")
            else:
                self.args.append("-v2transport=0")
        # if v2transport is requested via global flag but not supported for node version, ignore it

        self.cli = TestNodeCLI(bitcoin_cli, self.datadir_path)
        self.use_cli = use_cli
        self.start_perf = start_perf

        self.running = False
        self.process = None
        self.rpc_connected = False
        self.rpc = None
        self.url = None
        self.log = logging.getLogger('TestFramework.node%d' % i)
        self.cleanup_on_exit = True # Whether to kill the node when this object goes away
        # Cache perf subprocesses here by their data output filename.
        self.perf_subprocesses = {}

        self.p2ps = []
        self.timeout_factor = timeout_factor

        # Mirrors the last value passed to setmocktime (None = system time).
        self.mocktime = None
159  
    # Hard-coded deterministic regtest address/privkey pairs, one per possible
    # node index (see get_deterministic_priv_key).
    AddressKeyPair = collections.namedtuple('AddressKeyPair', ['address', 'key'])
    PRIV_KEYS = [
            # address , privkey
            AddressKeyPair('mjTkW3DjgyZck4KbiRusZsqTgaYTxdSz6z', 'cVpF924EspNh8KjYsfhgY96mmxvT6DgdWiTYMtMjuM74hJaU5psW'),
            AddressKeyPair('msX6jQXvxiNhx3Q62PKeLPrhrqZQdSimTg', 'cUxsWyKyZ9MAQTaAhUQWJmBbSvHMwSmuv59KgxQV7oZQU3PXN3KE'),
            AddressKeyPair('mnonCMyH9TmAsSj3M59DsbH8H63U3RKoFP', 'cTrh7dkEAeJd6b3MRX9bZK8eRmNqVCMH3LSUkE3dSFDyzjU38QxK'),
            AddressKeyPair('mqJupas8Dt2uestQDvV2NH3RU8uZh2dqQR', 'cVuKKa7gbehEQvVq717hYcbE9Dqmq7KEBKqWgWrYBa2CKKrhtRim'),
            AddressKeyPair('msYac7Rvd5ywm6pEmkjyxhbCDKqWsVeYws', 'cQDCBuKcjanpXDpCqacNSjYfxeQj8G6CAtH1Dsk3cXyqLNC4RPuh'),
            AddressKeyPair('n2rnuUnwLgXqf9kk2kjvVm8R5BZK1yxQBi', 'cQakmfPSLSqKHyMFGwAqKHgWUiofJCagVGhiB4KCainaeCSxeyYq'),
            AddressKeyPair('myzuPxRwsf3vvGzEuzPfK9Nf2RfwauwYe6', 'cQMpDLJwA8DBe9NcQbdoSb1BhmFxVjWD5gRyrLZCtpuF9Zi3a9RK'),
            AddressKeyPair('mumwTaMtbxEPUswmLBBN3vM9oGRtGBrys8', 'cSXmRKXVcoouhNNVpcNKFfxsTsToY5pvB9DVsFksF1ENunTzRKsy'),
            AddressKeyPair('mpV7aGShMkJCZgbW7F6iZgrvuPHjZjH9qg', 'cSoXt6tm3pqy43UMabY6eUTmR3eSUYFtB2iNQDGgb3VUnRsQys2k'),
            AddressKeyPair('mq4fBNdckGtvY2mijd9am7DRsbRB4KjUkf', 'cN55daf1HotwBAgAKWVgDcoppmUNDtQSfb7XLutTLeAgVc3u8hik'),
            AddressKeyPair('mpFAHDjX7KregM3rVotdXzQmkbwtbQEnZ6', 'cT7qK7g1wkYEMvKowd2ZrX1E5f6JQ7TM246UfqbCiyF7kZhorpX3'),
            AddressKeyPair('mzRe8QZMfGi58KyWCse2exxEFry2sfF2Y7', 'cPiRWE8KMjTRxH1MWkPerhfoHFn5iHPWVK5aPqjW8NxmdwenFinJ'),
    ]
176  
177      def get_deterministic_priv_key(self):
178          """Return a deterministic priv key in base58, that only depends on the node's index"""
179          assert len(self.PRIV_KEYS) == MAX_NODES
180          return self.PRIV_KEYS[self.index]
181  
182      def _node_msg(self, msg: str) -> str:
183          """Return a modified msg that identifies this node by its index as a debugging aid."""
184          return "[node %d] %s" % (self.index, msg)
185  
186      def _raise_assertion_error(self, msg: str):
187          """Raise an AssertionError with msg modified to identify this node."""
188          raise AssertionError(self._node_msg(msg))
189  
    def __del__(self):
        # Ensure that we don't leave any bitcoind processes lying around after
        # the test ends
        if self.process and self.cleanup_on_exit:
            # Should only happen on test failure
            # Avoid using logger, as that may have already been shutdown when
            # this destructor is called.
            print(self._node_msg("Cleaning up leftover process"))
            self.process.kill()
199  
    def __getattr__(self, name):
        """Dispatches any unrecognised messages to the RPC connection or a CLI instance."""
        if self.use_cli:
            # CLI mode: route the call through bitcoin-cli via the overload wrapper.
            return getattr(RPCOverloadWrapper(self.cli, True, self.descriptors), name)
        else:
            # RPC mode requires wait_for_rpc_connection() to have succeeded first.
            assert self.rpc_connected and self.rpc is not None, self._node_msg("Error: no RPC connection")
            return getattr(RPCOverloadWrapper(self.rpc, descriptors=self.descriptors), name)
207  
    def start(self, extra_args=None, *, cwd=None, stdout=None, stderr=None, env=None, **kwargs):
        """Start the node.

        extra_args defaults to self.extra_args; stdout/stderr default to fresh
        temp files so each run's output is captured separately. Extra kwargs
        are forwarded to subprocess.Popen.
        """
        if extra_args is None:
            extra_args = self.extra_args

        # Record the transport that will actually be in effect for this run:
        # an explicit -v2transport flag in extra_args overrides the global default.
        self.use_v2transport = "-v2transport=1" in extra_args or (self.default_to_v2 and "-v2transport=0" not in extra_args)

        # Add a new stdout and stderr file each time bitcoind is started
        if stderr is None:
            stderr = tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False)
        if stdout is None:
            stdout = tempfile.NamedTemporaryFile(dir=self.stdout_dir, delete=False)
        self.stderr = stderr
        self.stdout = stdout

        if cwd is None:
            cwd = self.cwd

        # Delete any existing cookie file -- if such a file exists (eg due to
        # unclean shutdown), it will get overwritten anyway by bitcoind, and
        # potentially interfere with our attempt to authenticate
        delete_cookie_file(self.datadir_path, self.chain)

        # add environment variable LIBC_FATAL_STDERR_=1 so that libc errors are written to stderr and not the terminal
        subp_env = dict(os.environ, LIBC_FATAL_STDERR_="1")
        if env is not None:
            subp_env.update(env)

        self.process = subprocess.Popen(self.args + extra_args, env=subp_env, stdout=stdout, stderr=stderr, cwd=cwd, **kwargs)

        self.running = True
        self.log.debug("bitcoind started, waiting for RPC to come up")

        if self.start_perf:
            self._start_perf()
243  
    def wait_for_rpc_connection(self):
        """Sets up an RPC connection to the bitcoind process.

        Raises FailedToStartError if the process exits during initialization,
        or an AssertionError if no connection can be made within self.rpc_timeout."""
        # Poll at a rate of four times per second
        poll_per_s = 4
        for _ in range(poll_per_s * self.rpc_timeout):
            if self.process.poll() is not None:
                # Attach abrupt shutdown error/s to the exception message
                self.stderr.seek(0)
                str_error = ''.join(line.decode('utf-8') for line in self.stderr)
                str_error += "************************\n" if str_error else ''

                raise FailedToStartError(self._node_msg(
                    f'bitcoind exited with status {self.process.returncode} during initialization. {str_error}'))
            try:
                rpc = get_rpc_proxy(
                    rpc_url(self.datadir_path, self.index, self.chain, self.rpchost),
                    self.index,
                    timeout=self.rpc_timeout // 2,  # Shorter timeout to allow for one retry in case of ETIMEDOUT
                    coveragedir=self.coverage_dir,
                )
                rpc.getblockcount()
                # If the call to getblockcount() succeeds then the RPC connection is up
                if self.version_is_at_least(190000):
                    # getmempoolinfo.loaded is available since commit
                    # bb8ae2c (version 0.19.0)
                    self.wait_until(lambda: rpc.getmempoolinfo()['loaded'])
                    # Wait for the node to finish reindex, block import, and
                    # loading the mempool. Usually importing happens fast or
                    # even "immediate" when the node is started. However, there
                    # is no guarantee and sometimes ImportBlocks might finish
                    # later. This is going to cause intermittent test failures,
                    # because generally the tests assume the node is fully
                    # ready after being started.
                    #
                    # For example, the node will reject block messages from p2p
                    # when it is still importing with the error "Unexpected
                    # block message received"
                    #
                    # The wait is done here to make tests as robust as possible
                    # and prevent racy tests and intermittent failures as much
                    # as possible. Some tests might not need this, but the
                    # overhead is trivial, and the added guarantees are worth
                    # the minimal performance cost.
                self.log.debug("RPC successfully started")
                if self.use_cli:
                    # CLI mode doesn't keep an RPC handle; reaching this point is enough.
                    return
                self.rpc = rpc
                self.rpc_connected = True
                self.url = self.rpc.rpc_url
                return
            except JSONRPCException as e:  # Initialization phase
                # -28 RPC in warmup
                # -342 Service unavailable, RPC server started but is shutting down due to error
                if e.error['code'] != -28 and e.error['code'] != -342:
                    raise  # unknown JSON RPC exception
            except ConnectionResetError:
                # This might happen when the RPC server is in warmup, but shut down before the call to getblockcount
                # succeeds. Try again to properly raise the FailedToStartError
                pass
            except OSError as e:
                if e.errno == errno.ETIMEDOUT:
                    pass  # Treat identical to ConnectionResetError
                elif e.errno == errno.ECONNREFUSED:
                    pass  # Port not yet open?
                else:
                    raise  # unknown OS error
            except ValueError as e:  # cookie file not found and no rpcuser or rpcpassword; bitcoind is still starting
                if "No RPC credentials" not in str(e):
                    raise
            time.sleep(1.0 / poll_per_s)
        self._raise_assertion_error("Unable to connect to bitcoind after {}s".format(self.rpc_timeout))
315  
316      def wait_for_cookie_credentials(self):
317          """Ensures auth cookie credentials can be read, e.g. for testing CLI with -rpcwait before RPC connection is up."""
318          self.log.debug("Waiting for cookie credentials")
319          # Poll at a rate of four times per second.
320          poll_per_s = 4
321          for _ in range(poll_per_s * self.rpc_timeout):
322              try:
323                  get_auth_cookie(self.datadir_path, self.chain)
324                  self.log.debug("Cookie credentials successfully retrieved")
325                  return
326              except ValueError:  # cookie file not found and no rpcuser or rpcpassword; bitcoind is still starting
327                  pass            # so we continue polling until RPC credentials are retrieved
328              time.sleep(1.0 / poll_per_s)
329          self._raise_assertion_error("Unable to retrieve cookie credentials after {}s".format(self.rpc_timeout))
330  
    def generate(self, nblocks, maxtries=1000000, **kwargs):
        """Mine nblocks by delegating to generatetoaddress with this node's deterministic address."""
        self.log.debug("TestNode.generate() dispatches `generate` call to `generatetoaddress`")
        return self.generatetoaddress(nblocks=nblocks, address=self.get_deterministic_priv_key().address, maxtries=maxtries, **kwargs)
334  
    # The following wrappers guard direct per-node generate* RPC calls: the
    # keyword-only invalid_call flag must be explicitly set falsy by the caller
    # (the framework-level helpers do this) before the RPC is dispatched.
    def generateblock(self, *args, invalid_call, **kwargs):
        assert not invalid_call
        return self.__getattr__('generateblock')(*args, **kwargs)

    def generatetoaddress(self, *args, invalid_call, **kwargs):
        assert not invalid_call
        return self.__getattr__('generatetoaddress')(*args, **kwargs)

    def generatetodescriptor(self, *args, invalid_call, **kwargs):
        assert not invalid_call
        return self.__getattr__('generatetodescriptor')(*args, **kwargs)
346  
347      def setmocktime(self, timestamp):
348          """Wrapper for setmocktime RPC, sets self.mocktime"""
349          if timestamp == 0:
350              # setmocktime(0) resets to system time.
351              self.mocktime = None
352          else:
353              self.mocktime = timestamp
354          return self.__getattr__('setmocktime')(timestamp)
355  
    def get_wallet_rpc(self, wallet_name):
        """Return an RPC (or CLI) handle scoped to the named wallet."""
        if self.use_cli:
            return RPCOverloadWrapper(self.cli("-rpcwallet={}".format(wallet_name)), True, self.descriptors)
        else:
            assert self.rpc_connected and self.rpc, self._node_msg("RPC not connected")
            # Wallet names may contain characters that need escaping in the endpoint URL.
            wallet_path = "wallet/{}".format(urllib.parse.quote(wallet_name))
            return RPCOverloadWrapper(self.rpc / wallet_path, descriptors=self.descriptors)
363  
364      def version_is_at_least(self, ver):
365          return self.version is None or self.version >= ver
366  
    def stop_node(self, expected_stderr='', *, wait=0, wait_until_stopped=True):
        """Stop the node.

        expected_stderr: exact stderr output required when checking shutdown
            (only meaningful with wait_until_stopped=True).
        wait: forwarded to the stop RPC on nodes that support it.
        wait_until_stopped: block until the process has actually exited.
        """
        if not self.running:
            return
        self.log.debug("Stopping node")
        try:
            # Do not use wait argument when testing older nodes, e.g. in wallet_backwards_compatibility.py
            if self.version_is_at_least(180000):
                self.stop(wait=wait)
            else:
                self.stop()
        except http.client.CannotSendRequest:
            self.log.exception("Unable to stop node.")

        # If there are any running perf processes, stop them.
        for profile_name in tuple(self.perf_subprocesses.keys()):
            self._stop_perf(profile_name)

        # Drop all P2P connections tied to this node instance.
        del self.p2ps[:]

        assert (not expected_stderr) or wait_until_stopped  # Must wait to check stderr
        if wait_until_stopped:
            self.wait_until_stopped(expected_stderr=expected_stderr)
390  
    def is_node_stopped(self, *, expected_stderr="", expected_ret_code=0):
        """Checks whether the node has stopped.

        Returns True if the node has stopped. False otherwise.
        This method is responsible for freeing resources (self.process).

        Raises AssertionError if the exit code or stderr differ from the
        expected values."""
        if not self.running:
            return True
        return_code = self.process.poll()
        if return_code is None:
            # Still running.
            return False

        # process has stopped. Assert that it didn't return an error code.
        assert return_code == expected_ret_code, self._node_msg(
            f"Node returned unexpected exit code ({return_code}) vs ({expected_ret_code}) when stopping")
        # Check that stderr is as expected
        self.stderr.seek(0)
        stderr = self.stderr.read().decode('utf-8').strip()
        if stderr != expected_stderr:
            raise AssertionError("Unexpected stderr {} != {}".format(stderr, expected_stderr))

        self.stdout.close()
        self.stderr.close()

        # Reset all run state so the node can be started again.
        self.running = False
        self.process = None
        self.rpc_connected = False
        self.rpc = None
        self.log.debug("Node stopped")
        return True
420  
    def wait_until_stopped(self, *, timeout=BITCOIND_PROC_WAIT_TIMEOUT, expect_error=False, **kwargs):
        """Poll until is_node_stopped() reports the process has exited.

        expect_error selects the expected exit status; extra kwargs are
        forwarded to is_node_stopped (e.g. expected_stderr)."""
        expected_ret_code = 1 if expect_error else 0  # Whether node shutdown return EXIT_FAILURE or EXIT_SUCCESS
        self.wait_until(lambda: self.is_node_stopped(expected_ret_code=expected_ret_code, **kwargs), timeout=timeout)
424  
425      def replace_in_config(self, replacements):
426          """
427          Perform replacements in the configuration file.
428          The substitutions are passed as a list of search-replace-tuples, e.g.
429              [("old", "new"), ("foo", "bar"), ...]
430          """
431          with open(self.bitcoinconf, 'r', encoding='utf8') as conf:
432              conf_data = conf.read()
433          for replacement in replacements:
434              assert_equal(len(replacement), 2)
435              old, new = replacement[0], replacement[1]
436              conf_data = conf_data.replace(old, new)
437          with open(self.bitcoinconf, 'w', encoding='utf8') as conf:
438              conf.write(conf_data)
439  
    @property
    def chain_path(self) -> Path:
        """Chain-specific subdirectory of the datadir (e.g. <datadir>/regtest)."""
        return self.datadir_path / self.chain

    @property
    def debug_log_path(self) -> Path:
        """Path to this node's debug.log file."""
        return self.chain_path / 'debug.log'

    @property
    def blocks_path(self) -> Path:
        """Path to this node's block storage directory."""
        return self.chain_path / "blocks"

    @property
    def wallets_path(self) -> Path:
        """Path to this node's wallets directory."""
        return self.chain_path / "wallets"
455  
456      def debug_log_size(self, **kwargs) -> int:
457          with open(self.debug_log_path, **kwargs) as dl:
458              dl.seek(0, 2)
459              return dl.tell()
460  
    @contextlib.contextmanager
    def assert_debug_log(self, expected_msgs, unexpected_msgs=None, timeout=2):
        """Context manager asserting that all expected_msgs (and none of the
        unexpected_msgs) appear in debug.log output produced inside the block.

        Messages are matched as escaped substrings, polled until timeout
        (scaled by timeout_factor)."""
        if unexpected_msgs is None:
            unexpected_msgs = []
        assert_equal(type(expected_msgs), list)
        assert_equal(type(unexpected_msgs), list)

        time_end = time.time() + timeout * self.timeout_factor
        prev_size = self.debug_log_size(encoding="utf-8")  # Must use same encoding that is used to read() below

        yield

        while True:
            found = True
            with open(self.debug_log_path, encoding="utf-8", errors="replace") as dl:
                # Only examine log output produced after the yield.
                dl.seek(prev_size)
                log = dl.read()
            print_log = " - " + "\n - ".join(log.splitlines())
            for unexpected_msg in unexpected_msgs:
                if re.search(re.escape(unexpected_msg), log, flags=re.MULTILINE):
                    self._raise_assertion_error('Unexpected message "{}" partially matches log:\n\n{}\n\n'.format(unexpected_msg, print_log))
            for expected_msg in expected_msgs:
                if re.search(re.escape(expected_msg), log, flags=re.MULTILINE) is None:
                    found = False
            if found:
                return
            if time.time() >= time_end:
                break
            time.sleep(0.05)
        self._raise_assertion_error('Expected messages "{}" does not partially match log:\n\n{}\n\n'.format(str(expected_msgs), print_log))
491  
    @contextlib.contextmanager
    def wait_for_debug_log(self, expected_msgs, timeout=60):
        """
        Block until we see a particular debug log message fragment or until we exceed the timeout.
        Return:
            the number of log lines we encountered when matching

        expected_msgs are bytes fragments; the log is read in binary mode so
        matching is independent of text encoding.
        """
        time_end = time.time() + timeout * self.timeout_factor
        prev_size = self.debug_log_size(mode="rb")  # Must use same mode that is used to read() below

        yield

        while True:
            found = True
            with open(self.debug_log_path, "rb") as dl:
                # Only examine log output produced after the yield.
                dl.seek(prev_size)
                log = dl.read()

            for expected_msg in expected_msgs:
                if expected_msg not in log:
                    found = False

            if found:
                return

            if time.time() >= time_end:
                print_log = " - " + "\n - ".join(log.decode("utf8", errors="replace").splitlines())
                break

            # No sleep here because we want to detect the message fragment as fast as
            # possible.

        self._raise_assertion_error(
            'Expected messages "{}" does not partially match log:\n\n{}\n\n'.format(
                str(expected_msgs), print_log))
527  
    @contextlib.contextmanager
    def wait_for_new_peer(self, timeout=5):
        """
        Wait until the node is connected to at least one new peer. We detect this
        by watching for an increased highest peer id, using the `getpeerinfo` RPC call.
        Note that the simpler approach of only accounting for the number of peers
        suffers from race conditions, as disconnects from unrelated previous peers
        could happen anytime in-between.
        """
        def get_highest_peer_id():
            peer_info = self.getpeerinfo()
            # -1 when there are no peers, so any new connection counts as an increase.
            return peer_info[-1]["id"] if peer_info else -1

        initial_peer_id = get_highest_peer_id()
        yield
        self.wait_until(lambda: get_highest_peer_id() > initial_peer_id, timeout=timeout)
544  
    @contextlib.contextmanager
    def profile_with_perf(self, profile_name: str):
        """
        Context manager that allows easy profiling of node activity using `perf`.

        See `test/functional/README.md` for details on perf usage.

        Args:
            profile_name: This string will be appended to the
                profile data filename generated by perf.
        """
        subp = self._start_perf(profile_name)

        yield

        # subp is None when perf could not be started (e.g. not on Linux).
        if subp:
            self._stop_perf(profile_name)
562  
    def _start_perf(self, profile_name=None):
        """Start a perf process to profile this node.

        Returns the subprocess running perf, or None if perf is unavailable."""
        subp = None

        def test_success(cmd):
            # True when cmd exits 0, with all output suppressed.
            return subprocess.call(
                # shell=True required for pipe use below
                cmd, shell=True,
                stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) == 0

        if platform.system() != 'Linux':
            self.log.warning("Can't profile with perf; only available on Linux platforms")
            return None

        if not test_success('which perf'):
            self.log.warning("Can't profile with perf; must install perf-tools")
            return None

        if not test_success('readelf -S {} | grep .debug_str'.format(shlex.quote(self.binary))):
            self.log.warning(
                "perf output won't be very useful without debug symbols compiled into bitcoind")

        # Reserve a unique output file in the datadir for the profile data.
        output_path = tempfile.NamedTemporaryFile(
            dir=self.datadir_path,
            prefix="{}.perf.data.".format(profile_name or 'test'),
            delete=False,
        ).name

        cmd = [
            'perf', 'record',
            '-g',                     # Record the callgraph.
            '--call-graph', 'dwarf',  # Compatibility for gcc's --fomit-frame-pointer.
            '-F', '101',              # Sampling frequency in Hz.
            '-p', str(self.process.pid),
            '-o', output_path,
        ]
        subp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.perf_subprocesses[profile_name] = subp

        return subp
605  
    def _stop_perf(self, profile_name):
        """Stop (and pop) a perf subprocess."""
        subp = self.perf_subprocesses.pop(profile_name)
        # Recover the output file from the recorded command line ('-o <path>').
        output_path = subp.args[subp.args.index('-o') + 1]

        subp.terminate()
        subp.wait(timeout=10)

        stderr = subp.stderr.read().decode()
        if 'Consider tweaking /proc/sys/kernel/perf_event_paranoid' in stderr:
            self.log.warning(
                "perf couldn't collect data! Try "
                "'sudo sysctl -w kernel.perf_event_paranoid=-1'")
        else:
            report_cmd = "perf report -i {}".format(output_path)
            self.log.info("See perf output by running '{}'".format(report_cmd))
622  
    def assert_start_raises_init_error(self, extra_args=None, expected_msg=None, match=ErrorMatch.FULL_TEXT, *args, **kwargs):
        """Attempt to start the node and expect it to raise an error.

        extra_args: extra arguments to pass through to bitcoind
        expected_msg: regex that stderr should match when bitcoind fails

        Will throw if bitcoind starts without an error.
        Will throw if an expected_msg is provided and it does not match bitcoind's stderr."""
        assert not self.running
        with tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False) as log_stderr, \
             tempfile.NamedTemporaryFile(dir=self.stdout_dir, delete=False) as log_stdout:
            try:
                self.start(extra_args, stdout=log_stdout, stderr=log_stderr, *args, **kwargs)
                ret = self.process.wait(timeout=self.rpc_timeout)
                self.log.debug(self._node_msg(f'bitcoind exited with status {ret} during initialization'))
                assert ret != 0  # Exit code must indicate failure
                self.running = False
                self.process = None
                # Check stderr for expected message
                if expected_msg is not None:
                    log_stderr.seek(0)
                    stderr = log_stderr.read().decode('utf-8').strip()
                    if match == ErrorMatch.PARTIAL_REGEX:
                        if re.search(expected_msg, stderr, flags=re.MULTILINE) is None:
                            self._raise_assertion_error(
                                'Expected message "{}" does not partially match stderr:\n"{}"'.format(expected_msg, stderr))
                    elif match == ErrorMatch.FULL_REGEX:
                        if re.fullmatch(expected_msg, stderr) is None:
                            self._raise_assertion_error(
                                'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr))
                    elif match == ErrorMatch.FULL_TEXT:
                        if expected_msg != stderr:
                            self._raise_assertion_error(
                                'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr))
            except subprocess.TimeoutExpired:
                # Node did not exit in time: kill it and report failure.
                self.process.kill()
                self.running = False
                self.process = None
                assert_msg = f'bitcoind should have exited within {self.rpc_timeout}s '
                if expected_msg is None:
                    assert_msg += "with an error"
                else:
                    assert_msg += "with expected error " + expected_msg
                self._raise_assertion_error(assert_msg)
667  
    def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, send_version=True, supports_v2_p2p=None, wait_for_v2_handshake=True, **kwargs):
        """Add an inbound p2p connection to the node.

        This method adds the p2p connection to the self.p2ps list and also
        returns the connection to the caller.

        When self.use_v2transport is True, TestNode advertises NODE_P2P_V2 service flag

        An inbound connection is made from TestNode <------ P2PConnection
        - if TestNode doesn't advertise NODE_P2P_V2 service, P2PConnection sends version message and v1 P2P is followed
        - if TestNode advertises NODE_P2P_V2 service, (and if P2PConnections supports v2 P2P)
                P2PConnection sends ellswift bytes and v2 P2P is followed
        """
        # Fill in destination defaults: this node's p2p port on localhost.
        if 'dstport' not in kwargs:
            kwargs['dstport'] = p2p_port(self.index)
        if 'dstaddr' not in kwargs:
            kwargs['dstaddr'] = '127.0.0.1'
        # Default the peer's v2 capability to the node's own transport setting.
        if supports_v2_p2p is None:
            supports_v2_p2p = self.use_v2transport


        p2p_conn.p2p_connected_to_node = True
        if self.use_v2transport:
            # OR NODE_P2P_V2 into the services this peer reports.
            kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2
        # v2 is only attempted when both the node and the peer support it.
        supports_v2_p2p = self.use_v2transport and supports_v2_p2p
        p2p_conn.peer_connect(**kwargs, send_version=send_version, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p)()

        self.p2ps.append(p2p_conn)
        p2p_conn.wait_until(lambda: p2p_conn.is_connected, check_connected=False)
        if supports_v2_p2p and wait_for_v2_handshake:
            p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake)
        if send_version:
            # Wait until the queued initial message (if any) has left the send buffer.
            p2p_conn.wait_until(lambda: not p2p_conn.on_connection_send_msg)
        if wait_for_verack:
            # Wait for the node to send us the version and verack
            p2p_conn.wait_for_verack()
            # At this point we have sent our version message and received the version and verack, however the full node
            # has not yet received the verack from us (in reply to their version). So, the connection is not yet fully
            # established (fSuccessfullyConnected).
            #
            # This shouldn't lead to any issues when sending messages, since the verack will be in-flight before the
            # message we send. However, it might lead to races where we are expecting to receive a message. E.g. a
            # transaction that will be added to the mempool as soon as we return here.
            #
            # So syncing here is redundant when we only want to send a message, but the cost is low (a few milliseconds)
            # in comparison to the upside of making tests less fragile and unexpected intermittent errors less likely.
            p2p_conn.sync_with_ping()

            # Consistency check that the node received our user agent string.
            # Find our connection in getpeerinfo by our address:port and theirs, as this combination is unique.
            sockname = p2p_conn._transport.get_extra_info("socket").getsockname()
            our_addr_and_port = f"{sockname[0]}:{sockname[1]}"
            dst_addr_and_port = f"{p2p_conn.dstaddr}:{p2p_conn.dstport}"
            info = [peer for peer in self.getpeerinfo() if peer["addr"] == our_addr_and_port and peer["addrbind"] == dst_addr_and_port]
            assert_equal(len(info), 1)
            assert_equal(info[0]["subver"], P2P_SUBVERSION)

        return p2p_conn
726  
    def add_outbound_p2p_connection(self, p2p_conn, *, wait_for_verack=True, wait_for_disconnect=False, p2p_idx, connection_type="outbound-full-relay", supports_v2_p2p=None, advertise_v2_p2p=None, **kwargs):
        """Add an outbound p2p connection from node. Must be an
        "outbound-full-relay", "block-relay-only", "addr-fetch" or "feeler" connection.

        This method adds the p2p connection to the self.p2ps list and returns
        the connection to the caller.

        p2p_idx must be different for simultaneously connected peers. When reusing it for the next peer
        after disconnecting the previous one, it is necessary to wait for the disconnect to finish to avoid
        a race condition.

        Parameters:
            supports_v2_p2p: whether p2p_conn supports v2 P2P or not
            advertise_v2_p2p: whether p2p_conn is advertised to support v2 P2P or not

        An outbound connection is made from TestNode -------> P2PConnection
            - if P2PConnection doesn't advertise_v2_p2p, TestNode sends version message and v1 P2P is followed
            - if P2PConnection both supports_v2_p2p and advertise_v2_p2p, TestNode sends ellswift bytes and v2 P2P is followed
            - if P2PConnection doesn't supports_v2_p2p but advertise_v2_p2p,
                TestNode sends ellswift bytes and P2PConnection disconnects,
                TestNode reconnects by sending version message and v1 P2P is followed
        """

        def addconnection_callback(address, port):
            # Instruct the node (via the addconnection RPC) to open the outbound connection.
            self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type))
            self.addconnection('%s:%d' % (address, port), connection_type, advertise_v2_p2p)

        p2p_conn.p2p_connected_to_node = False
        # Default both capability flags to the node's own transport setting.
        if supports_v2_p2p is None:
            supports_v2_p2p = self.use_v2transport
        if advertise_v2_p2p is None:
            advertise_v2_p2p = self.use_v2transport

        if advertise_v2_p2p:
            kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2
            assert self.use_v2transport  # only a v2 TestNode could make a v2 outbound connection

        # if P2PConnection is advertised to support v2 P2P when it doesn't actually support v2 P2P,
        # reconnection needs to be attempted using v1 P2P by sending version message
        reconnect = advertise_v2_p2p and not supports_v2_p2p
        # P2PConnection needs to be advertised to support v2 P2P so that ellswift bytes are sent instead of msg_version
        supports_v2_p2p = supports_v2_p2p and advertise_v2_p2p
        p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p, reconnect=reconnect, **kwargs)()

        if reconnect:
            p2p_conn.wait_for_reconnect()

        if connection_type == "feeler" or wait_for_disconnect:
            # feeler connections are closed as soon as the node receives a `version` message
            p2p_conn.wait_until(lambda: p2p_conn.message_count["version"] == 1, check_connected=False)
            p2p_conn.wait_until(lambda: not p2p_conn.is_connected, check_connected=False)
        else:
            p2p_conn.wait_for_connect()
            self.p2ps.append(p2p_conn)

            if supports_v2_p2p:
                # Wait for the v2 handshake to have been attempted before proceeding.
                p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake)
            # Wait until the queued initial message (if any) has left the send buffer.
            p2p_conn.wait_until(lambda: not p2p_conn.on_connection_send_msg)
            if wait_for_verack:
                p2p_conn.wait_for_verack()
                p2p_conn.sync_with_ping()

        return p2p_conn
790  
791      def num_test_p2p_connections(self):
792          """Return number of test framework p2p connections to the node."""
793          return len([peer for peer in self.getpeerinfo() if peer['subver'] == P2P_SUBVERSION])
794  
795      def disconnect_p2ps(self):
796          """Close all p2p connections to the node.
797          Use only after each p2p has sent a version message to ensure the wait works."""
798          for p in self.p2ps:
799              p.peer_disconnect()
800          del self.p2ps[:]
801  
802          self.wait_until(lambda: self.num_test_p2p_connections() == 0)
803  
804      def bumpmocktime(self, seconds):
805          """Fast forward using setmocktime to self.mocktime + seconds. Requires setmocktime to have
806          been called at some point in the past."""
807          assert self.mocktime
808          self.mocktime += seconds
809          self.setmocktime(self.mocktime)
810  
811      def wait_until(self, test_function, timeout=60):
812          return wait_until_helper_internal(test_function, timeout=timeout, timeout_factor=self.timeout_factor)
813  
814  
class TestNodeCLIAttr:
    """Callable wrapper binding a single RPC command name to a TestNodeCLI instance."""

    def __init__(self, cli, command):
        self.cli = cli
        self.command = command

    def __call__(self, *args, **kwargs):
        # Invoke the bound command through the CLI with the given arguments.
        return self.cli.send_cli(self.command, *args, **kwargs)

    def get_request(self, *args, **kwargs):
        # Defer execution: return a zero-argument callable performing the call.
        def request():
            return self(*args, **kwargs)
        return request
825  
826  
def arg_to_cli(arg):
    """Convert a Python value to its bitcoin-cli command-line string form.

    Booleans become lowercase 'true'/'false', None becomes 'null', dicts and
    lists are JSON-encoded, and everything else goes through str().
    """
    if isinstance(arg, bool):
        # bool must be checked first: bool is a subclass of int, and
        # bitcoin-cli expects lowercase 'true'/'false', not 'True'/'False'.
        return str(arg).lower()
    elif arg is None:
        return 'null'
    elif isinstance(arg, (dict, list)):
        return json.dumps(arg, default=serialization_fallback)
    else:
        return str(arg)
836  
837  
class TestNodeCLI():
    """Interface to bitcoin-cli for an individual node"""
    def __init__(self, binary, datadir):
        self.binary = binary
        self.datadir = datadir
        self.options = []
        self.input = None
        self.log = logging.getLogger('TestFramework.bitcoincli')

    def __call__(self, *options, input=None):
        # TestNodeCLI is callable with bitcoin-cli command-line options:
        # return a fresh instance carrying those options and the given stdin.
        new_cli = TestNodeCLI(self.binary, self.datadir)
        new_cli.options = [str(option) for option in options]
        new_cli.input = input
        return new_cli

    def __getattr__(self, command):
        # Unknown attributes are treated as RPC command names.
        return TestNodeCLIAttr(self, command)

    def batch(self, requests):
        """Run each zero-argument request callable, collecting results and RPC errors."""
        results = []
        for request in requests:
            try:
                results.append({'result': request()})
            except JSONRPCException as exc:
                results.append({'error': exc})
        return results

    def send_cli(self, clicommand=None, *args, **kwargs):
        """Run bitcoin-cli command. Deserializes returned string as python object."""
        pos_args = [arg_to_cli(value) for value in args]
        named_args = [str(key) + "=" + arg_to_cli(value) for (key, value) in kwargs.items()]
        # Assemble the full command line: binary, datadir, options, then arguments.
        command = [self.binary, f"-datadir={self.datadir}"]
        command.extend(self.options)
        if named_args:
            command.append("-named")
        if clicommand is not None:
            command.append(clicommand)
        command.extend(pos_args)
        command.extend(named_args)
        self.log.debug("Running bitcoin-cli {}".format(command[2:]))
        process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        cli_stdout, cli_stderr = process.communicate(input=self.input)
        returncode = process.poll()
        if returncode:
            error_match = re.match(r'error code: ([-0-9]+)\nerror message:\n(.*)', cli_stderr)
            if error_match:
                code, message = error_match.groups()
                raise JSONRPCException(dict(code=int(code), message=message))
            # Ignore cli_stdout, raise with cli_stderr
            raise subprocess.CalledProcessError(returncode, self.binary, output=cli_stderr)
        # bitcoin-cli prints JSON for most commands; fall back to the raw string.
        try:
            return json.loads(cli_stdout, parse_float=decimal.Decimal)
        except (json.JSONDecodeError, decimal.InvalidOperation):
            return cli_stdout.rstrip("\n")
891  
892  class RPCOverloadWrapper():
893      def __init__(self, rpc, cli=False, descriptors=False):
894          self.rpc = rpc
895          self.is_cli = cli
896          self.descriptors = descriptors
897  
898      def __getattr__(self, name):
899          return getattr(self.rpc, name)
900  
901      def createwallet_passthrough(self, *args, **kwargs):
902          return self.__getattr__("createwallet")(*args, **kwargs)
903  
904      def createwallet(self, wallet_name, disable_private_keys=None, blank=None, passphrase='', avoid_reuse=None, descriptors=None, load_on_startup=None, external_signer=None):
905          if descriptors is None:
906              descriptors = self.descriptors
907          return self.__getattr__('createwallet')(wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors, load_on_startup, external_signer)
908  
909      def importprivkey(self, privkey, label=None, rescan=None):
910          wallet_info = self.getwalletinfo()
911          if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
912              return self.__getattr__('importprivkey')(privkey, label, rescan)
913          desc = descsum_create('combo(' + privkey + ')')
914          req = [{
915              'desc': desc,
916              'timestamp': 0 if rescan else 'now',
917              'label': label if label else ''
918          }]
919          import_res = self.importdescriptors(req)
920          if not import_res[0]['success']:
921              raise JSONRPCException(import_res[0]['error'])
922  
923      def addmultisigaddress(self, nrequired, keys, label=None, address_type=None):
924          wallet_info = self.getwalletinfo()
925          if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
926              return self.__getattr__('addmultisigaddress')(nrequired, keys, label, address_type)
927          cms = self.createmultisig(nrequired, keys, address_type)
928          req = [{
929              'desc': cms['descriptor'],
930              'timestamp': 0,
931              'label': label if label else ''
932          }]
933          import_res = self.importdescriptors(req)
934          if not import_res[0]['success']:
935              raise JSONRPCException(import_res[0]['error'])
936          return cms
937  
938      def importpubkey(self, pubkey, label=None, rescan=None):
939          wallet_info = self.getwalletinfo()
940          if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
941              return self.__getattr__('importpubkey')(pubkey, label, rescan)
942          desc = descsum_create('combo(' + pubkey + ')')
943          req = [{
944              'desc': desc,
945              'timestamp': 0 if rescan else 'now',
946              'label': label if label else ''
947          }]
948          import_res = self.importdescriptors(req)
949          if not import_res[0]['success']:
950              raise JSONRPCException(import_res[0]['error'])
951  
    def importaddress(self, address, label=None, rescan=None, p2sh=None):
        """Overload of the importaddress RPC.

        On legacy (non-descriptor) wallets this passes straight through; on
        descriptor wallets it imports an equivalent raw()/addr() descriptor
        (plus a p2sh(raw()) descriptor when p2sh is requested for hex input).
        """
        wallet_info = self.getwalletinfo()
        if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
            return self.__getattr__('importaddress')(address, label, rescan, p2sh)
        is_hex = False
        try:
            # A hex string is treated as a raw script; anything else as an address.
            int(address ,16)
            is_hex = True
            desc = descsum_create('raw(' + address + ')')
        except Exception:
            desc = descsum_create('addr(' + address + ')')
        reqs = [{
            'desc': desc,
            'timestamp': 0 if rescan else 'now',
            'label': label if label else ''
        }]
        if is_hex and p2sh:
            # Also import the P2SH-wrapped form of the raw script.
            reqs.append({
                'desc': descsum_create('p2sh(raw(' + address + '))'),
                'timestamp': 0 if rescan else 'now',
                'label': label if label else ''
            })
        import_res = self.importdescriptors(reqs)
        for res in import_res:
            if not res['success']:
                raise JSONRPCException(res['error'])