wallet_hd.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2016-2022 The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """Test Hierarchical Deterministic wallet function.""" 6 7 import shutil 8 9 from test_framework.blocktools import COINBASE_MATURITY 10 from test_framework.test_framework import BitcoinTestFramework 11 from test_framework.util import ( 12 assert_equal, 13 assert_raises_rpc_error, 14 ) 15 16 17 class WalletHDTest(BitcoinTestFramework): 18 def add_options(self, parser): 19 self.add_wallet_options(parser) 20 21 def set_test_params(self): 22 self.setup_clean_chain = True 23 self.num_nodes = 2 24 self.extra_args = [[], ['-keypool=0']] 25 # whitelist peers to speed up tx relay / mempool sync 26 self.noban_tx_relay = True 27 28 self.supports_cli = False 29 30 def skip_test_if_missing_module(self): 31 self.skip_if_no_wallet() 32 33 def run_test(self): 34 # Make sure we use hd, keep masterkeyid 35 hd_fingerprint = self.nodes[1].getaddressinfo(self.nodes[1].getnewaddress())['hdmasterfingerprint'] 36 assert_equal(len(hd_fingerprint), 8) 37 38 # create an internal key 39 change_addr = self.nodes[1].getrawchangeaddress() 40 change_addrV = self.nodes[1].getaddressinfo(change_addr) 41 if self.options.descriptors: 42 assert_equal(change_addrV["hdkeypath"], "m/84h/1h/0h/1/0") 43 else: 44 assert_equal(change_addrV["hdkeypath"], "m/0'/1'/0'") #first internal child key 45 46 # Import a non-HD private key in the HD wallet 47 non_hd_add = 'bcrt1qmevj8zfx0wdvp05cqwkmr6mxkfx60yezwjksmt' 48 non_hd_key = 'cS9umN9w6cDMuRVYdbkfE4c7YUFLJRoXMfhQ569uY4odiQbVN8Rt' 49 self.nodes[1].importprivkey(non_hd_key) 50 51 # This should be enough to keep the master key and the non-HD key 52 self.nodes[1].backupwallet(self.nodes[1].datadir_path / "hd.bak") 53 #self.nodes[1].dumpwallet(self.nodes[1].datadir_path / "hd.dump") 54 55 # Derive some HD addresses and remember the last 56 # Also send funds to each add 57 self.generate(self.nodes[0], COINBASE_MATURITY + 1) 58 hd_add = None 59 NUM_HD_ADDS = 10 60 for i in range(1, NUM_HD_ADDS + 1): 61 hd_add = self.nodes[1].getnewaddress() 62 hd_info = self.nodes[1].getaddressinfo(hd_add) 63 if self.options.descriptors: 64 assert_equal(hd_info["hdkeypath"], "m/84h/1h/0h/0/" + str(i)) 65 else: 66 assert_equal(hd_info["hdkeypath"], "m/0'/0'/" + str(i) + "'") 67 assert_equal(hd_info["hdmasterfingerprint"], hd_fingerprint) 68 self.nodes[0].sendtoaddress(hd_add, 1) 69 self.generate(self.nodes[0], 1) 70 self.nodes[0].sendtoaddress(non_hd_add, 1) 71 self.generate(self.nodes[0], 1) 72 73 # create an internal key (again) 74 change_addr = self.nodes[1].getrawchangeaddress() 75 change_addrV = self.nodes[1].getaddressinfo(change_addr) 76 if self.options.descriptors: 77 assert_equal(change_addrV["hdkeypath"], "m/84h/1h/0h/1/1") 78 else: 79 assert_equal(change_addrV["hdkeypath"], "m/0'/1'/1'") #second internal child key 80 81 self.sync_all() 82 assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1) 83 84 self.log.info("Restore backup ...") 85 self.stop_node(1) 86 # we need to delete the complete chain directory 87 # otherwise node1 would auto-recover all funds in flag the keypool keys as used 88 shutil.rmtree(self.nodes[1].blocks_path) 89 shutil.rmtree(self.nodes[1].chain_path / "chainstate") 90 shutil.copyfile( 91 self.nodes[1].datadir_path / "hd.bak", 92 self.nodes[1].wallets_path / self.default_wallet_name / self.wallet_data_filename 93 ) 94 self.start_node(1) 95 96 # Assert that derivation is deterministic 97 hd_add_2 = None 98 for i in range(1, NUM_HD_ADDS + 1): 99 hd_add_2 = self.nodes[1].getnewaddress() 100 hd_info_2 = self.nodes[1].getaddressinfo(hd_add_2) 101 if self.options.descriptors: 102 assert_equal(hd_info_2["hdkeypath"], "m/84h/1h/0h/0/" + str(i)) 103 else: 104 assert_equal(hd_info_2["hdkeypath"], "m/0'/0'/" + str(i) + "'") 105 assert_equal(hd_info_2["hdmasterfingerprint"], hd_fingerprint) 106 assert_equal(hd_add, hd_add_2) 107 self.connect_nodes(0, 1) 108 self.sync_all() 109 110 # Needs rescan 111 self.nodes[1].rescanblockchain() 112 assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1) 113 114 # Try a RPC based rescan 115 self.stop_node(1) 116 shutil.rmtree(self.nodes[1].blocks_path) 117 shutil.rmtree(self.nodes[1].chain_path / "chainstate") 118 shutil.copyfile( 119 self.nodes[1].datadir_path / "hd.bak", 120 self.nodes[1].wallets_path / self.default_wallet_name / self.wallet_data_filename 121 ) 122 self.start_node(1, extra_args=self.extra_args[1]) 123 self.connect_nodes(0, 1) 124 self.sync_all() 125 # Wallet automatically scans blocks older than key on startup 126 assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1) 127 out = self.nodes[1].rescanblockchain(0, 1) 128 assert_equal(out['start_height'], 0) 129 assert_equal(out['stop_height'], 1) 130 out = self.nodes[1].rescanblockchain() 131 assert_equal(out['start_height'], 0) 132 assert_equal(out['stop_height'], self.nodes[1].getblockcount()) 133 assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1) 134 135 # send a tx and make sure its using the internal chain for the changeoutput 136 txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1) 137 outs = self.nodes[1].gettransaction(txid=txid, verbose=True)['decoded']['vout'] 138 keypath = "" 139 for out in outs: 140 if out['value'] != 1: 141 keypath = self.nodes[1].getaddressinfo(out['scriptPubKey']['address'])['hdkeypath'] 142 143 if self.options.descriptors: 144 assert_equal(keypath[0:14], "m/84h/1h/0h/1/") 145 else: 146 assert_equal(keypath[0:7], "m/0'/1'") 147 148 if not self.options.descriptors: 149 # Generate a new HD seed on node 1 and make sure it is set 150 orig_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid'] 151 self.nodes[1].sethdseed() 152 new_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid'] 153 assert orig_masterkeyid != new_masterkeyid 154 addr = self.nodes[1].getnewaddress() 155 # Make sure the new address is the first from the keypool 156 assert_equal(self.nodes[1].getaddressinfo(addr)['hdkeypath'], 'm/0\'/0\'/0\'') 157 self.nodes[1].keypoolrefill(1) # Fill keypool with 1 key 158 159 # Set a new HD seed on node 1 without flushing the keypool 160 new_seed = self.nodes[0].dumpprivkey(self.nodes[0].getnewaddress()) 161 orig_masterkeyid = new_masterkeyid 162 self.nodes[1].sethdseed(False, new_seed) 163 new_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid'] 164 assert orig_masterkeyid != new_masterkeyid 165 addr = self.nodes[1].getnewaddress() 166 assert_equal(orig_masterkeyid, self.nodes[1].getaddressinfo(addr)['hdseedid']) 167 # Make sure the new address continues previous keypool 168 assert_equal(self.nodes[1].getaddressinfo(addr)['hdkeypath'], 'm/0\'/0\'/1\'') 169 170 # Check that the next address is from the new seed 171 self.nodes[1].keypoolrefill(1) 172 next_addr = self.nodes[1].getnewaddress() 173 assert_equal(new_masterkeyid, self.nodes[1].getaddressinfo(next_addr)['hdseedid']) 174 # Make sure the new address is not from previous keypool 175 assert_equal(self.nodes[1].getaddressinfo(next_addr)['hdkeypath'], 'm/0\'/0\'/0\'') 176 assert next_addr != addr 177 178 # Sethdseed parameter validity 179 assert_raises_rpc_error(-1, 'sethdseed', self.nodes[0].sethdseed, False, new_seed, 0) 180 assert_raises_rpc_error(-5, "Invalid private key", self.nodes[1].sethdseed, False, "not_wif") 181 assert_raises_rpc_error(-3, "JSON value of type string is not of expected type bool", self.nodes[1].sethdseed, "Not_bool") 182 assert_raises_rpc_error(-3, "JSON value of type bool is not of expected type string", self.nodes[1].sethdseed, False, True) 183 assert_raises_rpc_error(-5, "Already have this key", self.nodes[1].sethdseed, False, new_seed) 184 assert_raises_rpc_error(-5, "Already have this key", self.nodes[1].sethdseed, False, self.nodes[1].dumpprivkey(self.nodes[1].getnewaddress())) 185 186 self.log.info('Test sethdseed restoring with keys outside of the initial keypool') 187 self.generate(self.nodes[0], 10) 188 # Restart node 1 with keypool of 3 and a different wallet 189 self.nodes[1].createwallet(wallet_name='origin', blank=True) 190 self.restart_node(1, extra_args=['-keypool=3', '-wallet=origin']) 191 self.connect_nodes(0, 1) 192 193 # sethdseed restoring and seeing txs to addresses out of the keypool 194 origin_rpc = self.nodes[1].get_wallet_rpc('origin') 195 seed = self.nodes[0].dumpprivkey(self.nodes[0].getnewaddress()) 196 origin_rpc.sethdseed(True, seed) 197 198 self.nodes[1].createwallet(wallet_name='restore', blank=True) 199 restore_rpc = self.nodes[1].get_wallet_rpc('restore') 200 restore_rpc.sethdseed(True, seed) # Set to be the same seed as origin_rpc 201 restore_rpc.sethdseed(True) # Rotate to a new seed, making original `seed` inactive 202 203 self.nodes[1].createwallet(wallet_name='restore2', blank=True) 204 restore2_rpc = self.nodes[1].get_wallet_rpc('restore2') 205 restore2_rpc.sethdseed(True, seed) # Set to be the same seed as origin_rpc 206 restore2_rpc.sethdseed(True) # Rotate to a new seed, making original `seed` inactive 207 208 # Check persistence of inactive seed by reloading restore. restore2 is still loaded to test the case where the wallet is not reloaded 209 restore_rpc.unloadwallet() 210 self.nodes[1].loadwallet('restore') 211 restore_rpc = self.nodes[1].get_wallet_rpc('restore') 212 213 # Empty origin keypool and get an address that is beyond the initial keypool 214 origin_rpc.getnewaddress() 215 origin_rpc.getnewaddress() 216 last_addr = origin_rpc.getnewaddress() # Last address of initial keypool 217 addr = origin_rpc.getnewaddress() # First address beyond initial keypool 218 219 # Check that the restored seed has last_addr but does not have addr 220 info = restore_rpc.getaddressinfo(last_addr) 221 assert_equal(info['ismine'], True) 222 info = restore_rpc.getaddressinfo(addr) 223 assert_equal(info['ismine'], False) 224 info = restore2_rpc.getaddressinfo(last_addr) 225 assert_equal(info['ismine'], True) 226 info = restore2_rpc.getaddressinfo(addr) 227 assert_equal(info['ismine'], False) 228 # Check that the origin seed has addr 229 info = origin_rpc.getaddressinfo(addr) 230 assert_equal(info['ismine'], True) 231 232 # Send a transaction to addr, which is out of the initial keypool. 233 # The wallet that has set a new seed (restore_rpc) should not detect this transaction. 234 txid = self.nodes[0].sendtoaddress(addr, 1) 235 origin_rpc.sendrawtransaction(self.nodes[0].gettransaction(txid)['hex']) 236 self.generate(self.nodes[0], 1) 237 origin_rpc.gettransaction(txid) 238 assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', restore_rpc.gettransaction, txid) 239 out_of_kp_txid = txid 240 241 # Send a transaction to last_addr, which is in the initial keypool. 242 # The wallet that has set a new seed (restore_rpc) should detect this transaction and generate 3 new keys from the initial seed. 243 # The previous transaction (out_of_kp_txid) should still not be detected as a rescan is required. 244 txid = self.nodes[0].sendtoaddress(last_addr, 1) 245 origin_rpc.sendrawtransaction(self.nodes[0].gettransaction(txid)['hex']) 246 self.generate(self.nodes[0], 1) 247 origin_rpc.gettransaction(txid) 248 restore_rpc.gettransaction(txid) 249 assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', restore_rpc.gettransaction, out_of_kp_txid) 250 restore2_rpc.gettransaction(txid) 251 assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', restore2_rpc.gettransaction, out_of_kp_txid) 252 253 # After rescanning, restore_rpc should now see out_of_kp_txid and generate an additional key. 254 # addr should now be part of restore_rpc and be ismine 255 restore_rpc.rescanblockchain() 256 restore_rpc.gettransaction(out_of_kp_txid) 257 info = restore_rpc.getaddressinfo(addr) 258 assert_equal(info['ismine'], True) 259 restore2_rpc.rescanblockchain() 260 restore2_rpc.gettransaction(out_of_kp_txid) 261 info = restore2_rpc.getaddressinfo(addr) 262 assert_equal(info['ismine'], True) 263 264 # Check again that 3 keys were derived. 265 # Empty keypool and get an address that is beyond the initial keypool 266 origin_rpc.getnewaddress() 267 origin_rpc.getnewaddress() 268 last_addr = origin_rpc.getnewaddress() 269 addr = origin_rpc.getnewaddress() 270 271 # Check that the restored seed has last_addr but does not have addr 272 info = restore_rpc.getaddressinfo(last_addr) 273 assert_equal(info['ismine'], True) 274 info = restore_rpc.getaddressinfo(addr) 275 assert_equal(info['ismine'], False) 276 info = restore2_rpc.getaddressinfo(last_addr) 277 assert_equal(info['ismine'], True) 278 info = restore2_rpc.getaddressinfo(addr) 279 assert_equal(info['ismine'], False) 280 281 282 if __name__ == '__main__': 283 WalletHDTest().main()