wallet_keypool_topup.py
#!/usr/bin/env python3
# Copyright (c) 2017-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test HD Wallet keypool restore function.

Five nodes. Node0 provides transactions and generates blocks. Nodes 1-4 are
started with -keypool=100 and each one is exercised with a single address type
(node4 / bech32m only when running with descriptor wallets).

- Stop node1, back up its wallet file and restart it.
- For each address type, on the node assigned to it:
  - Generate 110 addresses (more than the keypool size of 100). Keep address 90
    (inside the initial keypool) and address 110 (beyond the initial keypool).
    Send funds to both.
  - Stop the node, copy the wallet backup over node1's wallet file and restart
    the node.
  - Reconnect the node to node0. Verify that they sync, that the node sees its
    funds, and that the next address handed out is derived at index 110."""
import shutil

from test_framework.blocktools import COINBASE_MATURITY
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_equal,
)

# Expected (isscript, iswitness) values in the validateaddress result for an
# address of each output type.
EXPECTED_ADDRESS_FLAGS = {
    "legacy": (False, False),
    "p2sh-segwit": (True, False),
    "bech32": (False, True),
    "bech32m": (True, True),
}

# First component ("purpose") of the hdkeypath expected for each output type
# when running with descriptor wallets.
DESCRIPTOR_PURPOSE = {
    "legacy": 44,
    "p2sh-segwit": 49,
    "bech32": 84,
    "bech32m": 86,
}


class KeypoolRestoreTest(BitcoinTestFramework):
    """Check that addresses beyond a backed-up keypool are found after a restore."""

    def add_options(self, parser):
        """Register the wallet-related command line options on `parser`."""
        self.add_wallet_options(parser)

    def set_test_params(self):
        """Configure a clean chain with five nodes; all but node0 use -keypool=100."""
        self.setup_clean_chain = True
        self.num_nodes = 5
        # Node0 runs with default arguments; every other node gets its own
        # argument list limiting the keypool to 100 keys.
        self.extra_args = [[]] + [['-keypool=100'] for _ in range(self.num_nodes - 1)]

    def skip_test_if_missing_module(self):
        """Skip the test when the wallet is not available."""
        self.skip_if_no_wallet()

    def run_test(self):
        """Fund addresses inside and beyond the keypool, restore, and verify."""
        wallet_path = self.nodes[1].wallets_path / self.default_wallet_name / self.wallet_data_filename
        wallet_backup_path = self.nodes[1].datadir_path / "wallet.bak"
        self.generate(self.nodes[0], COINBASE_MATURITY + 1)

        self.log.info("Make backup of wallet")
        # The wallet file is copied while node1 is stopped.
        self.stop_node(1)
        shutil.copyfile(wallet_path, wallet_backup_path)
        self.start_node(1, self.extra_args[1])
        for idx in range(1, self.num_nodes):
            self.connect_nodes(0, idx)

        output_types = ["legacy", "p2sh-segwit", "bech32"]
        if self.options.descriptors:
            output_types.append("bech32m")

        # Output type number k (1-based) is exercised on node k.
        for idx, output_type in enumerate(output_types, start=1):
            node = self.nodes[idx]

            self.log.info(f"Generate keys for wallet with address type: {output_type}")
            # Only the last address of each batch is kept: the 90th address
            # overall, then the 110th address overall.
            for _ in range(90):
                addr_oldpool = node.getnewaddress(address_type=output_type)
            for _ in range(20):
                addr_extpool = node.getnewaddress(address_type=output_type)

            # Make sure we're creating the outputs we expect
            address_details = node.validateaddress(addr_extpool)
            expect_script, expect_witness = EXPECTED_ADDRESS_FLAGS[output_type]
            assert_equal(bool(address_details["isscript"]), expect_script)
            assert_equal(bool(address_details["iswitness"]), expect_witness)

            self.log.info("Send funds to wallet")
            self.nodes[0].sendtoaddress(addr_oldpool, 10)
            self.generate(self.nodes[0], 1)
            self.nodes[0].sendtoaddress(addr_extpool, 5)
            self.generate(self.nodes[0], 1)

            self.log.info("Restart node with wallet backup")
            self.stop_node(idx)
            # NOTE(review): wallet_path and wallet_backup_path always point at
            # node1's files, even when idx != 1 -- confirm this is intended.
            shutil.copyfile(wallet_backup_path, wallet_path)
            self.start_node(idx, self.extra_args[idx])
            self.connect_nodes(0, idx)
            self.sync_all()

            self.log.info("Verify keypool is restored and balance is correct")
            # 10 sent to the 90th address plus 5 sent to the 110th address.
            assert_equal(node.getbalance(), 15)
            assert_equal(node.listtransactions()[0]['category'], "receive")

            # Check that we have marked all keys up to the used keypool key as used
            if self.options.descriptors:
                expected_keypath = f"m/{DESCRIPTOR_PURPOSE[output_type]}h/1h/0h/0/110"
            else:
                expected_keypath = "m/0'/0'/110'"
            next_address = node.getnewaddress(address_type=output_type)
            assert_equal(node.getaddressinfo(next_address)['hdkeypath'], expected_keypath)


if __name__ == '__main__':
    KeypoolRestoreTest().main()