wallet_descriptor.py
#!/usr/bin/env python3
# Copyright (c) 2019-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test descriptor wallet function."""

try:
    import sqlite3
except ImportError:
    # Deliberate best-effort import: skip_test_if_missing_module() calls
    # skip_if_no_py_sqlite3() before any test code touches sqlite3.
    pass

import concurrent.futures

from test_framework.blocktools import COINBASE_MATURITY
from test_framework.descriptors import descsum_create
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_equal,
    assert_raises_rpc_error
)
from test_framework.wallet_util import WalletUnlock


class WalletDescriptorTest(BitcoinTestFramework):
    """Functional test exercising descriptor (sqlite) wallets on a single node."""

    def add_options(self, parser):
        """Register the wallet command-line options, with legacy=False."""
        self.add_wallet_options(parser, legacy=False)

    def set_test_params(self):
        """Configure one node on a clean chain with a keypool size of 100."""
        self.setup_clean_chain = True
        self.num_nodes = 1
        self.extra_args = [['-keypool=100']]
        self.wallet_names = []

    def skip_test_if_missing_module(self):
        """Skip the test unless wallet, sqlite and Python sqlite3 support are present."""
        self.skip_if_no_wallet()
        self.skip_if_no_sqlite()
        self.skip_if_no_py_sqlite3()

    def test_concurrent_writes(self):
        """Check that concurrent sqlite writes land in the wallet database.

        Restarts the node with -unsafesqlitesync=0, refills the keypool of a
        hardened-derivation descriptor in a worker thread while triggering
        gettxoutsetinfo from the main thread, then unloads the wallet and
        inspects the sqlite file directly: the best-block record must match
        the node's best block hash and exactly 1000 descriptor cache records
        must be present.
        """
        self.log.info("Test sqlite concurrent writes are in the correct order")
        self.restart_node(0, extra_args=["-unsafesqlitesync=0"])
        self.nodes[0].createwallet(wallet_name="concurrency", blank=True)
        wallet = self.nodes[0].get_wallet_rpc("concurrency")
        # First import a descriptor that uses hardened derivation so that topping up
        # will require writing a ton to db
        wallet.importdescriptors([{
            "desc": descsum_create("wpkh(tprv8ZgxMBicQKsPeuVhWwi6wuMQGfPKi9Li5GtX35jVNknACgqe3CY4g5xgkfDDJcmtF7o1QnxWDRYw4H5P26PXq7sbcUkEqeR4fg3Kxp2tigg/0h/0h/*h)"),
            "timestamp": "now",
            "active": True,
        }])
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread:
            topup = thread.submit(wallet.keypoolrefill, newsize=1000)

            # Then while the topup is running, we need to do something that will call
            # ChainStateFlushed which will trigger a write to the db, hopefully at the
            # same time that the topup still has an open db transaction.
            self.nodes[0].cli.gettxoutsetinfo()
            assert_equal(topup.result(), None)

        wallet.unloadwallet()

        # Check that everything was written
        wallet_db = self.nodes[0].wallets_path / "concurrency" / self.wallet_data_filename
        conn = sqlite3.connect(wallet_db)
        try:
            with conn:
                # Retrieve the bestblock_nomerkle record
                bestblock_rec = conn.execute("SELECT value FROM main WHERE hex(key) = '1262657374626C6F636B5F6E6F6D65726B6C65'").fetchone()[0]
                # Retrieve the number of descriptor cache records
                # Since we store binary data, sqlite's comparison operators don't work everywhere
                # so just retrieve all records and process them ourselves.
                db_keys = conn.execute("SELECT key FROM main").fetchall()
                cache_records = len([k[0] for k in db_keys if b"walletdescriptorcache" in k[0]])
        finally:
            # "with conn" only scopes the transaction; close explicitly so the
            # database handle is released even when a query above raises.
            conn.close()

        # Bytes 5..36 of the record, byte-reversed, are compared against the RPC hash.
        assert_equal(bestblock_rec[5:37][::-1].hex(), self.nodes[0].getbestblockhash())
        assert_equal(cache_records, 1000)

    def run_test(self):
        """Run the descriptor wallet scenarios in sequence.

        Covers wallet format detection, address derivation paths for every
        address type, sending between descriptor wallets, RPCs disabled for
        descriptor wallets, encryption and keypool exhaustion, born-encrypted,
        blank and watch-only wallets, descriptor export/import round trips,
        rejection of legacy records in a descriptor wallet, and finally the
        concurrent sqlite write check.
        """
        if self.is_bdb_compiled():
            # Make a legacy wallet and check it is BDB
            self.nodes[0].createwallet(wallet_name="legacy1", descriptors=False)
            wallet_info = self.nodes[0].getwalletinfo()
            assert_equal(wallet_info['format'], 'bdb')
            self.nodes[0].unloadwallet("legacy1")
        else:
            self.log.warning("Skipping BDB test")

        # Make a descriptor wallet
        self.log.info("Making a descriptor wallet")
        self.nodes[0].createwallet(wallet_name="desc1", descriptors=True)

        # A descriptor wallet should have 100 addresses * 4 types = 400 keys
        self.log.info("Checking wallet info")
        wallet_info = self.nodes[0].getwalletinfo()
        assert_equal(wallet_info['format'], 'sqlite')
        assert_equal(wallet_info['keypoolsize'], 400)
        assert_equal(wallet_info['keypoolsize_hd_internal'], 400)
        assert 'keypoololdest' not in wallet_info

        # Check that getnewaddress works
        self.log.info("Test that getnewaddress and getrawchangeaddress work")
        addr = self.nodes[0].getnewaddress("", "legacy")
        addr_info = self.nodes[0].getaddressinfo(addr)
        assert addr_info['desc'].startswith('pkh(')
        assert_equal(addr_info['hdkeypath'], 'm/44h/1h/0h/0/0')

        addr = self.nodes[0].getnewaddress("", "p2sh-segwit")
        addr_info = self.nodes[0].getaddressinfo(addr)
        assert addr_info['desc'].startswith('sh(wpkh(')
        assert_equal(addr_info['hdkeypath'], 'm/49h/1h/0h/0/0')

        addr = self.nodes[0].getnewaddress("", "bech32")
        addr_info = self.nodes[0].getaddressinfo(addr)
        assert addr_info['desc'].startswith('wpkh(')
        assert_equal(addr_info['hdkeypath'], 'm/84h/1h/0h/0/0')

        addr = self.nodes[0].getnewaddress("", "bech32m")
        addr_info = self.nodes[0].getaddressinfo(addr)
        assert addr_info['desc'].startswith('tr(')
        assert_equal(addr_info['hdkeypath'], 'm/86h/1h/0h/0/0')

        # Check that getrawchangeaddress works
        addr = self.nodes[0].getrawchangeaddress("legacy")
        addr_info = self.nodes[0].getaddressinfo(addr)
        assert addr_info['desc'].startswith('pkh(')
        assert_equal(addr_info['hdkeypath'], 'm/44h/1h/0h/1/0')

        addr = self.nodes[0].getrawchangeaddress("p2sh-segwit")
        addr_info = self.nodes[0].getaddressinfo(addr)
        assert addr_info['desc'].startswith('sh(wpkh(')
        assert_equal(addr_info['hdkeypath'], 'm/49h/1h/0h/1/0')

        addr = self.nodes[0].getrawchangeaddress("bech32")
        addr_info = self.nodes[0].getaddressinfo(addr)
        assert addr_info['desc'].startswith('wpkh(')
        assert_equal(addr_info['hdkeypath'], 'm/84h/1h/0h/1/0')

        addr = self.nodes[0].getrawchangeaddress("bech32m")
        addr_info = self.nodes[0].getaddressinfo(addr)
        assert addr_info['desc'].startswith('tr(')
        assert_equal(addr_info['hdkeypath'], 'm/86h/1h/0h/1/0')

        # Make a wallet to receive coins at
        self.nodes[0].createwallet(wallet_name="desc2", descriptors=True)
        recv_wrpc = self.nodes[0].get_wallet_rpc("desc2")
        send_wrpc = self.nodes[0].get_wallet_rpc("desc1")

        # Generate some coins
        self.generatetoaddress(self.nodes[0], COINBASE_MATURITY + 1, send_wrpc.getnewaddress())

        # Make transactions
        self.log.info("Test sending and receiving")
        addr = recv_wrpc.getnewaddress()
        send_wrpc.sendtoaddress(addr, 10)

        # Make sure things are disabled
        self.log.info("Test disabled RPCs")
        assert_raises_rpc_error(-4, "Only legacy wallets are supported by this command", recv_wrpc.rpc.importprivkey, "cVpF924EspNh8KjYsfhgY96mmxvT6DgdWiTYMtMjuM74hJaU5psW")
        assert_raises_rpc_error(-4, "Only legacy wallets are supported by this command", recv_wrpc.rpc.importpubkey, send_wrpc.getaddressinfo(send_wrpc.getnewaddress())["pubkey"])
        assert_raises_rpc_error(-4, "Only legacy wallets are supported by this command", recv_wrpc.rpc.importaddress, recv_wrpc.getnewaddress())
        assert_raises_rpc_error(-4, "Only legacy wallets are supported by this command", recv_wrpc.rpc.importmulti, [])
        assert_raises_rpc_error(-4, "Only legacy wallets are supported by this command", recv_wrpc.rpc.addmultisigaddress, 1, [recv_wrpc.getnewaddress()])
        assert_raises_rpc_error(-4, "Only legacy wallets are supported by this command", recv_wrpc.rpc.dumpprivkey, recv_wrpc.getnewaddress())
        assert_raises_rpc_error(-4, "Only legacy wallets are supported by this command", recv_wrpc.rpc.dumpwallet, 'wallet.dump')
        assert_raises_rpc_error(-4, "Only legacy wallets are supported by this command", recv_wrpc.rpc.importwallet, 'wallet.dump')
        assert_raises_rpc_error(-4, "Only legacy wallets are supported by this command", recv_wrpc.rpc.sethdseed)

        self.log.info("Test encryption")
        # Get the master fingerprint before encrypt
        info1 = send_wrpc.getaddressinfo(send_wrpc.getnewaddress())

        # Encrypt wallet 0
        send_wrpc.encryptwallet('pass')
        with WalletUnlock(send_wrpc, "pass"):
            addr = send_wrpc.getnewaddress()
            info2 = send_wrpc.getaddressinfo(addr)
            assert info1['hdmasterfingerprint'] != info2['hdmasterfingerprint']
        assert 'hdmasterfingerprint' in send_wrpc.getaddressinfo(send_wrpc.getnewaddress())
        info3 = send_wrpc.getaddressinfo(addr)
        assert_equal(info2['desc'], info3['desc'])

        self.log.info("Test that getnewaddress still works after keypool is exhausted in an encrypted wallet")
        for _ in range(500):
            send_wrpc.getnewaddress()

        self.log.info("Test that unlock is needed when deriving only hardened keys in an encrypted wallet")
        with WalletUnlock(send_wrpc, "pass"):
            send_wrpc.importdescriptors([{
                "desc": "wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/0h/*h)#y4dfsj7n",
                "timestamp": "now",
                "range": [0, 10],
                "active": True
            }])
        # Exhaust keypool of 100 (done outside the unlock block, i.e. after it has exited)
        for _ in range(100):
            send_wrpc.getnewaddress(address_type='bech32')
        # This should now error
        assert_raises_rpc_error(-12, "Keypool ran out, please call keypoolrefill first", send_wrpc.getnewaddress, '', 'bech32')

        self.log.info("Test born encrypted wallets")
        self.nodes[0].createwallet('desc_enc', False, False, 'pass', False, True)
        enc_rpc = self.nodes[0].get_wallet_rpc('desc_enc')
        enc_rpc.getnewaddress()  # Makes sure that we can get a new address from a born encrypted wallet

        self.log.info("Test blank descriptor wallets")
        self.nodes[0].createwallet(wallet_name='desc_blank', blank=True, descriptors=True)
        blank_rpc = self.nodes[0].get_wallet_rpc('desc_blank')
        assert_raises_rpc_error(-4, 'This wallet has no available keys', blank_rpc.getnewaddress)

        self.log.info("Test descriptor wallet with disabled private keys")
        self.nodes[0].createwallet(wallet_name='desc_no_priv', disable_private_keys=True, descriptors=True)
        nopriv_rpc = self.nodes[0].get_wallet_rpc('desc_no_priv')
        assert_raises_rpc_error(-4, 'This wallet has no available keys', nopriv_rpc.getnewaddress)

        self.log.info("Test descriptor exports")
        self.nodes[0].createwallet(wallet_name='desc_export', descriptors=True)
        exp_rpc = self.nodes[0].get_wallet_rpc('desc_export')
        self.nodes[0].createwallet(wallet_name='desc_import', disable_private_keys=True, descriptors=True)
        imp_rpc = self.nodes[0].get_wallet_rpc('desc_import')

        # (address type, internal?, descriptor prefix, derivation path,
        #  index into the descriptor string of the 0/1 external/internal digit)
        addr_types = [('legacy', False, 'pkh(', '44h/1h/0h', -13),
                      ('p2sh-segwit', False, 'sh(wpkh(', '49h/1h/0h', -14),
                      ('bech32', False, 'wpkh(', '84h/1h/0h', -13),
                      ('bech32m', False, 'tr(', '86h/1h/0h', -13),
                      ('legacy', True, 'pkh(', '44h/1h/0h', -13),
                      ('p2sh-segwit', True, 'sh(wpkh(', '49h/1h/0h', -14),
                      ('bech32', True, 'wpkh(', '84h/1h/0h', -13),
                      ('bech32m', True, 'tr(', '86h/1h/0h', -13)]

        for addr_type, internal, desc_prefix, deriv_path, int_idx in addr_types:
            int_str = 'internal' if internal else 'external'

            self.log.info("Testing descriptor address type for {} {}".format(addr_type, int_str))
            if internal:
                addr = exp_rpc.getrawchangeaddress(address_type=addr_type)
            else:
                addr = exp_rpc.getnewaddress(address_type=addr_type)
            desc = exp_rpc.getaddressinfo(addr)['parent_desc']
            assert_equal(desc_prefix, desc[0:len(desc_prefix)])
            idx = desc.index('/') + 1
            assert_equal(deriv_path, desc[idx:idx + 9])
            if internal:
                assert_equal('1', desc[int_idx])
            else:
                assert_equal('0', desc[int_idx])

            self.log.info("Testing the same descriptor is returned for address type {} {}".format(addr_type, int_str))
            for _ in range(0, 10):
                if internal:
                    addr = exp_rpc.getrawchangeaddress(address_type=addr_type)
                else:
                    addr = exp_rpc.getnewaddress(address_type=addr_type)
                test_desc = exp_rpc.getaddressinfo(addr)['parent_desc']
                assert_equal(desc, test_desc)

            self.log.info("Testing import of exported {} descriptor".format(addr_type))
            imp_rpc.importdescriptors([{
                'desc': desc,
                'active': True,
                'next_index': 11,
                'timestamp': 'now',
                'internal': internal
            }])

            # The exporting wallet has already handed out 11 addresses for this
            # descriptor (1 + 10 above), matching next_index=11 on the import.
            for _ in range(0, 10):
                if internal:
                    exp_addr = exp_rpc.getrawchangeaddress(address_type=addr_type)
                    imp_addr = imp_rpc.getrawchangeaddress(address_type=addr_type)
                else:
                    exp_addr = exp_rpc.getnewaddress(address_type=addr_type)
                    imp_addr = imp_rpc.getnewaddress(address_type=addr_type)
                assert_equal(exp_addr, imp_addr)

        self.log.info("Test that loading descriptor wallet containing legacy key types throws error")
        self.nodes[0].createwallet(wallet_name="crashme", descriptors=True)
        self.nodes[0].unloadwallet("crashme")
        wallet_db = self.nodes[0].wallets_path / "crashme" / self.wallet_data_filename
        conn = sqlite3.connect(wallet_db)
        try:
            with conn:
                # add "cscript" entry: key type is uint160 (20 bytes), value type is CScript (zero-length here)
                conn.execute('INSERT INTO main VALUES(?, ?)', (b'\x07cscript' + b'\x00'*20, b'\x00'))
        finally:
            # "with conn" commits or rolls back but does not close; always
            # release the handle before asking the node to load the wallet.
            conn.close()
        assert_raises_rpc_error(-4, "Unexpected legacy entry in descriptor wallet found.", self.nodes[0].loadwallet, "crashme")

        self.test_concurrent_writes()


if __name__ == '__main__':
    WalletDescriptorTest().main()