/ test / functional / wallet_hd.py
wallet_hd.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2016-2022 The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Test Hierarchical Deterministic wallet function."""
  6  
  7  import shutil
  8  
  9  from test_framework.blocktools import COINBASE_MATURITY
 10  from test_framework.test_framework import BitcoinTestFramework
 11  from test_framework.util import (
 12      assert_equal,
 13      assert_raises_rpc_error,
 14  )
 15  
 16  
 17  class WalletHDTest(BitcoinTestFramework):
 18      def add_options(self, parser):
 19          self.add_wallet_options(parser)
 20  
 21      def set_test_params(self):
 22          self.setup_clean_chain = True
 23          self.num_nodes = 2
 24          self.extra_args = [[], ['-keypool=0']]
 25          # whitelist peers to speed up tx relay / mempool sync
 26          self.noban_tx_relay = True
 27  
 28          self.supports_cli = False
 29  
 30      def skip_test_if_missing_module(self):
 31          self.skip_if_no_wallet()
 32  
 33      def run_test(self):
 34          # Make sure we use hd, keep masterkeyid
 35          hd_fingerprint = self.nodes[1].getaddressinfo(self.nodes[1].getnewaddress())['hdmasterfingerprint']
 36          assert_equal(len(hd_fingerprint), 8)
 37  
 38          # create an internal key
 39          change_addr = self.nodes[1].getrawchangeaddress()
 40          change_addrV = self.nodes[1].getaddressinfo(change_addr)
 41          if self.options.descriptors:
 42              assert_equal(change_addrV["hdkeypath"], "m/84h/1h/0h/1/0")
 43          else:
 44              assert_equal(change_addrV["hdkeypath"], "m/0'/1'/0'")  #first internal child key
 45  
 46          # Import a non-HD private key in the HD wallet
 47          non_hd_add = 'bcrt1qmevj8zfx0wdvp05cqwkmr6mxkfx60yezwjksmt'
 48          non_hd_key = 'cS9umN9w6cDMuRVYdbkfE4c7YUFLJRoXMfhQ569uY4odiQbVN8Rt'
 49          self.nodes[1].importprivkey(non_hd_key)
 50  
 51          # This should be enough to keep the master key and the non-HD key
 52          self.nodes[1].backupwallet(self.nodes[1].datadir_path / "hd.bak")
 53          #self.nodes[1].dumpwallet(self.nodes[1].datadir_path / "hd.dump")
 54  
 55          # Derive some HD addresses and remember the last
 56          # Also send funds to each add
 57          self.generate(self.nodes[0], COINBASE_MATURITY + 1)
 58          hd_add = None
 59          NUM_HD_ADDS = 10
 60          for i in range(1, NUM_HD_ADDS + 1):
 61              hd_add = self.nodes[1].getnewaddress()
 62              hd_info = self.nodes[1].getaddressinfo(hd_add)
 63              if self.options.descriptors:
 64                  assert_equal(hd_info["hdkeypath"], "m/84h/1h/0h/0/" + str(i))
 65              else:
 66                  assert_equal(hd_info["hdkeypath"], "m/0'/0'/" + str(i) + "'")
 67              assert_equal(hd_info["hdmasterfingerprint"], hd_fingerprint)
 68              self.nodes[0].sendtoaddress(hd_add, 1)
 69              self.generate(self.nodes[0], 1)
 70          self.nodes[0].sendtoaddress(non_hd_add, 1)
 71          self.generate(self.nodes[0], 1)
 72  
 73          # create an internal key (again)
 74          change_addr = self.nodes[1].getrawchangeaddress()
 75          change_addrV = self.nodes[1].getaddressinfo(change_addr)
 76          if self.options.descriptors:
 77              assert_equal(change_addrV["hdkeypath"], "m/84h/1h/0h/1/1")
 78          else:
 79              assert_equal(change_addrV["hdkeypath"], "m/0'/1'/1'")  #second internal child key
 80  
 81          self.sync_all()
 82          assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)
 83  
 84          self.log.info("Restore backup ...")
 85          self.stop_node(1)
 86          # we need to delete the complete chain directory
 87          # otherwise node1 would auto-recover all funds in flag the keypool keys as used
 88          shutil.rmtree(self.nodes[1].blocks_path)
 89          shutil.rmtree(self.nodes[1].chain_path / "chainstate")
 90          shutil.copyfile(
 91              self.nodes[1].datadir_path / "hd.bak",
 92              self.nodes[1].wallets_path / self.default_wallet_name / self.wallet_data_filename
 93          )
 94          self.start_node(1)
 95  
 96          # Assert that derivation is deterministic
 97          hd_add_2 = None
 98          for i in range(1, NUM_HD_ADDS + 1):
 99              hd_add_2 = self.nodes[1].getnewaddress()
100              hd_info_2 = self.nodes[1].getaddressinfo(hd_add_2)
101              if self.options.descriptors:
102                  assert_equal(hd_info_2["hdkeypath"], "m/84h/1h/0h/0/" + str(i))
103              else:
104                  assert_equal(hd_info_2["hdkeypath"], "m/0'/0'/" + str(i) + "'")
105              assert_equal(hd_info_2["hdmasterfingerprint"], hd_fingerprint)
106          assert_equal(hd_add, hd_add_2)
107          self.connect_nodes(0, 1)
108          self.sync_all()
109  
110          # Needs rescan
111          self.nodes[1].rescanblockchain()
112          assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)
113  
114          # Try a RPC based rescan
115          self.stop_node(1)
116          shutil.rmtree(self.nodes[1].blocks_path)
117          shutil.rmtree(self.nodes[1].chain_path / "chainstate")
118          shutil.copyfile(
119              self.nodes[1].datadir_path / "hd.bak",
120              self.nodes[1].wallets_path / self.default_wallet_name / self.wallet_data_filename
121          )
122          self.start_node(1, extra_args=self.extra_args[1])
123          self.connect_nodes(0, 1)
124          self.sync_all()
125          # Wallet automatically scans blocks older than key on startup
126          assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)
127          out = self.nodes[1].rescanblockchain(0, 1)
128          assert_equal(out['start_height'], 0)
129          assert_equal(out['stop_height'], 1)
130          out = self.nodes[1].rescanblockchain()
131          assert_equal(out['start_height'], 0)
132          assert_equal(out['stop_height'], self.nodes[1].getblockcount())
133          assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)
134  
135          # send a tx and make sure its using the internal chain for the changeoutput
136          txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1)
137          outs = self.nodes[1].gettransaction(txid=txid, verbose=True)['decoded']['vout']
138          keypath = ""
139          for out in outs:
140              if out['value'] != 1:
141                  keypath = self.nodes[1].getaddressinfo(out['scriptPubKey']['address'])['hdkeypath']
142  
143          if self.options.descriptors:
144              assert_equal(keypath[0:14], "m/84h/1h/0h/1/")
145          else:
146              assert_equal(keypath[0:7], "m/0'/1'")
147  
148          if not self.options.descriptors:
149              # Generate a new HD seed on node 1 and make sure it is set
150              orig_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid']
151              self.nodes[1].sethdseed()
152              new_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid']
153              assert orig_masterkeyid != new_masterkeyid
154              addr = self.nodes[1].getnewaddress()
155              # Make sure the new address is the first from the keypool
156              assert_equal(self.nodes[1].getaddressinfo(addr)['hdkeypath'], 'm/0\'/0\'/0\'')
157              self.nodes[1].keypoolrefill(1)  # Fill keypool with 1 key
158  
159              # Set a new HD seed on node 1 without flushing the keypool
160              new_seed = self.nodes[0].dumpprivkey(self.nodes[0].getnewaddress())
161              orig_masterkeyid = new_masterkeyid
162              self.nodes[1].sethdseed(False, new_seed)
163              new_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid']
164              assert orig_masterkeyid != new_masterkeyid
165              addr = self.nodes[1].getnewaddress()
166              assert_equal(orig_masterkeyid, self.nodes[1].getaddressinfo(addr)['hdseedid'])
167              # Make sure the new address continues previous keypool
168              assert_equal(self.nodes[1].getaddressinfo(addr)['hdkeypath'], 'm/0\'/0\'/1\'')
169  
170              # Check that the next address is from the new seed
171              self.nodes[1].keypoolrefill(1)
172              next_addr = self.nodes[1].getnewaddress()
173              assert_equal(new_masterkeyid, self.nodes[1].getaddressinfo(next_addr)['hdseedid'])
174              # Make sure the new address is not from previous keypool
175              assert_equal(self.nodes[1].getaddressinfo(next_addr)['hdkeypath'], 'm/0\'/0\'/0\'')
176              assert next_addr != addr
177  
178              # Sethdseed parameter validity
179              assert_raises_rpc_error(-1, 'sethdseed', self.nodes[0].sethdseed, False, new_seed, 0)
180              assert_raises_rpc_error(-5, "Invalid private key", self.nodes[1].sethdseed, False, "not_wif")
181              assert_raises_rpc_error(-3, "JSON value of type string is not of expected type bool", self.nodes[1].sethdseed, "Not_bool")
182              assert_raises_rpc_error(-3, "JSON value of type bool is not of expected type string", self.nodes[1].sethdseed, False, True)
183              assert_raises_rpc_error(-5, "Already have this key", self.nodes[1].sethdseed, False, new_seed)
184              assert_raises_rpc_error(-5, "Already have this key", self.nodes[1].sethdseed, False, self.nodes[1].dumpprivkey(self.nodes[1].getnewaddress()))
185  
186              self.log.info('Test sethdseed restoring with keys outside of the initial keypool')
187              self.generate(self.nodes[0], 10)
188              # Restart node 1 with keypool of 3 and a different wallet
189              self.nodes[1].createwallet(wallet_name='origin', blank=True)
190              self.restart_node(1, extra_args=['-keypool=3', '-wallet=origin'])
191              self.connect_nodes(0, 1)
192  
193              # sethdseed restoring and seeing txs to addresses out of the keypool
194              origin_rpc = self.nodes[1].get_wallet_rpc('origin')
195              seed = self.nodes[0].dumpprivkey(self.nodes[0].getnewaddress())
196              origin_rpc.sethdseed(True, seed)
197  
198              self.nodes[1].createwallet(wallet_name='restore', blank=True)
199              restore_rpc = self.nodes[1].get_wallet_rpc('restore')
200              restore_rpc.sethdseed(True, seed)  # Set to be the same seed as origin_rpc
201              restore_rpc.sethdseed(True)  # Rotate to a new seed, making original `seed` inactive
202  
203              self.nodes[1].createwallet(wallet_name='restore2', blank=True)
204              restore2_rpc = self.nodes[1].get_wallet_rpc('restore2')
205              restore2_rpc.sethdseed(True, seed)  # Set to be the same seed as origin_rpc
206              restore2_rpc.sethdseed(True)  # Rotate to a new seed, making original `seed` inactive
207  
208              # Check persistence of inactive seed by reloading restore. restore2 is still loaded to test the case where the wallet is not reloaded
209              restore_rpc.unloadwallet()
210              self.nodes[1].loadwallet('restore')
211              restore_rpc = self.nodes[1].get_wallet_rpc('restore')
212  
213              # Empty origin keypool and get an address that is beyond the initial keypool
214              origin_rpc.getnewaddress()
215              origin_rpc.getnewaddress()
216              last_addr = origin_rpc.getnewaddress()  # Last address of initial keypool
217              addr = origin_rpc.getnewaddress()  # First address beyond initial keypool
218  
219              # Check that the restored seed has last_addr but does not have addr
220              info = restore_rpc.getaddressinfo(last_addr)
221              assert_equal(info['ismine'], True)
222              info = restore_rpc.getaddressinfo(addr)
223              assert_equal(info['ismine'], False)
224              info = restore2_rpc.getaddressinfo(last_addr)
225              assert_equal(info['ismine'], True)
226              info = restore2_rpc.getaddressinfo(addr)
227              assert_equal(info['ismine'], False)
228              # Check that the origin seed has addr
229              info = origin_rpc.getaddressinfo(addr)
230              assert_equal(info['ismine'], True)
231  
232              # Send a transaction to addr, which is out of the initial keypool.
233              # The wallet that has set a new seed (restore_rpc) should not detect this transaction.
234              txid = self.nodes[0].sendtoaddress(addr, 1)
235              origin_rpc.sendrawtransaction(self.nodes[0].gettransaction(txid)['hex'])
236              self.generate(self.nodes[0], 1)
237              origin_rpc.gettransaction(txid)
238              assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', restore_rpc.gettransaction, txid)
239              out_of_kp_txid = txid
240  
241              # Send a transaction to last_addr, which is in the initial keypool.
242              # The wallet that has set a new seed (restore_rpc) should detect this transaction and generate 3 new keys from the initial seed.
243              # The previous transaction (out_of_kp_txid) should still not be detected as a rescan is required.
244              txid = self.nodes[0].sendtoaddress(last_addr, 1)
245              origin_rpc.sendrawtransaction(self.nodes[0].gettransaction(txid)['hex'])
246              self.generate(self.nodes[0], 1)
247              origin_rpc.gettransaction(txid)
248              restore_rpc.gettransaction(txid)
249              assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', restore_rpc.gettransaction, out_of_kp_txid)
250              restore2_rpc.gettransaction(txid)
251              assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', restore2_rpc.gettransaction, out_of_kp_txid)
252  
253              # After rescanning, restore_rpc should now see out_of_kp_txid and generate an additional key.
254              # addr should now be part of restore_rpc and be ismine
255              restore_rpc.rescanblockchain()
256              restore_rpc.gettransaction(out_of_kp_txid)
257              info = restore_rpc.getaddressinfo(addr)
258              assert_equal(info['ismine'], True)
259              restore2_rpc.rescanblockchain()
260              restore2_rpc.gettransaction(out_of_kp_txid)
261              info = restore2_rpc.getaddressinfo(addr)
262              assert_equal(info['ismine'], True)
263  
264              # Check again that 3 keys were derived.
265              # Empty keypool and get an address that is beyond the initial keypool
266              origin_rpc.getnewaddress()
267              origin_rpc.getnewaddress()
268              last_addr = origin_rpc.getnewaddress()
269              addr = origin_rpc.getnewaddress()
270  
271              # Check that the restored seed has last_addr but does not have addr
272              info = restore_rpc.getaddressinfo(last_addr)
273              assert_equal(info['ismine'], True)
274              info = restore_rpc.getaddressinfo(addr)
275              assert_equal(info['ismine'], False)
276              info = restore2_rpc.getaddressinfo(last_addr)
277              assert_equal(info['ismine'], True)
278              info = restore2_rpc.getaddressinfo(addr)
279              assert_equal(info['ismine'], False)
280  
281  
# Script entry point: build the test case and hand control to the framework.
if __name__ == "__main__":
    WalletHDTest().main()