wallet_descriptor.py
#!/usr/bin/env python3
# Copyright (c) 2019-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test descriptor wallet function."""

# sqlite3 is optional at import time; skip_test_if_missing_module() below asks
# the framework to skip the test when the module is unavailable.
try:
    import sqlite3
except ImportError:
    pass

import re

from test_framework.blocktools import COINBASE_MATURITY
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_not_equal,
    assert_equal,
    assert_raises_rpc_error
)
from test_framework.wallet_util import WalletUnlock


class WalletDescriptorTest(BitcoinTestFramework):
    """Functional test exercising descriptor wallets on a single node."""

    def set_test_params(self):
        """Configure a single node on a clean chain with a keypool of 100."""
        self.setup_clean_chain = True
        self.num_nodes = 1
        self.extra_args = [['-keypool=100']]

    def skip_test_if_missing_module(self):
        """Skip the test when wallet support or the Python sqlite3 module is missing."""
        self.skip_if_no_wallet()
        self.skip_if_no_py_sqlite3()

    def test_parent_descriptors(self):
        """Check that every RPC reports the same, normalized parent descriptor.

        Creates the "parent_descs" wallet, funds one of its addresses from the
        default wallet, compares the `parent_descs` field returned by
        listunspent, listtransactions, listsinceblock and gettransaction with
        the `parent_desc` reported by getaddressinfo, then unloads the wallet.
        """
        self.log.info("Check that parent_descs is the same for all RPCs and is normalized")
        self.nodes[0].createwallet(wallet_name="parent_descs")
        wallet = self.nodes[0].get_wallet_rpc("parent_descs")
        default_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)

        addr = wallet.getnewaddress()
        parent_desc = wallet.getaddressinfo(addr)["parent_desc"]

        # Verify that the parent descriptor is normalized
        # First remove the checksum
        desc_verify = parent_desc.split("#")[0]
        # Next strip the xpub so its characters cannot be mistaken for path markers
        desc_verify = re.sub(r"tpub\w+?(?=/)", "", desc_verify)
        # Extract origin info
        origin_match = re.search(r'\[([\da-fh/]+)\]', desc_verify)
        origin_part = origin_match.group(1) if origin_match else ""
        # Split on "]" for everything after the origin info
        after_origin = desc_verify.split("]", maxsplit=1)[-1]
        # Look for the hardened marker "h" inside each piece
        # We don't need to check for apostrophe as normalization will not output apostrophe
        found_hardened_in_origin = "h" in origin_part
        found_hardened_after_origin = "h" in after_origin
        assert_equal(found_hardened_in_origin, True)
        assert_equal(found_hardened_after_origin, False)

        # Send some coins so we can check listunspent, listtransactions, listsinceblock, and gettransaction
        since_block = self.nodes[0].getbestblockhash()
        txid = default_wallet.sendtoaddress(addr, 1)
        self.generate(self.nodes[0], 1)

        unspent = wallet.listunspent()
        assert_equal(len(unspent), 1)
        assert_equal(unspent[0]["parent_descs"], [parent_desc])

        txs = wallet.listtransactions()
        assert_equal(len(txs), 1)
        assert_equal(txs[0]["parent_descs"], [parent_desc])

        txs = wallet.listsinceblock(since_block)["transactions"]
        assert_equal(len(txs), 1)
        assert_equal(txs[0]["parent_descs"], [parent_desc])

        tx = wallet.gettransaction(txid=txid, verbose=True)
        assert_equal(tx["details"][0]["parent_descs"], [parent_desc])

        wallet.unloadwallet()

    def run_test(self):
        """Run the descriptor wallet scenarios in order.

        Covers wallet info, address derivation paths for every address type,
        sending/receiving, encryption, keypool exhaustion, born-encrypted,
        blank and watch-only wallets, descriptor export/import round trips,
        rejection of legacy records in a descriptor wallet database, and
        finally the parent descriptor checks.
        """
        self.generate(self.nodes[0], COINBASE_MATURITY + 1)

        # Make a descriptor wallet
        self.log.info("Making a descriptor wallet")
        self.nodes[0].createwallet(wallet_name="desc1")
        wallet = self.nodes[0].get_wallet_rpc("desc1")

        # A descriptor wallet should have 100 addresses * 4 types = 400 keys
        self.log.info("Checking wallet info")
        wallet_info = wallet.getwalletinfo()
        assert_equal(wallet_info['format'], 'sqlite')
        assert_equal(wallet_info['keypoolsize'], 400)
        assert_equal(wallet_info['keypoolsize_hd_internal'], 400)
        assert 'keypoololdest' not in wallet_info

        # Check that getnewaddress works
        self.log.info("Test that getnewaddress and getrawchangeaddress work")
        addr = wallet.getnewaddress("", "legacy")
        addr_info = wallet.getaddressinfo(addr)
        assert addr_info['desc'].startswith('pkh(')
        assert_equal(addr_info['hdkeypath'], 'm/44h/1h/0h/0/0')

        addr = wallet.getnewaddress("", "p2sh-segwit")
        addr_info = wallet.getaddressinfo(addr)
        assert addr_info['desc'].startswith('sh(wpkh(')
        assert_equal(addr_info['hdkeypath'], 'm/49h/1h/0h/0/0')

        addr = wallet.getnewaddress("", "bech32")
        addr_info = wallet.getaddressinfo(addr)
        assert addr_info['desc'].startswith('wpkh(')
        assert_equal(addr_info['hdkeypath'], 'm/84h/1h/0h/0/0')

        addr = wallet.getnewaddress("", "bech32m")
        addr_info = wallet.getaddressinfo(addr)
        assert addr_info['desc'].startswith('tr(')
        assert_equal(addr_info['hdkeypath'], 'm/86h/1h/0h/0/0')

        # Check that getrawchangeaddress works
        addr = wallet.getrawchangeaddress("legacy")
        addr_info = wallet.getaddressinfo(addr)
        assert addr_info['desc'].startswith('pkh(')
        assert_equal(addr_info['hdkeypath'], 'm/44h/1h/0h/1/0')

        addr = wallet.getrawchangeaddress("p2sh-segwit")
        addr_info = wallet.getaddressinfo(addr)
        assert addr_info['desc'].startswith('sh(wpkh(')
        assert_equal(addr_info['hdkeypath'], 'm/49h/1h/0h/1/0')

        addr = wallet.getrawchangeaddress("bech32")
        addr_info = wallet.getaddressinfo(addr)
        assert addr_info['desc'].startswith('wpkh(')
        assert_equal(addr_info['hdkeypath'], 'm/84h/1h/0h/1/0')

        addr = wallet.getrawchangeaddress("bech32m")
        addr_info = wallet.getaddressinfo(addr)
        assert addr_info['desc'].startswith('tr(')
        assert_equal(addr_info['hdkeypath'], 'm/86h/1h/0h/1/0')

        # Make a wallet to receive coins at
        self.nodes[0].createwallet(wallet_name="desc2")
        recv_wrpc = self.nodes[0].get_wallet_rpc("desc2")
        send_wrpc = self.nodes[0].get_wallet_rpc("desc1")

        # Generate some coins
        self.generatetoaddress(self.nodes[0], COINBASE_MATURITY + 1, send_wrpc.getnewaddress())

        # Make transactions
        self.log.info("Test sending and receiving")
        addr = recv_wrpc.getnewaddress()
        send_wrpc.sendtoaddress(addr, 10)

        self.log.info("Test encryption")
        # Get the master fingerprint before encrypting
        info1 = send_wrpc.getaddressinfo(send_wrpc.getnewaddress())

        # Encrypt the sending wallet
        send_wrpc.encryptwallet('pass')
        with WalletUnlock(send_wrpc, "pass"):
            addr = send_wrpc.getnewaddress()
            info2 = send_wrpc.getaddressinfo(addr)
            # Addresses handed out after encryption must come from a different master key
            assert_not_equal(info1['hdmasterfingerprint'], info2['hdmasterfingerprint'])
        # Outside the unlock context: address info must still be fully available
        assert 'hdmasterfingerprint' in send_wrpc.getaddressinfo(send_wrpc.getnewaddress())
        info3 = send_wrpc.getaddressinfo(addr)
        assert_equal(info2['desc'], info3['desc'])

        self.log.info("Test that getnewaddress still works after keypool is exhausted in an encrypted wallet")
        for _ in range(500):
            send_wrpc.getnewaddress()

        self.log.info("Test that unlock is needed when deriving only hardened keys in an encrypted wallet")
        with WalletUnlock(send_wrpc, "pass"):
            send_wrpc.importdescriptors([{
                "desc": "wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/0h/*h)#y4dfsj7n",
                "timestamp": "now",
                "range": [0,10],
                "active": True
            }])
        # Exhaust keypool of 100
        for _ in range(100):
            send_wrpc.getnewaddress(address_type='bech32')
        # This should now error
        assert_raises_rpc_error(-12, "Keypool ran out, please call keypoolrefill first", send_wrpc.getnewaddress, '', 'bech32')

        self.log.info("Test born encrypted wallets")
        self.nodes[0].createwallet('desc_enc', False, False, 'pass', False, True)
        enc_rpc = self.nodes[0].get_wallet_rpc('desc_enc')
        enc_rpc.getnewaddress()  # Makes sure that we can get a new address from a born encrypted wallet

        self.log.info("Test blank descriptor wallets")
        self.nodes[0].createwallet(wallet_name='desc_blank', blank=True)
        blank_rpc = self.nodes[0].get_wallet_rpc('desc_blank')
        assert_raises_rpc_error(-4, 'This wallet has no available keys', blank_rpc.getnewaddress)

        self.log.info("Test descriptor wallet with disabled private keys")
        self.nodes[0].createwallet(wallet_name='desc_no_priv', disable_private_keys=True)
        nopriv_rpc = self.nodes[0].get_wallet_rpc('desc_no_priv')
        assert_raises_rpc_error(-4, 'This wallet has no available keys', nopriv_rpc.getnewaddress)

        self.log.info("Test descriptor exports")
        self.nodes[0].createwallet(wallet_name='desc_export')
        exp_rpc = self.nodes[0].get_wallet_rpc('desc_export')
        self.nodes[0].createwallet(wallet_name='desc_import', disable_private_keys=True)
        imp_rpc = self.nodes[0].get_wallet_rpc('desc_import')

        # Each entry: (address type, internal?, descriptor prefix, derivation path,
        # index from the end of the descriptor string holding the 0/1 chain digit).
        # NOTE(review): the sh(wpkh( offset is presumably one further from the end
        # because of the extra closing parenthesis - confirm.
        addr_types = [('legacy', False, 'pkh(', '44h/1h/0h', -13),
                      ('p2sh-segwit', False, 'sh(wpkh(', '49h/1h/0h', -14),
                      ('bech32', False, 'wpkh(', '84h/1h/0h', -13),
                      ('bech32m', False, 'tr(', '86h/1h/0h', -13),
                      ('legacy', True, 'pkh(', '44h/1h/0h', -13),
                      ('p2sh-segwit', True, 'sh(wpkh(', '49h/1h/0h', -14),
                      ('bech32', True, 'wpkh(', '84h/1h/0h', -13),
                      ('bech32m', True, 'tr(', '86h/1h/0h', -13)]

        for addr_type, internal, desc_prefix, deriv_path, int_idx in addr_types:
            int_str = 'internal' if internal else 'external'

            self.log.info(f"Testing descriptor address type for {addr_type} {int_str}")
            if internal:
                addr = exp_rpc.getrawchangeaddress(address_type=addr_type)
            else:
                addr = exp_rpc.getnewaddress(address_type=addr_type)
            desc = exp_rpc.getaddressinfo(addr)['parent_desc']
            assert_equal(desc_prefix, desc[0:len(desc_prefix)])
            # The derivation path starts right after the first "/" in the descriptor
            idx = desc.index('/') + 1
            assert_equal(deriv_path, desc[idx:idx + len(deriv_path)])
            # Internal (change) descriptors use chain 1, external ones chain 0
            assert_equal('1' if internal else '0', desc[int_idx])

            self.log.info(f"Testing the same descriptor is returned for address type {addr_type} {int_str}")
            for _ in range(10):
                if internal:
                    addr = exp_rpc.getrawchangeaddress(address_type=addr_type)
                else:
                    addr = exp_rpc.getnewaddress(address_type=addr_type)
                test_desc = exp_rpc.getaddressinfo(addr)['parent_desc']
                assert_equal(desc, test_desc)

            self.log.info(f"Testing import of exported {addr_type} descriptor")
            # The exporting wallet has handed out 11 addresses of this kind so far
            # (1 + 10 above), so start the importing wallet at index 11 to stay in step.
            imp_rpc.importdescriptors([{
                'desc': desc,
                'active': True,
                'next_index': 11,
                'timestamp': 'now',
                'internal': internal
            }])

            for _ in range(10):
                if internal:
                    exp_addr = exp_rpc.getrawchangeaddress(address_type=addr_type)
                    imp_addr = imp_rpc.getrawchangeaddress(address_type=addr_type)
                else:
                    exp_addr = exp_rpc.getnewaddress(address_type=addr_type)
                    imp_addr = imp_rpc.getnewaddress(address_type=addr_type)
                assert_equal(exp_addr, imp_addr)

        self.log.info("Test that loading descriptor wallet containing legacy key types throws error")
        self.nodes[0].createwallet(wallet_name="crashme")
        self.nodes[0].unloadwallet("crashme")
        wallet_db = self.nodes[0].wallets_path / "crashme" / self.wallet_data_filename
        conn = sqlite3.connect(wallet_db)
        try:
            with conn:
                # add "cscript" entry: key type is uint160 (20 bytes), value type is CScript (zero-length here)
                conn.execute('INSERT INTO main VALUES(?, ?)', (b'\x07cscript' + b'\x00'*20, b'\x00'))
        finally:
            # "with conn" only commits or rolls back the transaction; close the
            # handle explicitly so it is released even if the insert fails.
            conn.close()
        assert_raises_rpc_error(-4, "Unexpected legacy entry in descriptor wallet found.", self.nodes[0].loadwallet, "crashme")

        self.test_parent_descriptors()


if __name__ == '__main__':
    WalletDescriptorTest(__file__).main()