wallet_dump.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2016-2022 The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """Test the dumpwallet RPC.""" 6 import datetime 7 import time 8 9 from test_framework.test_framework import BitcoinTestFramework 10 from test_framework.util import ( 11 assert_equal, 12 assert_raises_rpc_error, 13 ) 14 from test_framework.wallet_util import WalletUnlock 15 16 17 def read_dump(file_name, addrs, script_addrs, hd_master_addr_old): 18 """ 19 Read the given dump, count the addrs that match, count change and reserve. 20 Also check that the old hd_master is inactive 21 """ 22 with open(file_name, encoding='utf8') as inputfile: 23 found_comments = [] 24 found_legacy_addr = 0 25 found_p2sh_segwit_addr = 0 26 found_bech32_addr = 0 27 found_script_addr = 0 28 found_addr_chg = 0 29 found_addr_rsv = 0 30 hd_master_addr_ret = None 31 for line in inputfile: 32 line = line.strip() 33 if not line: 34 continue 35 if line[0] == '#': 36 found_comments.append(line) 37 else: 38 # split out some data 39 key_date_label, comment = line.split("#") 40 key_date_label = key_date_label.split(" ") 41 # key = key_date_label[0] 42 date = key_date_label[1] 43 keytype = key_date_label[2] 44 45 imported_key = date == '1970-01-01T00:00:01Z' 46 if imported_key: 47 # Imported keys have multiple addresses, no label (keypath) and timestamp 48 # Skip them 49 continue 50 51 addr_keypath = comment.split(" addr=")[1] 52 addr = addr_keypath.split(" ")[0] 53 keypath = None 54 if keytype == "inactivehdseed=1": 55 # ensure the old master is still available 56 assert hd_master_addr_old == addr 57 elif keytype == "hdseed=1": 58 # ensure we have generated a new hd master key 59 assert hd_master_addr_old != addr 60 hd_master_addr_ret = addr 61 elif keytype == "script=1": 62 # scripts don't have keypaths 63 keypath = None 64 else: 65 keypath = addr_keypath.rstrip().split("hdkeypath=")[1] 66 67 # count key types 68 for addrObj in addrs: 69 if addrObj['address'] == addr.split(",")[0] and addrObj['hdkeypath'] == keypath and keytype == "label=": 70 if addr.startswith('m') or addr.startswith('n'): 71 # P2PKH address 72 found_legacy_addr += 1 73 elif addr.startswith('2'): 74 # P2SH-segwit address 75 found_p2sh_segwit_addr += 1 76 elif addr.startswith('bcrt1'): 77 found_bech32_addr += 1 78 break 79 elif keytype == "change=1": 80 found_addr_chg += 1 81 break 82 elif keytype == "reserve=1": 83 found_addr_rsv += 1 84 break 85 86 # count scripts 87 for script_addr in script_addrs: 88 if script_addr == addr.rstrip() and keytype == "script=1": 89 found_script_addr += 1 90 break 91 92 return found_comments, found_legacy_addr, found_p2sh_segwit_addr, found_bech32_addr, found_script_addr, found_addr_chg, found_addr_rsv, hd_master_addr_ret 93 94 95 class WalletDumpTest(BitcoinTestFramework): 96 def add_options(self, parser): 97 self.add_wallet_options(parser, descriptors=False) 98 99 def set_test_params(self): 100 self.num_nodes = 1 101 self.extra_args = [["-keypool=90", "-addresstype=legacy"]] 102 self.rpc_timeout = 120 103 104 def skip_test_if_missing_module(self): 105 self.skip_if_no_wallet() 106 107 def setup_network(self): 108 self.add_nodes(self.num_nodes, extra_args=self.extra_args) 109 self.start_nodes() 110 111 def run_test(self): 112 self.nodes[0].createwallet("dump") 113 114 wallet_unenc_dump = self.nodes[0].datadir_path / "wallet.unencrypted.dump" 115 wallet_enc_dump = self.nodes[0].datadir_path / "wallet.encrypted.dump" 116 117 # generate 30 addresses to compare against the dump 118 # - 10 legacy P2PKH 119 # - 10 P2SH-segwit 120 # - 10 bech32 121 test_addr_count = 10 122 addrs = [] 123 for address_type in ['legacy', 'p2sh-segwit', 'bech32']: 124 for _ in range(test_addr_count): 125 addr = self.nodes[0].getnewaddress(address_type=address_type) 126 vaddr = self.nodes[0].getaddressinfo(addr) # required to get hd keypath 127 addrs.append(vaddr) 128 129 # Test scripts dump by adding a 1-of-1 multisig address 130 multisig_addr = self.nodes[0].addmultisigaddress(1, [addrs[1]["address"]])["address"] 131 132 # Refill the keypool. getnewaddress() refills the keypool *before* taking a key from 133 # the keypool, so the final call to getnewaddress leaves the keypool with one key below 134 # its capacity 135 self.nodes[0].keypoolrefill() 136 137 self.log.info('Mine a block one second before the wallet is dumped') 138 dump_time = int(time.time()) 139 self.nodes[0].setmocktime(dump_time - 1) 140 self.generate(self.nodes[0], 1) 141 self.nodes[0].setmocktime(dump_time) 142 dump_time_str = '# * Created on {}Z'.format( 143 datetime.datetime.fromtimestamp( 144 dump_time, 145 tz=datetime.timezone.utc, 146 ).replace(tzinfo=None).isoformat()) 147 dump_best_block_1 = '# * Best block at time of backup was {} ({}),'.format( 148 self.nodes[0].getblockcount(), 149 self.nodes[0].getbestblockhash(), 150 ) 151 dump_best_block_2 = '# mined on {}Z'.format( 152 datetime.datetime.fromtimestamp( 153 dump_time - 1, 154 tz=datetime.timezone.utc, 155 ).replace(tzinfo=None).isoformat()) 156 157 self.log.info('Dump unencrypted wallet') 158 result = self.nodes[0].dumpwallet(wallet_unenc_dump) 159 assert_equal(result['filename'], str(wallet_unenc_dump)) 160 161 found_comments, found_legacy_addr, found_p2sh_segwit_addr, found_bech32_addr, found_script_addr, found_addr_chg, found_addr_rsv, hd_master_addr_unenc = \ 162 read_dump(wallet_unenc_dump, addrs, [multisig_addr], None) 163 assert '# End of dump' in found_comments # Check that file is not corrupt 164 assert_equal(dump_time_str, next(c for c in found_comments if c.startswith('# * Created on'))) 165 assert_equal(dump_best_block_1, next(c for c in found_comments if c.startswith('# * Best block'))) 166 assert_equal(dump_best_block_2, next(c for c in found_comments if c.startswith('# mined on'))) 167 assert_equal(found_legacy_addr, test_addr_count) # all keys must be in the dump 168 assert_equal(found_p2sh_segwit_addr, test_addr_count) # all keys must be in the dump 169 assert_equal(found_bech32_addr, test_addr_count) # all keys must be in the dump 170 assert_equal(found_script_addr, 1) # all scripts must be in the dump 171 assert_equal(found_addr_chg, 0) # 0 blocks where mined 172 assert_equal(found_addr_rsv, 90 * 2) # 90 keys plus 100% internal keys 173 174 # encrypt wallet, restart, unlock and dump 175 self.nodes[0].encryptwallet('test') 176 with WalletUnlock(self.nodes[0], "test"): 177 # Should be a no-op: 178 self.nodes[0].keypoolrefill() 179 self.nodes[0].dumpwallet(wallet_enc_dump) 180 181 found_comments, found_legacy_addr, found_p2sh_segwit_addr, found_bech32_addr, found_script_addr, found_addr_chg, found_addr_rsv, _ = \ 182 read_dump(wallet_enc_dump, addrs, [multisig_addr], hd_master_addr_unenc) 183 assert '# End of dump' in found_comments # Check that file is not corrupt 184 assert_equal(dump_time_str, next(c for c in found_comments if c.startswith('# * Created on'))) 185 assert_equal(dump_best_block_1, next(c for c in found_comments if c.startswith('# * Best block'))) 186 assert_equal(dump_best_block_2, next(c for c in found_comments if c.startswith('# mined on'))) 187 assert_equal(found_legacy_addr, test_addr_count) # all keys must be in the dump 188 assert_equal(found_p2sh_segwit_addr, test_addr_count) # all keys must be in the dump 189 assert_equal(found_bech32_addr, test_addr_count) # all keys must be in the dump 190 assert_equal(found_script_addr, 1) 191 assert_equal(found_addr_chg, 90 * 2) # old reserve keys are marked as change now 192 assert_equal(found_addr_rsv, 90 * 2) 193 194 # Overwriting should fail 195 assert_raises_rpc_error(-8, "already exists", lambda: self.nodes[0].dumpwallet(wallet_enc_dump)) 196 197 # Restart node with new wallet, and test importwallet 198 self.restart_node(0) 199 self.nodes[0].createwallet("w2") 200 201 # Make sure the address is not IsMine before import 202 result = self.nodes[0].getaddressinfo(multisig_addr) 203 assert not result['ismine'] 204 205 self.nodes[0].importwallet(wallet_unenc_dump) 206 207 # Now check IsMine is true 208 result = self.nodes[0].getaddressinfo(multisig_addr) 209 assert result['ismine'] 210 211 self.log.info('Check that wallet is flushed') 212 with self.nodes[0].assert_debug_log(['Flushing wallet.dat'], timeout=20): 213 self.nodes[0].getnewaddress() 214 215 # Make sure that dumpwallet doesn't have a lock order issue when there is an unconfirmed tx and it is reloaded 216 # See https://github.com/bitcoin/bitcoin/issues/22489 217 self.nodes[0].createwallet("w3") 218 w3 = self.nodes[0].get_wallet_rpc("w3") 219 w3.importprivkey(privkey=self.nodes[0].get_deterministic_priv_key().key, label="coinbase_import") 220 w3.sendtoaddress(w3.getnewaddress(), 10) 221 w3.unloadwallet() 222 self.nodes[0].loadwallet("w3") 223 w3.dumpwallet(self.nodes[0].datadir_path / "w3.dump") 224 225 if __name__ == '__main__': 226 WalletDumpTest().main()