/ test / functional / rpc_scanblocks.py
rpc_scanblocks.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2021-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Test the scanblocks RPC call."""
  6  from test_framework.address import address_to_scriptpubkey
  7  from test_framework.blockfilter import (
  8      bip158_basic_element_hash,
  9      bip158_relevant_scriptpubkeys,
 10  )
 11  from test_framework.messages import COIN
 12  from test_framework.test_framework import BitcoinTestFramework
 13  from test_framework.util import (
 14      assert_equal,
 15      assert_raises_rpc_error,
 16  )
 17  from test_framework.wallet import (
 18      MiniWallet,
 19      getnewdestination,
 20  )
 21  
 22  
 23  class ScanblocksTest(BitcoinTestFramework):
 24      def set_test_params(self):
 25          self.num_nodes = 2
 26          self.extra_args = [["-blockfilterindex=1"], []]
 27  
 28      def run_test(self):
 29          node = self.nodes[0]
 30          wallet = MiniWallet(node)
 31  
 32          # send 1.0, mempool only
 33          _, spk_1, addr_1 = getnewdestination()
 34          wallet.send_to(from_node=node, scriptPubKey=spk_1, amount=1 * COIN)
 35  
 36          parent_key = "tpubD6NzVbkrYhZ4WaWSyoBvQwbpLkojyoTZPRsgXELWz3Popb3qkjcJyJUGLnL4qHHoQvao8ESaAstxYSnhyswJ76uZPStJRJCTKvosUCJZL5B"
 37          # send 1.0, mempool only
 38          # childkey 5 of `parent_key`
 39          wallet.send_to(from_node=node,
 40                         scriptPubKey=address_to_scriptpubkey("mkS4HXoTYWRTescLGaUTGbtTTYX5EjJyEE"),
 41                         amount=1 * COIN)
 42  
 43          # mine a block and assure that the mined blockhash is in the filterresult
 44          blockhash = self.generate(node, 1)[0]
 45          height = node.getblockheader(blockhash)['height']
 46          self.wait_until(lambda: all(i["synced"] for i in node.getindexinfo().values()))
 47  
 48          out = node.scanblocks("start", [f"addr({addr_1})"])
 49          assert blockhash in out['relevant_blocks']
 50          assert_equal(height, out['to_height'])
 51          assert_equal(0, out['from_height'])
 52          assert_equal(True, out['completed'])
 53  
 54          # mine another block
 55          blockhash_new = self.generate(node, 1)[0]
 56          height_new = node.getblockheader(blockhash_new)['height']
 57  
 58          # make sure the blockhash is not in the filter result if we set the start_height
 59          # to the just mined block (unlikely to hit a false positive)
 60          assert blockhash not in node.scanblocks(
 61              "start", [f"addr({addr_1})"], height_new)['relevant_blocks']
 62  
 63          # make sure the blockhash is present when using the first mined block as start_height
 64          assert blockhash in node.scanblocks(
 65              "start", [f"addr({addr_1})"], height)['relevant_blocks']
 66          for v in [False, True]:
 67              assert blockhash in node.scanblocks(
 68                  action="start",
 69                  scanobjects=[f"addr({addr_1})"],
 70                  start_height=height,
 71                  options={"filter_false_positives": v})['relevant_blocks']
 72  
 73          # also test the stop height
 74          assert blockhash in node.scanblocks(
 75              "start", [f"addr({addr_1})"], height, height)['relevant_blocks']
 76  
 77          # use the stop_height to exclude the relevant block
 78          assert blockhash not in node.scanblocks(
 79              "start", [f"addr({addr_1})"], 0, height - 1)['relevant_blocks']
 80  
 81          # make sure the blockhash is present when using the first mined block as start_height
 82          assert blockhash in node.scanblocks(
 83              "start", [{"desc": f"pkh({parent_key}/*)", "range": [0, 100]}], height)['relevant_blocks']
 84  
 85          # check that false-positives are included in the result now; note that
 86          # finding a false-positive at runtime would take too long, hence we simply
 87          # use a pre-calculated one that collides with the regtest genesis block's
 88          # coinbase output and verify that their BIP158 ranged hashes match
 89          genesis_blockhash = node.getblockhash(0)
 90          genesis_spks = bip158_relevant_scriptpubkeys(node, genesis_blockhash)
 91          assert_equal(len(genesis_spks), 1)
 92          genesis_coinbase_spk = list(genesis_spks)[0]
 93          false_positive_spk = bytes.fromhex("001400000000000000000000000000000000000cadcb")
 94  
 95          genesis_coinbase_hash = bip158_basic_element_hash(genesis_coinbase_spk, 1, genesis_blockhash)
 96          false_positive_hash = bip158_basic_element_hash(false_positive_spk, 1, genesis_blockhash)
 97          assert_equal(genesis_coinbase_hash, false_positive_hash)
 98  
 99          assert genesis_blockhash in node.scanblocks(
100              "start", [{"desc": f"raw({genesis_coinbase_spk.hex()})"}], 0, 0)['relevant_blocks']
101          assert genesis_blockhash in node.scanblocks(
102              "start", [{"desc": f"raw({false_positive_spk.hex()})"}], 0, 0)['relevant_blocks']
103  
104          # check that the filter_false_positives option works
105          assert genesis_blockhash in node.scanblocks(
106              "start", [{"desc": f"raw({genesis_coinbase_spk.hex()})"}], 0, 0, "basic", {"filter_false_positives": True})['relevant_blocks']
107          assert genesis_blockhash not in node.scanblocks(
108              "start", [{"desc": f"raw({false_positive_spk.hex()})"}], 0, 0, "basic", {"filter_false_positives": True})['relevant_blocks']
109  
110          # test node with disabled blockfilterindex
111          assert_raises_rpc_error(-1, "Index is not enabled for filtertype basic",
112                                  self.nodes[1].scanblocks, "start", [f"addr({addr_1})"])
113  
114          # test unknown filtertype
115          assert_raises_rpc_error(-5, "Unknown filtertype",
116                                  node.scanblocks, "start", [f"addr({addr_1})"], 0, 10, "extended")
117  
118          # test invalid start_height
119          assert_raises_rpc_error(-1, "Invalid start_height",
120                                  node.scanblocks, "start", [f"addr({addr_1})"], 100000000)
121  
122          # test invalid stop_height
123          assert_raises_rpc_error(-1, "Invalid stop_height",
124                                  node.scanblocks, "start", [f"addr({addr_1})"], 10, 0)
125          assert_raises_rpc_error(-1, "Invalid stop_height",
126                                  node.scanblocks, "start", [f"addr({addr_1})"], 10, 100000000)
127  
128          # test accessing the status (must be empty)
129          assert_equal(node.scanblocks("status"), None)
130  
131          # test aborting the current scan (there is no, must return false)
132          assert_equal(node.scanblocks("abort"), False)
133  
134          # test invalid command
135          assert_raises_rpc_error(-8, "Invalid action 'foobar'", node.scanblocks, "foobar")
136  
137  
if __name__ == '__main__':
    # Script entry point: build the test from this file's path and run it.
    scanblocks_test = ScanblocksTest(__file__)
    scanblocks_test.main()