/ test / functional / wallet_descriptor.py
wallet_descriptor.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2019-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Test descriptor wallet function."""
  6  
  7  try:
  8      import sqlite3
  9  except ImportError:
 10      pass
 11  
 12  import re
 13  
 14  from test_framework.blocktools import COINBASE_MATURITY
 15  from test_framework.test_framework import BitcoinTestFramework
 16  from test_framework.util import (
 17      assert_not_equal,
 18      assert_equal,
 19      assert_raises_rpc_error
 20  )
 21  from test_framework.wallet_util import WalletUnlock
 22  
 23  
 24  class WalletDescriptorTest(BitcoinTestFramework):
 25      def set_test_params(self):
 26          self.setup_clean_chain = True
 27          self.num_nodes = 1
 28          self.extra_args = [['-keypool=100']]
 29  
 30      def skip_test_if_missing_module(self):
 31          self.skip_if_no_wallet()
 32          self.skip_if_no_py_sqlite3()
 33  
 34      def test_parent_descriptors(self):
 35          self.log.info("Check that parent_descs is the same for all RPCs and is normalized")
 36          self.nodes[0].createwallet(wallet_name="parent_descs")
 37          wallet = self.nodes[0].get_wallet_rpc("parent_descs")
 38          default_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
 39  
 40          addr = wallet.getnewaddress()
 41          parent_desc = wallet.getaddressinfo(addr)["parent_desc"]
 42  
 43          # Verify that the parent descriptor is normalized
 44          # First remove the checksum
 45          desc_verify = parent_desc.split("#")[0]
 46          # Next extract the xpub
 47          desc_verify = re.sub(r"tpub\w+?(?=/)", "", desc_verify)
 48          # Extract origin info
 49          origin_match = re.search(r'\[([\da-fh/]+)\]', desc_verify)
 50          origin_part = origin_match.group(1) if origin_match else ""
 51          # Split on "]" for everything after the origin info
 52          after_origin = desc_verify.split("]", maxsplit=1)[-1]
 53          # Look for the hardened markers “h” inside each piece
 54          # We don't need to check for aspostrophe as normalization will not output aspostrophe
 55          found_hardened_in_origin = "h" in origin_part
 56          found_hardened_after_origin = "h" in after_origin
 57          assert_equal(found_hardened_in_origin, True)
 58          assert_equal(found_hardened_after_origin, False)
 59  
 60          # Send some coins so we can check listunspent, listtransactions, listunspent, and gettransaction
 61          since_block = self.nodes[0].getbestblockhash()
 62          txid = default_wallet.sendtoaddress(addr, 1)
 63          self.generate(self.nodes[0], 1)
 64  
 65          unspent = wallet.listunspent()
 66          assert_equal(len(unspent), 1)
 67          assert_equal(unspent[0]["parent_descs"], [parent_desc])
 68  
 69          txs = wallet.listtransactions()
 70          assert_equal(len(txs), 1)
 71          assert_equal(txs[0]["parent_descs"], [parent_desc])
 72  
 73          txs = wallet.listsinceblock(since_block)["transactions"]
 74          assert_equal(len(txs), 1)
 75          assert_equal(txs[0]["parent_descs"], [parent_desc])
 76  
 77          tx = wallet.gettransaction(txid=txid, verbose=True)
 78          assert_equal(tx["details"][0]["parent_descs"], [parent_desc])
 79  
 80          wallet.unloadwallet()
 81  
 82      def run_test(self):
 83          self.generate(self.nodes[0], COINBASE_MATURITY + 1)
 84  
 85          # Make a descriptor wallet
 86          self.log.info("Making a descriptor wallet")
 87          self.nodes[0].createwallet(wallet_name="desc1")
 88          wallet = self.nodes[0].get_wallet_rpc("desc1")
 89  
 90          # A descriptor wallet should have 100 addresses * 4 types = 400 keys
 91          self.log.info("Checking wallet info")
 92          wallet_info = wallet.getwalletinfo()
 93          assert_equal(wallet_info['format'], 'sqlite')
 94          assert_equal(wallet_info['keypoolsize'], 400)
 95          assert_equal(wallet_info['keypoolsize_hd_internal'], 400)
 96          assert 'keypoololdest' not in wallet_info
 97  
 98          # Check that getnewaddress works
 99          self.log.info("Test that getnewaddress and getrawchangeaddress work")
100          addr = wallet.getnewaddress("", "legacy")
101          addr_info = wallet.getaddressinfo(addr)
102          assert addr_info['desc'].startswith('pkh(')
103          assert_equal(addr_info['hdkeypath'], 'm/44h/1h/0h/0/0')
104  
105          addr = wallet.getnewaddress("", "p2sh-segwit")
106          addr_info = wallet.getaddressinfo(addr)
107          assert addr_info['desc'].startswith('sh(wpkh(')
108          assert_equal(addr_info['hdkeypath'], 'm/49h/1h/0h/0/0')
109  
110          addr = wallet.getnewaddress("", "bech32")
111          addr_info = wallet.getaddressinfo(addr)
112          assert addr_info['desc'].startswith('wpkh(')
113          assert_equal(addr_info['hdkeypath'], 'm/84h/1h/0h/0/0')
114  
115          addr = wallet.getnewaddress("", "bech32m")
116          addr_info = wallet.getaddressinfo(addr)
117          assert addr_info['desc'].startswith('tr(')
118          assert_equal(addr_info['hdkeypath'], 'm/86h/1h/0h/0/0')
119  
120          # Check that getrawchangeaddress works
121          addr = wallet.getrawchangeaddress("legacy")
122          addr_info = wallet.getaddressinfo(addr)
123          assert addr_info['desc'].startswith('pkh(')
124          assert_equal(addr_info['hdkeypath'], 'm/44h/1h/0h/1/0')
125  
126          addr = wallet.getrawchangeaddress("p2sh-segwit")
127          addr_info = wallet.getaddressinfo(addr)
128          assert addr_info['desc'].startswith('sh(wpkh(')
129          assert_equal(addr_info['hdkeypath'], 'm/49h/1h/0h/1/0')
130  
131          addr = wallet.getrawchangeaddress("bech32")
132          addr_info = wallet.getaddressinfo(addr)
133          assert addr_info['desc'].startswith('wpkh(')
134          assert_equal(addr_info['hdkeypath'], 'm/84h/1h/0h/1/0')
135  
136          addr = wallet.getrawchangeaddress("bech32m")
137          addr_info = wallet.getaddressinfo(addr)
138          assert addr_info['desc'].startswith('tr(')
139          assert_equal(addr_info['hdkeypath'], 'm/86h/1h/0h/1/0')
140  
141          # Make a wallet to receive coins at
142          self.nodes[0].createwallet(wallet_name="desc2")
143          recv_wrpc = self.nodes[0].get_wallet_rpc("desc2")
144          send_wrpc = self.nodes[0].get_wallet_rpc("desc1")
145  
146          # Generate some coins
147          self.generatetoaddress(self.nodes[0], COINBASE_MATURITY + 1, send_wrpc.getnewaddress())
148  
149          # Make transactions
150          self.log.info("Test sending and receiving")
151          addr = recv_wrpc.getnewaddress()
152          send_wrpc.sendtoaddress(addr, 10)
153  
154          self.log.info("Test encryption")
155          # Get the master fingerprint before encrypt
156          info1 = send_wrpc.getaddressinfo(send_wrpc.getnewaddress())
157  
158          # Encrypt wallet 0
159          send_wrpc.encryptwallet('pass')
160          with WalletUnlock(send_wrpc, "pass"):
161              addr = send_wrpc.getnewaddress()
162              info2 = send_wrpc.getaddressinfo(addr)
163              assert_not_equal(info1['hdmasterfingerprint'], info2['hdmasterfingerprint'])
164          assert 'hdmasterfingerprint' in send_wrpc.getaddressinfo(send_wrpc.getnewaddress())
165          info3 = send_wrpc.getaddressinfo(addr)
166          assert_equal(info2['desc'], info3['desc'])
167  
168          self.log.info("Test that getnewaddress still works after keypool is exhausted in an encrypted wallet")
169          for _ in range(500):
170              send_wrpc.getnewaddress()
171  
172          self.log.info("Test that unlock is needed when deriving only hardened keys in an encrypted wallet")
173          with WalletUnlock(send_wrpc, "pass"):
174              send_wrpc.importdescriptors([{
175                  "desc": "wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/0h/*h)#y4dfsj7n",
176                  "timestamp": "now",
177                  "range": [0,10],
178                  "active": True
179              }])
180          # Exhaust keypool of 100
181          for _ in range(100):
182              send_wrpc.getnewaddress(address_type='bech32')
183          # This should now error
184          assert_raises_rpc_error(-12, "Keypool ran out, please call keypoolrefill first", send_wrpc.getnewaddress, '', 'bech32')
185  
186          self.log.info("Test born encrypted wallets")
187          self.nodes[0].createwallet('desc_enc', False, False, 'pass', False, True)
188          enc_rpc = self.nodes[0].get_wallet_rpc('desc_enc')
189          enc_rpc.getnewaddress() # Makes sure that we can get a new address from a born encrypted wallet
190  
191          self.log.info("Test blank descriptor wallets")
192          self.nodes[0].createwallet(wallet_name='desc_blank', blank=True)
193          blank_rpc = self.nodes[0].get_wallet_rpc('desc_blank')
194          assert_raises_rpc_error(-4, 'This wallet has no available keys', blank_rpc.getnewaddress)
195  
196          self.log.info("Test descriptor wallet with disabled private keys")
197          self.nodes[0].createwallet(wallet_name='desc_no_priv', disable_private_keys=True)
198          nopriv_rpc = self.nodes[0].get_wallet_rpc('desc_no_priv')
199          assert_raises_rpc_error(-4, 'This wallet has no available keys', nopriv_rpc.getnewaddress)
200  
201          self.log.info("Test descriptor exports")
202          self.nodes[0].createwallet(wallet_name='desc_export')
203          exp_rpc = self.nodes[0].get_wallet_rpc('desc_export')
204          self.nodes[0].createwallet(wallet_name='desc_import', disable_private_keys=True)
205          imp_rpc = self.nodes[0].get_wallet_rpc('desc_import')
206  
207          addr_types = [('legacy', False, 'pkh(', '44h/1h/0h', -13),
208                        ('p2sh-segwit', False, 'sh(wpkh(', '49h/1h/0h', -14),
209                        ('bech32', False, 'wpkh(', '84h/1h/0h', -13),
210                        ('bech32m', False, 'tr(', '86h/1h/0h', -13),
211                        ('legacy', True, 'pkh(', '44h/1h/0h', -13),
212                        ('p2sh-segwit', True, 'sh(wpkh(', '49h/1h/0h', -14),
213                        ('bech32', True, 'wpkh(', '84h/1h/0h', -13),
214                        ('bech32m', True, 'tr(', '86h/1h/0h', -13)]
215  
216          for addr_type, internal, desc_prefix, deriv_path, int_idx in addr_types:
217              int_str = 'internal' if internal else 'external'
218  
219              self.log.info("Testing descriptor address type for {} {}".format(addr_type, int_str))
220              if internal:
221                  addr = exp_rpc.getrawchangeaddress(address_type=addr_type)
222              else:
223                  addr = exp_rpc.getnewaddress(address_type=addr_type)
224              desc = exp_rpc.getaddressinfo(addr)['parent_desc']
225              assert_equal(desc_prefix, desc[0:len(desc_prefix)])
226              idx = desc.index('/') + 1
227              assert_equal(deriv_path, desc[idx:idx + 9])
228              if internal:
229                  assert_equal('1', desc[int_idx])
230              else:
231                  assert_equal('0', desc[int_idx])
232  
233              self.log.info("Testing the same descriptor is returned for address type {} {}".format(addr_type, int_str))
234              for i in range(0, 10):
235                  if internal:
236                      addr = exp_rpc.getrawchangeaddress(address_type=addr_type)
237                  else:
238                      addr = exp_rpc.getnewaddress(address_type=addr_type)
239                  test_desc = exp_rpc.getaddressinfo(addr)['parent_desc']
240                  assert_equal(desc, test_desc)
241  
242              self.log.info("Testing import of exported {} descriptor".format(addr_type))
243              imp_rpc.importdescriptors([{
244                  'desc': desc,
245                  'active': True,
246                  'next_index': 11,
247                  'timestamp': 'now',
248                  'internal': internal
249              }])
250  
251              for i in range(0, 10):
252                  if internal:
253                      exp_addr = exp_rpc.getrawchangeaddress(address_type=addr_type)
254                      imp_addr = imp_rpc.getrawchangeaddress(address_type=addr_type)
255                  else:
256                      exp_addr = exp_rpc.getnewaddress(address_type=addr_type)
257                      imp_addr = imp_rpc.getnewaddress(address_type=addr_type)
258                  assert_equal(exp_addr, imp_addr)
259  
260          self.log.info("Test that loading descriptor wallet containing legacy key types throws error")
261          self.nodes[0].createwallet(wallet_name="crashme")
262          self.nodes[0].unloadwallet("crashme")
263          wallet_db = self.nodes[0].wallets_path / "crashme" / self.wallet_data_filename
264          conn = sqlite3.connect(wallet_db)
265          with conn:
266              # add "cscript" entry: key type is uint160 (20 bytes), value type is CScript (zero-length here)
267              conn.execute('INSERT INTO main VALUES(?, ?)', (b'\x07cscript' + b'\x00'*20, b'\x00'))
268          conn.close()
269          assert_raises_rpc_error(-4, "Unexpected legacy entry in descriptor wallet found.", self.nodes[0].loadwallet, "crashme")
270  
271          self.test_parent_descriptors()
272  
if __name__ == '__main__':
    # Run the descriptor wallet test when this file is executed as a script.
    descriptor_test = WalletDescriptorTest(__file__)
    descriptor_test.main()