/ test / functional / wallet_migration.py
wallet_migration.py
   1  #!/usr/bin/env python3
   2  # Copyright (c) 2020-present The Bitcoin Core developers
   3  # Distributed under the MIT software license, see the accompanying
   4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
   5  """Test Migrating a wallet from legacy to descriptor."""
   6  import os.path
   7  import random
   8  import shutil
   9  import struct
  10  import time
  11  
  12  from test_framework.address import (
  13      key_to_p2pkh,
  14      key_to_p2wpkh,
  15      script_to_p2sh,
  16      script_to_p2wsh,
  17  )
  18  from test_framework.descriptors import descsum_create
  19  from test_framework.key import ECPubKey
  20  from test_framework.test_framework import BitcoinTestFramework
  21  from test_framework.messages import COIN, CTransaction, CTxOut, ser_string
  22  from test_framework.script import hash160
  23  from test_framework.script_util import key_to_p2pkh_script, key_to_p2pk_script, script_to_p2sh_script, script_to_p2wsh_script
  24  from test_framework.util import (
  25      assert_equal,
  26      assert_greater_than,
  27      assert_raises_rpc_error,
  28      find_vout_for_address,
  29      sha256sum_file,
  30  )
  31  from test_framework.wallet_util import (
  32      get_generate_key,
  33      generate_keypair,
  34  )
  35  
# Magic number that assert_is_bdb() expects as the third field unpacked from
# the first 16 bytes of a BDB wallet data file.
BTREE_MAGIC = 0x053162
  37  
  38  
  39  class WalletMigrationTest(BitcoinTestFramework):
  40      def set_test_params(self):
  41          self.setup_clean_chain = True
  42          self.num_nodes = 2
  43          self.supports_cli = False
  44          self.extra_args = [[], ["-deprecatedrpc=create_bdb"]]
  45  
  46      def skip_test_if_missing_module(self):
  47          self.skip_if_no_wallet()
  48          self.skip_if_no_previous_releases()
  49  
  50      def setup_nodes(self):
  51          self.add_nodes(
  52              self.num_nodes,
  53              extra_args=self.extra_args,
  54              versions=[
  55                  None,
  56                  280200,
  57              ],
  58          )
  59          self.start_nodes()
  60          self.init_wallet(node=0)
  61  
  62      def assert_is_sqlite(self, wallet_name):
  63          wallet_file_path = self.master_node.wallets_path / wallet_name / self.wallet_data_filename
  64          with open(wallet_file_path, 'rb') as f:
  65              file_magic = f.read(16)
  66              assert_equal(file_magic, b'SQLite format 3\x00')
  67          assert_equal(self.master_node.get_wallet_rpc(wallet_name).getwalletinfo()["format"], "sqlite")
  68  
  69      def assert_is_bdb(self, wallet_name):
  70          with open(self.master_node.wallets_path / wallet_name / self.wallet_data_filename, "rb") as f:
  71              data = f.read(16)
  72              _, _, magic = struct.unpack("QII", data)
  73              assert_equal(magic, BTREE_MAGIC)
  74  
  75      def create_legacy_wallet(self, wallet_name, **kwargs):
  76          self.old_node.createwallet(wallet_name=wallet_name, descriptors=False, **kwargs)
  77          wallet = self.old_node.get_wallet_rpc(wallet_name)
  78          info = wallet.getwalletinfo()
  79          assert_equal(info["descriptors"], False)
  80          assert_equal(info["format"], "bdb")
  81          return wallet
  82  
  83      def assert_addr_info_equal(self, addr_info, addr_info_old):
  84          assert_equal(addr_info["address"], addr_info_old["address"])
  85          assert_equal(addr_info["scriptPubKey"], addr_info_old["scriptPubKey"])
  86          assert_equal(addr_info["ismine"], addr_info_old["ismine"])
  87          assert_equal(addr_info["hdkeypath"], addr_info_old["hdkeypath"].replace("'","h"))
  88          assert_equal(addr_info["solvable"], addr_info_old["solvable"])
  89          assert_equal(addr_info["ischange"], addr_info_old["ischange"])
  90          assert_equal(addr_info["hdmasterfingerprint"], addr_info_old["hdmasterfingerprint"])
  91  
  92      def assert_list_txs_equal(self, received_list_txs, expected_list_txs):
  93          for d in received_list_txs:
  94              if "parent_descs" in d:
  95                  del d["parent_descs"]
  96          for d in expected_list_txs:
  97              if "parent_descs" in d:
  98                  del d["parent_descs"]
  99          assert_equal(received_list_txs, expected_list_txs)
 100  
 101      def check_address(self, wallet, addr, is_mine, is_change, label):
 102          addr_info = wallet.getaddressinfo(addr)
 103          assert_equal(addr_info['ismine'], is_mine)
 104          assert_equal(addr_info['ischange'], is_change)
 105          if label is not None:
 106              assert_equal(addr_info['labels'], [label]),
 107          else:
 108              assert_equal(addr_info['labels'], []),
 109  
 110      def migrate_and_get_rpc(self, wallet_name, **kwargs):
 111          # Since we may rescan on loading of a wallet, make sure that the best block
 112          # is written before beginning migration
 113          # Reload to force write that record
 114          self.old_node.unloadwallet(wallet_name)
 115          self.old_node.loadwallet(wallet_name)
 116          assert_equal(self.old_node.get_wallet_rpc(wallet_name).getwalletinfo()["descriptors"], False)
 117          # Now unload so we can copy it to the master node for the migration test
 118          self.old_node.unloadwallet(wallet_name)
 119          if wallet_name == "":
 120              shutil.copyfile(self.old_node.wallets_path / "wallet.dat", self.master_node.wallets_path / "wallet.dat")
 121          else:
 122              src = os.path.abspath(self.old_node.wallets_path / wallet_name)
 123              dst = os.path.abspath(self.master_node.wallets_path / wallet_name)
 124              if src != dst :
 125                  shutil.copytree(self.old_node.wallets_path / wallet_name, self.master_node.wallets_path / wallet_name, dirs_exist_ok=True)
 126          # Check that the wallet shows up in listwalletdir with a warning about migration
 127          wallets = self.master_node.listwalletdir()
 128          for w in wallets["wallets"]:
 129              if w["name"] == wallet_name:
 130                  assert_equal(w["warnings"], ["This wallet is a legacy wallet and will need to be migrated with migratewallet before it can be loaded"])
 131  
 132          # migratewallet uses current time in naming the backup file, set a mock time
 133          # to check that this works correctly.
 134          mocked_time = int(time.time())
 135          self.master_node.setmocktime(mocked_time)
 136          # Migrate, checking that rescan does not occur
 137          with self.master_node.assert_debug_log(expected_msgs=[], unexpected_msgs=["Rescanning"]):
 138              migrate_info = self.master_node.migratewallet(wallet_name=wallet_name, **kwargs)
 139          self.master_node.setmocktime(0)
 140          # Update wallet name in case the initial wallet was completely migrated to a watch-only wallet
 141          # (in which case the wallet name would be suffixed by the 'watchonly' term)
 142          migrated_wallet_name = migrate_info['wallet_name']
 143          wallet = self.master_node.get_wallet_rpc(migrated_wallet_name)
 144          wallet_info = wallet.getwalletinfo()
 145          assert_equal(wallet_info["descriptors"], True)
 146          self.assert_is_sqlite(migrated_wallet_name)
 147          # Always verify the backup path exist after migration
 148          assert os.path.exists(migrate_info['backup_path'])
 149          if wallet_name == "":
 150              backup_prefix = "default_wallet"
 151          else:
 152              backup_prefix = os.path.basename(os.path.realpath(self.old_node.wallets_path / wallet_name))
 153  
 154          backup_filename = f"{backup_prefix}_{mocked_time}.legacy.bak"
 155          expected_backup_path = self.master_node.wallets_path / backup_filename
 156          assert_equal(str(expected_backup_path), migrate_info['backup_path'])
 157          assert {"name": backup_filename} not in self.master_node.listwalletdir()["wallets"]
 158  
 159          # Open the wallet with sqlite and verify that the wallet has the last hardened cache flag
 160          # set and the last hardened cache entries
 161          def check_last_hardened(conn):
 162              flags_rec = conn.execute(f"SELECT value FROM main WHERE key = x'{ser_string(b'flags').hex()}'").fetchone()
 163              flags = int.from_bytes(flags_rec[0], byteorder="little")
 164  
 165              # All wallets should have the upgrade flag set
 166              assert_equal(bool(flags & (1 << 2)), True)
 167  
 168              # Fetch all records with the walletdescriptorlhcache prefix
 169              # if the wallet has private keys and is not blank
 170              if wallet_info["private_keys_enabled"] and not wallet_info["blank"]:
 171                  lh_cache_recs = conn.execute(f"SELECT value FROM main where key >= x'{ser_string(b'walletdescriptorlhcache').hex()}' AND key < x'{ser_string(b'walletdescriptorlhcachf').hex()}'").fetchall()
 172                  assert_greater_than(len(lh_cache_recs), 0)
 173  
 174          inspect_path = os.path.join(self.options.tmpdir, os.path.basename(f"{migrated_wallet_name}_inspect.dat"))
 175          wallet.backupwallet(inspect_path)
 176          self.inspect_sqlite_db(inspect_path, check_last_hardened)
 177  
 178          return migrate_info, wallet
 179  
    def test_basic(self):
        """Test migration of basic, keys-only legacy wallets.

        Covers, in order: a wallet without balance, a wallet with balance
        (including restoring the migration backup on the old node and
        reloading the migrated wallet after a restart), a wallet that received
        funds on addresses of its HD seed key, and the errors returned when
        migrating a wallet that is already a descriptor wallet.
        """
        default = self.master_node.get_wallet_rpc(self.default_wallet_name)

        self.log.info("Test migration of a basic keys only wallet without balance")
        basic0 = self.create_legacy_wallet("basic0")

        addr = basic0.getnewaddress()
        change = basic0.getrawchangeaddress()

        old_addr_info = basic0.getaddressinfo(addr)
        old_change_addr_info = basic0.getaddressinfo(change)
        assert_equal(old_addr_info["ismine"], True)
        assert_equal(old_addr_info["hdkeypath"], "m/0'/0'/0'")
        assert_equal(old_change_addr_info["ismine"], True)
        assert_equal(old_change_addr_info["hdkeypath"], "m/0'/1'/0'")

        # Note: migration could take a while.
        _, basic0 = self.migrate_and_get_rpc("basic0")

        # The wallet should create the following descriptors:
        # * BIP32 descriptors in the form of "0h/0h/*" and "0h/1h/*" (2 descriptors)
        # * BIP44 descriptors in the form of "44h/1h/0h/0/*" and "44h/1h/0h/1/*" (2 descriptors)
        # * BIP49 descriptors, P2SH(P2WPKH), in the form of "49h/1h/0h/0/*" and "49h/1h/0h/1/*" (2 descriptors)
        # * BIP84 descriptors, P2WPKH, in the form of "84h/1h/0h/0/*" and "84h/1h/0h/1/*" (2 descriptors)
        # * BIP86 descriptors, P2TR, in the form of "86h/1h/0h/0/*" and "86h/1h/0h/1/*" (2 descriptors)
        # * A combo(PK) descriptor for the wallet master key.
        # So, should have a total of 11 descriptors on it.
        assert_equal(len(basic0.listdescriptors()["descriptors"]), 11)

        # Compare addresses info
        addr_info = basic0.getaddressinfo(addr)
        change_addr_info = basic0.getaddressinfo(change)
        self.assert_addr_info_equal(addr_info, old_addr_info)
        self.assert_addr_info_equal(change_addr_info, old_change_addr_info)

        # A new bech32 address must be the first key of the BIP84 receive descriptor
        addr_info = basic0.getaddressinfo(basic0.getnewaddress("", "bech32"))
        assert_equal(addr_info["hdkeypath"], "m/84h/1h/0h/0/0")

        self.log.info("Test migration of a basic keys only wallet with a balance")
        basic1 = self.create_legacy_wallet("basic1")

        # Fund the legacy wallet with ten separate payments
        for _ in range(0, 10):
            default.sendtoaddress(basic1.getnewaddress(), 1)

        self.generate(self.master_node, 1)

        # Spend from the legacy wallet so it also has outgoing transactions
        for _ in range(0, 5):
            basic1.sendtoaddress(default.getnewaddress(), 0.5)

        self.generate(self.master_node, 1)
        # Snapshot the pre-migration state to compare against later
        bal = basic1.getbalance()
        txs = basic1.listtransactions()
        addr_gps = basic1.listaddressgroupings()

        basic1_migrate, basic1 = self.migrate_and_get_rpc("basic1")
        assert_equal(basic1.getbalance(), bal)
        self.assert_list_txs_equal(basic1.listtransactions(), txs)

        self.log.info("Test backup file can be successfully restored")
        self.old_node.restorewallet("basic1_restored", basic1_migrate['backup_path'])
        basic1_restored = self.old_node.get_wallet_rpc("basic1_restored")
        basic1_restored_wi = basic1_restored.getwalletinfo()
        assert_equal(basic1_restored_wi['balance'], bal)
        assert_equal(basic1_restored.listaddressgroupings(), addr_gps)
        self.assert_list_txs_equal(basic1_restored.listtransactions(), txs)

        # restart master node and verify that everything is still there
        self.restart_node(0)
        self.connect_nodes(0, 1)
        # Fetch fresh RPC handles after the restart
        default = self.master_node.get_wallet_rpc(self.default_wallet_name)
        self.master_node.loadwallet("basic1")
        basic1 = self.master_node.get_wallet_rpc("basic1")
        assert_equal(basic1.getbalance(), bal)
        self.assert_list_txs_equal(basic1.listtransactions(), txs)

        self.log.info("Test migration of a wallet with balance received on the seed")
        basic2 = self.create_legacy_wallet("basic2")
        basic2_seed = get_generate_key()
        basic2.sethdseed(True, basic2_seed.privkey)
        assert_equal(basic2.getbalance(), 0)

        # Receive coins on different output types for the same seed
        basic2_balance = 0
        for addr in [basic2_seed.p2pkh_addr, basic2_seed.p2wpkh_addr, basic2_seed.p2sh_p2wpkh_addr]:
            send_value = random.randint(1, 4)
            default.sendtoaddress(addr, send_value)
            basic2_balance += send_value
            self.generate(self.master_node, 1)
            assert_equal(basic2.getbalance(), basic2_balance)
        basic2_txs = basic2.listtransactions()

        # Now migrate and test that we still have the same balance/transactions
        _, basic2 = self.migrate_and_get_rpc("basic2")
        assert_equal(basic2.getbalance(), basic2_balance)
        self.assert_list_txs_equal(basic2.listtransactions(), basic2_txs)

        # Now test migration on a descriptor wallet
        self.log.info("Test \"nothing to migrate\" when the user tries to migrate a loaded wallet with no legacy data")
        assert_raises_rpc_error(-4, "Error: This wallet is already a descriptor wallet", basic2.migratewallet)

        self.log.info("Test \"nothing to migrate\" when the user tries to migrate an unloaded wallet with no legacy data")
        basic2.unloadwallet()
        assert_raises_rpc_error(-4, "Error: This wallet is already a descriptor wallet", self.master_node.migratewallet, "basic2")
 283  
    def test_multisig(self):
        """Test migration of legacy wallets that contain multisig scripts.

        First a wallet holding every key of a multisig, which must stay a
        single wallet after migration. Then a wallet holding only some of the
        keys of two multisigs: the watched one must end up in a separate
        "multisig1_watchonly" wallet and the unwatched one in a separate
        "multisig1_solvables" wallet.
        """
        default = self.master_node.get_wallet_rpc(self.default_wallet_name)

        # Contrived case where all the multisig keys are in a single wallet
        self.log.info("Test migration of a wallet with all keys for a multisig")
        multisig0 = self.create_legacy_wallet("multisig0")
        addr1 = multisig0.getnewaddress()
        addr2 = multisig0.getnewaddress()
        addr3 = multisig0.getnewaddress()

        ms_info = multisig0.addmultisigaddress(2, [addr1, addr2, addr3])

        _, multisig0 = self.migrate_and_get_rpc("multisig0")
        ms_addr_info = multisig0.getaddressinfo(ms_info["address"])
        assert_equal(ms_addr_info["ismine"], True)
        assert_equal(ms_addr_info["desc"], ms_info["descriptor"])
        # No auxiliary wallets are expected when all keys are available
        assert_equal("multisig0_watchonly" in self.master_node.listwallets(), False)
        assert_equal("multisig0_solvables" in self.master_node.listwallets(), False)

        # Public keys owned by multisig0, reused below as foreign keys for multisig1
        pub1 = multisig0.getaddressinfo(addr1)["pubkey"]
        pub2 = multisig0.getaddressinfo(addr2)["pubkey"]

        # Some keys in multisig do not belong to this wallet
        self.log.info("Test migration of a wallet that has some keys in a multisig")
        multisig1 = self.create_legacy_wallet("multisig1")
        ms_info = multisig1.addmultisigaddress(2, [multisig1.getnewaddress(), pub1, pub2])
        ms_info2 = multisig1.addmultisigaddress(2, [multisig1.getnewaddress(), pub1, pub2])

        # addr1 is funded and imported as watch-only; addr2 is neither
        addr1 = ms_info["address"]
        addr2 = ms_info2["address"]
        txid = default.sendtoaddress(addr1, 10)
        multisig1.importaddress(addr1)
        assert_equal(multisig1.getaddressinfo(addr1)["ismine"], False)
        assert_equal(multisig1.getaddressinfo(addr1)["iswatchonly"], True)
        assert_equal(multisig1.getaddressinfo(addr1)["solvable"], True)
        self.generate(self.master_node, 1)
        multisig1.gettransaction(txid)
        assert_equal(multisig1.getbalances()["watchonly"]["trusted"], 10)
        assert_equal(multisig1.getaddressinfo(addr2)["ismine"], False)
        assert_equal(multisig1.getaddressinfo(addr2)["iswatchonly"], False)
        assert_equal(multisig1.getaddressinfo(addr2)["solvable"], True)

        # Migrating multisig1 should see the multisig is no longer part of multisig1
        # A new wallet multisig1_watchonly is created which has the multisig address
        # Transaction to multisig is in multisig1_watchonly and not multisig1
        _, multisig1 = self.migrate_and_get_rpc("multisig1")
        assert_equal(multisig1.getaddressinfo(addr1)["ismine"], False)
        assert_equal(multisig1.getaddressinfo(addr1)["solvable"], False)
        assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", multisig1.gettransaction, txid)
        assert_equal(multisig1.getbalance(), 0)
        assert_equal(multisig1.listtransactions(), [])

        assert_equal("multisig1_watchonly" in self.master_node.listwallets(), True)
        ms1_watchonly = self.master_node.get_wallet_rpc("multisig1_watchonly")
        ms1_wallet_info = ms1_watchonly.getwalletinfo()
        assert_equal(ms1_wallet_info['descriptors'], True)
        assert_equal(ms1_wallet_info['private_keys_enabled'], False)
        self.assert_is_sqlite("multisig1_watchonly")
        assert_equal(ms1_watchonly.getaddressinfo(addr1)["ismine"], True)
        assert_equal(ms1_watchonly.getaddressinfo(addr1)["solvable"], True)
        # Because addr2 was not being watched, it isn't in multisig1_watchonly but rather multisig1_solvables
        assert_equal(ms1_watchonly.getaddressinfo(addr2)["ismine"], False)
        assert_equal(ms1_watchonly.getaddressinfo(addr2)["solvable"], False)
        ms1_watchonly.gettransaction(txid)
        assert_equal(ms1_watchonly.getbalance(), 10)

        # Migrating multisig1 should see the second multisig is no longer part of multisig1
        # A new wallet multisig1_solvables is created which has the second address
        # This should have no transactions
        assert_equal("multisig1_solvables" in self.master_node.listwallets(), True)
        ms1_solvable = self.master_node.get_wallet_rpc("multisig1_solvables")
        ms1_wallet_info = ms1_solvable.getwalletinfo()
        assert_equal(ms1_wallet_info['descriptors'], True)
        assert_equal(ms1_wallet_info['private_keys_enabled'], False)
        self.assert_is_sqlite("multisig1_solvables")
        assert_equal(ms1_solvable.getaddressinfo(addr1)["ismine"], False)
        assert_equal(ms1_solvable.getaddressinfo(addr1)["solvable"], False)
        assert_equal(ms1_solvable.getaddressinfo(addr2)["ismine"], True)
        assert_equal(ms1_solvable.getaddressinfo(addr2)["solvable"], True)
        assert_equal(ms1_solvable.getbalance(), 0)
        assert_equal(ms1_solvable.listtransactions(), [])
 364          assert_equal(ms1_solvable.listtransactions(), [])
 365  
 366  
    def test_other_watchonly(self):
        """Test migration of a legacy wallet that mixes spendable and watch-only data.

        After migration the spendable transactions must stay in "imports0",
        the watch-only ones must move to "imports0_watchonly" with their
        "time"/"timereceived" metadata preserved, and the address labels must
        still be present in the watch-only wallet after it is reloaded.
        """
        default = self.master_node.get_wallet_rpc(self.default_wallet_name)

        # Wallet with an imported address. Should be the same thing as the multisig test
        self.log.info("Test migration of a wallet with watchonly imports")
        imports0 = self.create_legacy_wallet("imports0")

        # External address label
        imports0.setlabel(default.getnewaddress(), "external")

        # Normal non-watchonly tx
        received_addr = imports0.getnewaddress()
        imports0.setlabel(received_addr, "Receiving")
        received_txid = default.sendtoaddress(received_addr, 10)

        # Watchonly tx
        import_addr = default.getnewaddress()
        imports0.importaddress(import_addr)
        imports0.setlabel(import_addr, "imported")
        received_watchonly_txid = default.sendtoaddress(import_addr, 10)

        # Received watchonly tx that is then spent
        import_sent_addr = default.getnewaddress()
        imports0.importaddress(import_sent_addr)
        received_sent_watchonly_utxo = self.create_outpoints(node=default, outputs=[{import_sent_addr: 10}])[0]

        send = default.sendall(recipients=[default.getnewaddress()], inputs=[received_sent_watchonly_utxo])
        sent_watchonly_txid = send["txid"]

        # Tx that has both a watchonly and spendable output
        watchonly_spendable_txid = default.send(outputs=[{received_addr: 1}, {import_addr:1}])["txid"]

        self.generate(self.master_node, 2)
        # Record pre-migration tx details so the timestamps can be compared afterwards
        received_watchonly_tx_info = imports0.gettransaction(received_watchonly_txid, True)
        received_sent_watchonly_tx_info = imports0.gettransaction(received_sent_watchonly_utxo["txid"], True)

        balances = imports0.getbalances()
        spendable_bal = balances["mine"]["trusted"]
        watchonly_bal = balances["watchonly"]["trusted"]
        assert_equal(len(imports0.listtransactions(include_watchonly=True)), 6)

        # Mock time forward a bit so we can check that tx metadata is preserved
        self.master_node.setmocktime(int(time.time()) + 100)

        # Migrate
        _, imports0 = self.migrate_and_get_rpc("imports0")
        # Purely watch-only transactions must no longer be in the spendable wallet
        assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, received_watchonly_txid)
        assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, received_sent_watchonly_utxo['txid'])
        assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, sent_watchonly_txid)
        assert_equal(len(imports0.listtransactions()), 2)
        imports0.gettransaction(received_txid)
        imports0.gettransaction(watchonly_spendable_txid)
        assert_equal(imports0.getbalance(), spendable_bal)

        assert_equal("imports0_watchonly" in self.master_node.listwallets(), True)
        watchonly = self.master_node.get_wallet_rpc("imports0_watchonly")
        watchonly_info = watchonly.getwalletinfo()
        assert_equal(watchonly_info["descriptors"], True)
        self.assert_is_sqlite("imports0_watchonly")
        assert_equal(watchonly_info["private_keys_enabled"], False)
        # Transaction timestamps must be identical before and after migration
        received_migrated_watchonly_tx_info = watchonly.gettransaction(received_watchonly_txid)
        assert_equal(received_watchonly_tx_info["time"], received_migrated_watchonly_tx_info["time"])
        assert_equal(received_watchonly_tx_info["timereceived"], received_migrated_watchonly_tx_info["timereceived"])
        received_sent_migrated_watchonly_tx_info = watchonly.gettransaction(received_sent_watchonly_utxo["txid"])
        assert_equal(received_sent_watchonly_tx_info["time"], received_sent_migrated_watchonly_tx_info["time"])
        assert_equal(received_sent_watchonly_tx_info["timereceived"], received_sent_migrated_watchonly_tx_info["timereceived"])
        watchonly.gettransaction(sent_watchonly_txid)
        watchonly.gettransaction(watchonly_spendable_txid)
        assert_equal(watchonly.getbalance(), watchonly_bal)
        # The purely spendable transaction must not be in the watch-only wallet
        assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", watchonly.gettransaction, received_txid)
        assert_equal(len(watchonly.listtransactions(include_watchonly=True)), 4)

        # Check that labels were migrated and persisted to watchonly wallet
        self.master_node.unloadwallet("imports0_watchonly")
        self.master_node.loadwallet("imports0_watchonly")
        labels = watchonly.listlabels()
        assert "external" in labels
        assert "imported" in labels
 445  
    def test_no_privkeys(self):
        """Test migration of legacy wallets created with private keys disabled.

        Covers a wallet with an imported watch-only descriptor, a wallet whose
        keypool was filled with imported public keys, and empty wallets
        created both with and without the ``blank`` option.
        """
        default = self.master_node.get_wallet_rpc(self.default_wallet_name)

        # Migrating an actual watchonly wallet should not create a new watchonly wallet
        self.log.info("Test migration of a pure watchonly wallet")
        watchonly0 = self.create_legacy_wallet("watchonly0", disable_private_keys=True)

        addr = default.getnewaddress()
        desc = default.getaddressinfo(addr)["desc"]
        res = watchonly0.importmulti([
            {
                "desc": desc,
                "watchonly": True,
                "timestamp": "now",
            }])
        assert_equal(res[0]['success'], True)
        default.sendtoaddress(addr, 10)
        self.generate(self.master_node, 1)

        _, watchonly0 = self.migrate_and_get_rpc("watchonly0")
        assert_equal("watchonly0_watchonly" in self.master_node.listwallets(), False)
        info = watchonly0.getwalletinfo()
        assert_equal(info["descriptors"], True)
        assert_equal(info["private_keys_enabled"], False)
        self.assert_is_sqlite("watchonly0")

        # Migrating a wallet with pubkeys added to the keypool
        self.log.info("Test migration of a pure watchonly wallet with pubkeys in keypool")
        watchonly1 = self.create_legacy_wallet("watchonly1", disable_private_keys=True)

        addr1 = default.getnewaddress(address_type="bech32")
        addr2 = default.getnewaddress(address_type="bech32")
        desc1 = default.getaddressinfo(addr1)["desc"]
        desc2 = default.getaddressinfo(addr2)["desc"]
        res = watchonly1.importmulti([
            {
                "desc": desc1,
                "keypool": True,
                "timestamp": "now",
            },
            {
                "desc": desc2,
                "keypool": True,
                "timestamp": "now",
            }
        ])
        assert_equal(res[0]["success"], True)
        assert_equal(res[1]["success"], True)
        # Before migrating, we can fetch addr1 from the keypool
        assert_equal(watchonly1.getnewaddress(address_type="bech32"), addr1)

        _, watchonly1 = self.migrate_and_get_rpc("watchonly1")
        info = watchonly1.getwalletinfo()
        assert_equal(info["descriptors"], True)
        assert_equal(info["private_keys_enabled"], False)
        self.assert_is_sqlite("watchonly1")
        # After migrating, the "keypool" is empty
        assert_raises_rpc_error(-4, "Error: This wallet has no available keys", watchonly1.getnewaddress)

        self.log.info("Test migration of a watch-only empty wallet")
        # Wallets are named watchonly_empty1 (blank=True) and watchonly_empty2 (blank=False)
        for idx, is_blank in enumerate([True, False], start=1):
            wallet_name = f"watchonly_empty{idx}"
            self.create_legacy_wallet(wallet_name, disable_private_keys=True, blank=is_blank)
            _, watchonly_empty = self.migrate_and_get_rpc(wallet_name)
            info = watchonly_empty.getwalletinfo()
            assert_equal(info["private_keys_enabled"], False)
            # The blank setting must survive the migration unchanged
            assert_equal(info["blank"], is_blank)
 513  
 514      def test_pk_coinbases(self):
 515          self.log.info("Test migration of a wallet using old pk() coinbases")
 516          wallet = self.create_legacy_wallet("pkcb")
 517  
 518          addr = wallet.getnewaddress()
 519          addr_info = wallet.getaddressinfo(addr)
 520          desc = descsum_create("pk(" + addr_info["pubkey"] + ")")
 521  
 522          self.generatetodescriptor(self.master_node, 1, desc)
 523  
 524          bals = wallet.getbalances()
 525  
 526          _, wallet = self.migrate_and_get_rpc("pkcb")
 527  
 528          assert_equal(bals, wallet.getbalances())
 529  
    def test_encrypted(self):
        """Test migration of an encrypted legacy wallet.

        Migration must fail without a passphrase, with a wrong passphrase and
        with a passphrase containing null characters. With the correct
        passphrase it must succeed, leaving the wallet locked and with the
        same transaction and balances as before.
        """
        self.log.info("Test migration of an encrypted wallet")
        wallet = self.create_legacy_wallet("encrypted")
        default = self.master_node.get_wallet_rpc(self.default_wallet_name)

        wallet.encryptwallet("pass")
        addr = wallet.getnewaddress()
        txid = default.sendtoaddress(addr, 1)
        self.generate(self.master_node, 1)
        bals = wallet.getbalances()

        # Use self.migrate_and_get_rpc to test this error to get everything copied over to the master node
        assert_raises_rpc_error(-4, "Error: Wallet decryption failed, the wallet passphrase was not provided or was incorrect", self.migrate_and_get_rpc, "encrypted")

        # NOTE(review): the bumpmocktime calls below presumably build on the mock time
        # that the failed migrate_and_get_rpc call above left set - confirm.
        # Use the RPC directly on the master node for the rest of these checks
        self.master_node.bumpmocktime(1) # Prevents filename duplication on wallet backups which is a problem on Windows
        assert_raises_rpc_error(-4, "Error: Wallet decryption failed, the wallet passphrase was not provided or was incorrect", self.master_node.migratewallet, "encrypted", "badpass")

        self.master_node.bumpmocktime(1) # Prevents filename duplication on wallet backups which is a problem on Windows
        assert_raises_rpc_error(-4, "The passphrase contains a null character", self.master_node.migratewallet, "encrypted", "pass\0with\0null")

        # Verify we can properly migrate the encrypted wallet
        self.master_node.bumpmocktime(1) # Prevents filename duplication on wallet backups which is a problem on Windows
        self.master_node.migratewallet("encrypted", passphrase="pass")
        wallet = self.master_node.get_wallet_rpc("encrypted")

        info = wallet.getwalletinfo()
        assert_equal(info["descriptors"], True)
        assert_equal(info["format"], "sqlite")
        # The migrated wallet must be locked
        assert_equal(info["unlocked_until"], 0)
        wallet.gettransaction(txid)

        assert_equal(bals, wallet.getbalances())
 563  
 564      def test_nonexistent(self):
 565          self.log.info("Check migratewallet errors for nonexistent wallets")
 566          default = self.master_node.get_wallet_rpc(self.default_wallet_name)
 567          assert_raises_rpc_error(-8, "The RPC endpoint wallet and the wallet name parameter specify different wallets", default.migratewallet, "someotherwallet")
 568          assert_raises_rpc_error(-8, "Either the RPC endpoint wallet or the wallet name parameter must be provided", self.master_node.migratewallet)
 569          assert_raises_rpc_error(-4, "Error: Wallet does not exist", self.master_node.migratewallet, "notawallet")
 570  
 571      def test_unloaded_by_path(self):
 572          self.log.info("Test migration of a wallet that isn't loaded, specified by path")
 573          wallet = self.create_legacy_wallet("notloaded2")
 574          default = self.master_node.get_wallet_rpc(self.default_wallet_name)
 575  
 576          addr = wallet.getnewaddress()
 577          txid = default.sendtoaddress(addr, 1)
 578          self.generate(self.master_node, 1)
 579          bals = wallet.getbalances()
 580  
 581          wallet.unloadwallet()
 582  
 583          wallet_file_path = self.old_node.wallets_path / "notloaded2"
 584          self.master_node.migratewallet(wallet_file_path)
 585  
 586          # Because we gave the name by full path, the loaded wallet's name is that path too.
 587          wallet = self.master_node.get_wallet_rpc(str(wallet_file_path))
 588  
 589          info = wallet.getwalletinfo()
 590          assert_equal(info["descriptors"], True)
 591          assert_equal(info["format"], "sqlite")
 592          wallet.gettransaction(txid)
 593  
 594          assert_equal(bals, wallet.getbalances())
 595  
 596      def test_wallet_with_relative_path(self):
 597          self.log.info("Test migration of a wallet that isn't loaded, specified by a relative path")
 598  
 599          # Get the nearest common path of both nodes' wallet paths.
 600          common_parent = os.path.commonpath([self.master_node.wallets_path, self.old_node.wallets_path])
 601  
 602          # This test assumes that the relative path from each wallet directory to the common path is identical.
 603          assert_equal(os.path.relpath(common_parent, start=self.master_node.wallets_path), os.path.relpath(common_parent, start=self.old_node.wallets_path))
 604  
 605          wallet_name = "relative"
 606          absolute_path = os.path.abspath(os.path.join(common_parent, wallet_name))
 607          relative_name = os.path.relpath(absolute_path, start=self.master_node.wallets_path)
 608  
 609          wallet = self.create_legacy_wallet(relative_name)
 610          # listwalletdirs only returns wallets in the wallet directory
 611          assert {"name": relative_name} not in wallet.listwalletdir()["wallets"]
 612          assert relative_name in wallet.listwallets()
 613  
 614          default = self.master_node.get_wallet_rpc(self.default_wallet_name)
 615          addr = wallet.getnewaddress()
 616          txid = default.sendtoaddress(addr, 1)
 617          self.generate(self.master_node, 1)
 618          bals = wallet.getbalances()
 619  
 620          migrate_res, wallet = self.migrate_and_get_rpc(relative_name)
 621  
 622          # Check that the wallet was migrated, knows the right txid, and has the right balance.
 623          assert wallet.gettransaction(txid)
 624          assert_equal(bals, wallet.getbalances())
 625  
 626          # The migrated wallet should not be in the wallet dir, but should be in the list of wallets.
 627          info = wallet.getwalletinfo()
 628  
 629          walletdirlist = wallet.listwalletdir()
 630          assert {"name": info["walletname"]} not in walletdirlist["wallets"]
 631  
 632          walletlist = wallet.listwallets()
 633          assert info["walletname"] in walletlist
 634  
 635          # Check that old node can restore from the backup.
 636          self.old_node.restorewallet("relative_restored", migrate_res['backup_path'])
 637          wallet = self.old_node.get_wallet_rpc("relative_restored")
 638          assert wallet.gettransaction(txid)
 639          assert_equal(bals, wallet.getbalances())
 640  
 641          info = wallet.getwalletinfo()
 642          assert_equal(info["descriptors"], False)
 643          assert_equal(info["format"], "bdb")
 644  
 645      def test_wallet_with_path(self, wallet_path):
 646          self.log.info("Test migrating a wallet with the following path/name: %s", wallet_path)
 647          # the wallet data is actually inside of path/that/ends/
 648          wallet = self.create_legacy_wallet(wallet_path)
 649          default = self.master_node.get_wallet_rpc(self.default_wallet_name)
 650  
 651          addr = wallet.getnewaddress()
 652          txid = default.sendtoaddress(addr, 1)
 653          self.generate(self.master_node, 1)
 654          bals = wallet.getbalances()
 655  
 656          _, wallet = self.migrate_and_get_rpc(wallet_path)
 657  
 658          assert wallet.gettransaction(txid)
 659  
 660          assert_equal(bals, wallet.getbalances())
 661  
 662      def test_default_wallet(self):
 663          self.log.info("Test migration of the wallet named as the empty string")
 664          wallet = self.create_legacy_wallet("")
 665  
 666          res, wallet = self.migrate_and_get_rpc("")
 667          info = wallet.getwalletinfo()
 668          assert_equal(info["descriptors"], True)
 669          assert_equal(info["format"], "sqlite")
 670  
 671          walletdir_list = wallet.listwalletdir()
 672          assert {"name": info["walletname"]} in [{"name": w["name"]} for w in walletdir_list["wallets"]]
 673  
 674          # Make sure the backup uses a non-empty filename
 675          # migrate_and_get_rpc already checks for backup file existence
 676          assert os.path.basename(res["backup_path"]).startswith("default_wallet")
 677  
 678      def test_direct_file(self):
 679          self.log.info("Test migration of a wallet that is not in a wallet directory")
 680          wallet = self.create_legacy_wallet("plainfile")
 681          wallet.unloadwallet()
 682  
 683          shutil.copyfile(
 684              self.old_node.wallets_path / "plainfile" / "wallet.dat" ,
 685              self.master_node.wallets_path / "plainfile"
 686          )
 687          assert (self.master_node.wallets_path / "plainfile").is_file()
 688  
 689          mocked_time = int(time.time())
 690          self.master_node.setmocktime(mocked_time)
 691          migrate_res = self.master_node.migratewallet("plainfile")
 692          assert_equal(f"plainfile_{mocked_time}.legacy.bak", os.path.basename(migrate_res["backup_path"]))
 693          wallet = self.master_node.get_wallet_rpc("plainfile")
 694          info = wallet.getwalletinfo()
 695          assert_equal(info["descriptors"], True)
 696          assert_equal(info["format"], "sqlite")
 697  
 698          assert (self.master_node.wallets_path / "plainfile").is_dir()
 699          assert (self.master_node.wallets_path / "plainfile" / "wallet.dat").is_file()
 700  
 701      def test_addressbook(self):
 702          df_wallet = self.master_node.get_wallet_rpc(self.default_wallet_name)
 703  
 704          self.log.info("Test migration of address book data")
 705          wallet = self.create_legacy_wallet("legacy_addrbook")
 706          df_wallet.sendtoaddress(wallet.getnewaddress(), 3)
 707  
 708          # Import watch-only script to create a watch-only wallet after migration
 709          watch_addr = df_wallet.getnewaddress()
 710          wallet.importaddress(watch_addr)
 711          df_wallet.sendtoaddress(watch_addr, 2)
 712  
 713          # Import solvable script
 714          multi_addr1 = wallet.getnewaddress()
 715          multi_addr2 = wallet.getnewaddress()
 716          multi_addr3 = df_wallet.getnewaddress()
 717          wallet.importpubkey(df_wallet.getaddressinfo(multi_addr3)["pubkey"])
 718          ms_addr_info = wallet.addmultisigaddress(2, [multi_addr1, multi_addr2, multi_addr3])
 719  
 720          self.generate(self.master_node, 1)
 721  
 722          # Test vectors
 723          addr_external = {
 724              "addr": df_wallet.getnewaddress(),
 725              "is_mine": False,
 726              "is_change": False,
 727              "label": ""
 728          }
 729          addr_external_with_label = {
 730              "addr": df_wallet.getnewaddress(),
 731              "is_mine": False,
 732              "is_change": False,
 733              "label": "external"
 734          }
 735          addr_internal = {
 736              "addr": wallet.getnewaddress(),
 737              "is_mine": True,
 738              "is_change": False,
 739              "label": ""
 740          }
 741          addr_internal_with_label = {
 742              "addr": wallet.getnewaddress(),
 743              "is_mine": True,
 744              "is_change": False,
 745              "label": "internal"
 746          }
 747          change_address = {
 748              "addr": wallet.getrawchangeaddress(),
 749              "is_mine": True,
 750              "is_change": True,
 751              "label": None
 752          }
 753          watch_only_addr = {
 754              "addr": watch_addr,
 755              "is_mine": False,
 756              "is_change": False,
 757              "label": "imported"
 758          }
 759          ms_addr = {
 760              "addr": ms_addr_info['address'],
 761              "is_mine": False,
 762              "is_change": False,
 763              "label": "multisig"
 764          }
 765  
 766          # To store the change address in the addressbook need to send coins to it
 767          wallet.send(outputs=[{wallet.getnewaddress(): 2}], options={"change_address": change_address['addr']})
 768          self.generate(self.master_node, 1)
 769  
 770          # Util wrapper func for 'addr_info'
 771          def check(info, node):
 772              self.check_address(node, info['addr'], info['is_mine'], info['is_change'], info["label"])
 773  
 774          # Pre-migration: set label and perform initial checks
 775          for addr_info in [addr_external, addr_external_with_label, addr_internal, addr_internal_with_label, change_address, watch_only_addr, ms_addr]:
 776              if not addr_info['is_change']:
 777                  wallet.setlabel(addr_info['addr'], addr_info["label"])
 778              check(addr_info, wallet)
 779  
 780          # Migrate wallet
 781          info_migration, wallet = self.migrate_and_get_rpc("legacy_addrbook")
 782          wallet_wo = self.master_node.get_wallet_rpc(info_migration["watchonly_name"])
 783          wallet_solvables = self.master_node.get_wallet_rpc(info_migration["solvables_name"])
 784  
 785          #########################
 786          # Post migration checks #
 787          #########################
 788  
 789          # First check the main wallet
 790          for addr_info in [addr_external, addr_external_with_label, addr_internal, addr_internal_with_label, change_address, ms_addr]:
 791              check(addr_info, wallet)
 792  
 793          # Watch-only wallet will contain the watch-only entry (with 'is_mine=True') and all external addresses ('send')
 794          self.check_address(wallet_wo, watch_only_addr['addr'], is_mine=True, is_change=watch_only_addr['is_change'], label=watch_only_addr["label"])
 795          for addr_info in [addr_external, addr_external_with_label, ms_addr]:
 796              check(addr_info, wallet_wo)
 797  
 798          # Solvables wallet will contain the multisig entry (with 'is_mine=True') and all external addresses ('send')
 799          self.check_address(wallet_solvables, ms_addr['addr'], is_mine=True, is_change=ms_addr['is_change'], label=ms_addr["label"])
 800          for addr_info in [addr_external, addr_external_with_label]:
 801              check(addr_info, wallet_solvables)
 802  
 803          ########################################################################################
 804          # Now restart migrated wallets and verify that the addressbook entries are still there #
 805          ########################################################################################
 806  
 807          # First the main wallet
 808          self.master_node.unloadwallet("legacy_addrbook")
 809          self.master_node.loadwallet("legacy_addrbook")
 810          for addr_info in [addr_external, addr_external_with_label, addr_internal, addr_internal_with_label, change_address, ms_addr]:
 811              check(addr_info, wallet)
 812  
 813          # Watch-only wallet
 814          self.master_node.unloadwallet(info_migration["watchonly_name"])
 815          self.master_node.loadwallet(info_migration["watchonly_name"])
 816          self.check_address(wallet_wo, watch_only_addr['addr'], is_mine=True, is_change=watch_only_addr['is_change'], label=watch_only_addr["label"])
 817          for addr_info in [addr_external, addr_external_with_label, ms_addr]:
 818              check(addr_info, wallet_wo)
 819  
 820          # Solvables wallet
 821          self.master_node.unloadwallet(info_migration["solvables_name"])
 822          self.master_node.loadwallet(info_migration["solvables_name"])
 823          self.check_address(wallet_solvables, ms_addr['addr'], is_mine=True, is_change=ms_addr['is_change'], label=ms_addr["label"])
 824          for addr_info in [addr_external, addr_external_with_label]:
 825              check(addr_info, wallet_solvables)
 826  
 827      def test_migrate_raw_p2sh(self):
 828          self.log.info("Test migration of watch-only raw p2sh script")
 829          df_wallet = self.master_node.get_wallet_rpc(self.default_wallet_name)
 830          wallet = self.create_legacy_wallet("raw_p2sh")
 831  
 832          def send_to_script(script, amount):
 833              tx = CTransaction()
 834              tx.vout.append(CTxOut(nValue=amount*COIN, scriptPubKey=script))
 835  
 836              hex_tx = df_wallet.fundrawtransaction(tx.serialize().hex())['hex']
 837              signed_tx = df_wallet.signrawtransactionwithwallet(hex_tx)
 838              df_wallet.sendrawtransaction(signed_tx['hex'])
 839              self.generate(self.master_node, 1)
 840  
 841          # Craft sh(pkh(key)) script and send coins to it
 842          pubkey = df_wallet.getaddressinfo(df_wallet.getnewaddress())["pubkey"]
 843          script_pkh = key_to_p2pkh_script(pubkey)
 844          script_sh_pkh = script_to_p2sh_script(script_pkh)
 845          send_to_script(script=script_sh_pkh, amount=2)
 846  
 847          # Import script and check balance
 848          wallet.importaddress(address=script_pkh.hex(), label="raw_spk", rescan=True, p2sh=True)
 849          assert_equal(wallet.getbalances()['watchonly']['trusted'], 2)
 850  
 851          # Craft wsh(pkh(key)) and send coins to it
 852          pubkey = df_wallet.getaddressinfo(df_wallet.getnewaddress())["pubkey"]
 853          script_wsh_pkh = script_to_p2wsh_script(key_to_p2pkh_script(pubkey))
 854          send_to_script(script=script_wsh_pkh, amount=3)
 855  
 856          # Import script and check balance
 857          wallet.importaddress(address=script_wsh_pkh.hex(), label="raw_spk2", rescan=True, p2sh=False)
 858          assert_equal(wallet.getbalances()['watchonly']['trusted'], 5)
 859  
 860          # Import sh(pkh()) script, by using importaddress(), with the p2sh flag enabled.
 861          # This will wrap the script under another sh level, which is invalid!, and store it inside the wallet.
 862          # The migration process must skip the invalid scripts and the addressbook records linked to them.
 863          # They are not being watched by the current wallet, nor should be watched by the migrated one.
 864          label_sh_pkh = "raw_sh_pkh"
 865          script_pkh = key_to_p2pkh_script(df_wallet.getaddressinfo(df_wallet.getnewaddress())["pubkey"])
 866          script_sh_pkh = script_to_p2sh_script(script_pkh)
 867          addy_script_sh_pkh = script_to_p2sh(script_pkh)  # valid script address
 868          addy_script_double_sh_pkh = script_to_p2sh(script_sh_pkh)  # invalid script address
 869  
 870          # Note: 'importaddress()' will add two scripts, a valid one sh(pkh()) and an invalid one 'sh(sh(pkh()))'.
 871          #       Both of them will be stored with the same addressbook label. And only the latter one should
 872          #       be discarded during migration. The first one must be migrated.
 873          wallet.importaddress(address=script_sh_pkh.hex(), label=label_sh_pkh, rescan=False, p2sh=True)
 874  
 875          # Migrate wallet and re-check balance
 876          info_migration, wallet = self.migrate_and_get_rpc("raw_p2sh")
 877          wallet_wo = self.master_node.get_wallet_rpc(info_migration["watchonly_name"])
 878  
 879          # Watch-only balance is under "mine".
 880          assert_equal(wallet_wo.getbalances()['mine']['trusted'], 5)
 881          # The watch-only scripts are no longer part of the main wallet
 882          assert_equal(wallet.getbalances()['mine']['trusted'], 0)
 883  
 884          # The invalid sh(sh(pk())) script label must not be part of the main wallet anymore
 885          assert label_sh_pkh not in wallet.listlabels()
 886          # But, the standard sh(pkh()) script should be part of the watch-only wallet.
 887          addrs_by_label = wallet_wo.getaddressesbylabel(label_sh_pkh)
 888          assert addy_script_sh_pkh in addrs_by_label
 889          assert addy_script_double_sh_pkh not in addrs_by_label
 890  
 891          # Also, the watch-only wallet should have the descriptor for the standard sh(pkh())
 892          desc = descsum_create(f"addr({addy_script_sh_pkh})")
 893          assert next(it['desc'] for it in wallet_wo.listdescriptors()['descriptors'] if it['desc'] == desc)
 894          # And doesn't have a descriptor for the invalid one
 895          desc_invalid = descsum_create(f"addr({addy_script_double_sh_pkh})")
 896          assert_equal(next((it['desc'] for it in wallet_wo.listdescriptors()['descriptors'] if it['desc'] == desc_invalid), None), None)
 897  
 898          # Just in case, also verify wallet restart
 899          self.master_node.unloadwallet(info_migration["watchonly_name"])
 900          self.master_node.loadwallet(info_migration["watchonly_name"])
 901          assert_equal(wallet_wo.getbalances()['mine']['trusted'], 5)
 902  
 903      def test_conflict_txs(self):
 904          self.log.info("Test migration when wallet contains conflicting transactions")
 905          def_wallet = self.master_node.get_wallet_rpc(self.default_wallet_name)
 906  
 907          wallet = self.create_legacy_wallet("conflicts")
 908          def_wallet.sendtoaddress(wallet.getnewaddress(), 10)
 909          self.generate(self.master_node, 1)
 910  
 911          # parent tx
 912          parent_txid = wallet.sendtoaddress(wallet.getnewaddress(), 9)
 913          parent_txid_bytes = bytes.fromhex(parent_txid)[::-1]
 914          conflict_utxo = wallet.gettransaction(txid=parent_txid, verbose=True)["decoded"]["vin"][0]
 915  
 916          # The specific assertion in MarkConflicted being tested requires that the parent tx is already loaded
 917          # by the time the child tx is loaded. Since transactions end up being loaded in txid order due to how
 918          # sqlite stores things, we can just grind the child tx until it has a txid that is greater than the parent's.
 919          locktime = 500000000 # Use locktime as nonce, starting at unix timestamp minimum
 920          addr = wallet.getnewaddress()
 921          while True:
 922              child_send_res = wallet.send(outputs=[{addr: 8}], options={"add_to_wallet": False, "locktime": locktime})
 923              child_txid = child_send_res["txid"]
 924              child_txid_bytes = bytes.fromhex(child_txid)[::-1]
 925              if (child_txid_bytes > parent_txid_bytes):
 926                  wallet.sendrawtransaction(child_send_res["hex"])
 927                  break
 928              locktime += 1
 929  
 930          # conflict with parent
 931          conflict_unsigned = self.master_node.createrawtransaction(inputs=[conflict_utxo], outputs=[{wallet.getnewaddress(): 9.9999}])
 932          conflict_signed = wallet.signrawtransactionwithwallet(conflict_unsigned)["hex"]
 933          conflict_txid = self.master_node.sendrawtransaction(conflict_signed)
 934          self.generate(self.master_node, 1)
 935          assert_equal(wallet.gettransaction(txid=parent_txid)["confirmations"], -1)
 936          assert_equal(wallet.gettransaction(txid=child_txid)["confirmations"], -1)
 937          assert_equal(wallet.gettransaction(txid=conflict_txid)["confirmations"], 1)
 938  
 939          _, wallet = self.migrate_and_get_rpc("conflicts")
 940          assert_equal(wallet.gettransaction(txid=parent_txid)["confirmations"], -1)
 941          assert_equal(wallet.gettransaction(txid=child_txid)["confirmations"], -1)
 942          assert_equal(wallet.gettransaction(txid=conflict_txid)["confirmations"], 1)
 943  
 944          wallet.unloadwallet()
 945  
 946      def test_hybrid_pubkey(self):
 947          self.log.info("Test migration when wallet contains a hybrid pubkey")
 948  
 949          wallet = self.create_legacy_wallet("hybrid_keys")
 950  
 951          # Get the hybrid pubkey for one of the keys in the wallet
 952          normal_pubkey = wallet.getaddressinfo(wallet.getnewaddress())["pubkey"]
 953          first_byte = bytes.fromhex(normal_pubkey)[0] + 4 # Get the hybrid pubkey first byte
 954          parsed_pubkey = ECPubKey()
 955          parsed_pubkey.set(bytes.fromhex(normal_pubkey))
 956          parsed_pubkey.compressed = False
 957          hybrid_pubkey_bytes = bytearray(parsed_pubkey.get_bytes())
 958          hybrid_pubkey_bytes[0] = first_byte # Make it hybrid
 959          hybrid_pubkey = hybrid_pubkey_bytes.hex()
 960  
 961          # Import the hybrid pubkey
 962          wallet.importpubkey(hybrid_pubkey)
 963          p2pkh_addr = key_to_p2pkh(hybrid_pubkey)
 964          p2pkh_addr_info = wallet.getaddressinfo(p2pkh_addr)
 965          assert_equal(p2pkh_addr_info["iswatchonly"], True)
 966          assert_equal(p2pkh_addr_info["ismine"], False) # Things involving hybrid pubkeys are not spendable
 967  
 968          # Also import the p2wpkh for the pubkey to make sure we don't migrate it
 969          p2wpkh_addr = key_to_p2wpkh(hybrid_pubkey)
 970          wallet.importaddress(p2wpkh_addr)
 971  
 972          migrate_info, wallet = self.migrate_and_get_rpc("hybrid_keys")
 973  
 974          # Both addresses should only appear in the watchonly wallet
 975          p2pkh_addr_info = wallet.getaddressinfo(p2pkh_addr)
 976          assert_equal(p2pkh_addr_info["ismine"], False)
 977          p2wpkh_addr_info = wallet.getaddressinfo(p2wpkh_addr)
 978          assert_equal(p2wpkh_addr_info["ismine"], False)
 979  
 980          watchonly_wallet = self.master_node.get_wallet_rpc(migrate_info["watchonly_name"])
 981          watchonly_p2pkh_addr_info = watchonly_wallet.getaddressinfo(p2pkh_addr)
 982          assert_equal(watchonly_p2pkh_addr_info["ismine"], True)
 983          watchonly_p2wpkh_addr_info = watchonly_wallet.getaddressinfo(p2wpkh_addr)
 984          assert_equal(watchonly_p2wpkh_addr_info["ismine"], True)
 985  
 986          # There should only be raw or addr descriptors
 987          for desc in watchonly_wallet.listdescriptors()["descriptors"]:
 988              if desc["desc"].startswith("raw(") or desc["desc"].startswith("addr("):
 989                  continue
 990              assert False, "Hybrid pubkey watchonly wallet has more than just raw() and addr()"
 991  
 992          wallet.unloadwallet()
 993  
 994      def test_failed_migration_cleanup(self):
 995          self.log.info("Test that a failed migration is cleaned up")
 996          wallet = self.create_legacy_wallet("failed")
 997  
 998          # Make a copy of the wallet with the solvables wallet name so that we are unable
 999          # to create the solvables wallet when migrating, thus failing to migrate
1000          wallet.unloadwallet()
1001          solvables_path = self.master_node.wallets_path / "failed_solvables"
1002          shutil.copytree(self.old_node.wallets_path / "failed", solvables_path)
1003          original_shasum = sha256sum_file(solvables_path / "wallet.dat")
1004  
1005          self.old_node.loadwallet("failed")
1006  
1007          # Add a multisig so that a solvables wallet is created
1008          wallet.addmultisigaddress(2, [wallet.getnewaddress(), get_generate_key().pubkey])
1009          wallet.importaddress(get_generate_key().p2pkh_addr)
1010  
1011          self.old_node.unloadwallet("failed")
1012          shutil.copytree(self.old_node.wallets_path / "failed", self.master_node.wallets_path / "failed")
1013          assert_raises_rpc_error(-4, "Failed to create database", self.master_node.migratewallet, "failed")
1014  
1015          assert all(wallet not in self.master_node.listwallets() for wallet in ["failed", "failed_watchonly", "failed_solvables"])
1016  
1017          assert not (self.master_node.wallets_path / "failed_watchonly").exists()
1018          # Since the file in failed_solvables is one that we put there, migration shouldn't touch it
1019          assert solvables_path.exists()
1020          new_shasum = sha256sum_file(solvables_path / "wallet.dat")
1021          assert_equal(original_shasum, new_shasum)
1022  
1023          # Check the wallet we tried to migrate is still BDB
1024          self.assert_is_bdb("failed")
1025  
1026      def test_failed_migration_cleanup_relative_path(self):
1027          self.log.info("Test that a failed migration with a relative path is cleaned up")
1028  
1029          # Get the nearest common path of both nodes' wallet paths.
1030          common_parent = os.path.commonpath([self.master_node.wallets_path, self.old_node.wallets_path])
1031  
1032          # This test assumes that the relative path from each wallet directory to the common path is identical.
1033          assert_equal(os.path.relpath(common_parent, start=self.master_node.wallets_path), os.path.relpath(common_parent, start=self.old_node.wallets_path))
1034  
1035          wallet_name = "relativefailure"
1036          absolute_path = os.path.abspath(os.path.join(common_parent, wallet_name))
1037          relative_name = os.path.relpath(absolute_path, start=self.master_node.wallets_path)
1038  
1039          wallet = self.create_legacy_wallet(relative_name)
1040  
1041          # Make a copy of the wallet with the solvables wallet name so that we are unable
1042          # to create the solvables wallet when migrating, thus failing to migrate
1043          wallet.unloadwallet()
1044          solvables_path = os.path.join(common_parent, f"{wallet_name}_solvables")
1045  
1046          shutil.copytree(self.old_node.wallets_path / relative_name, solvables_path)
1047          original_shasum = sha256sum_file(os.path.join(solvables_path, "wallet.dat"))
1048  
1049          self.old_node.loadwallet(relative_name)
1050  
1051          # Add a multisig so that a solvables wallet is created
1052          wallet.addmultisigaddress(2, [wallet.getnewaddress(), get_generate_key().pubkey])
1053          wallet.importaddress(get_generate_key().p2pkh_addr)
1054  
1055          self.old_node.unloadwallet(relative_name)
1056          assert_raises_rpc_error(-4, "Failed to create database", self.master_node.migratewallet, relative_name)
1057  
1058          assert all(wallet not in self.master_node.listwallets() for wallet in [f"{wallet_name}", f"{wallet_name}_watchonly", f"{wallet_name}_solvables"])
1059  
1060          assert not (self.master_node.wallets_path / f"{wallet_name}_watchonly").exists()
1061          # Since the file in failed_solvables is one that we put there, migration shouldn't touch it
1062          assert os.path.exists(solvables_path)
1063          new_shasum = sha256sum_file(os.path.join(solvables_path , "wallet.dat"))
1064          assert_equal(original_shasum, new_shasum)
1065  
1066          # Check the wallet we tried to migrate is still BDB
1067          datfile = os.path.join(absolute_path, "wallet.dat")
1068          with open(datfile, "rb") as f:
1069              data = f.read(16)
1070              _, _, magic = struct.unpack("QII", data)
1071              assert_equal(magic, BTREE_MAGIC)
1072  
1073      def test_blank(self):
1074          self.log.info("Test that a blank wallet is migrated")
1075          wallet = self.create_legacy_wallet("blank", blank=True)
1076          assert_equal(wallet.getwalletinfo()["blank"], True)
1077          _, wallet = self.migrate_and_get_rpc("blank")
1078          assert_equal(wallet.getwalletinfo()["blank"], True)
1079  
1080      def test_avoidreuse(self):
1081          self.log.info("Test that avoidreuse persists after migration")
1082          def_wallet = self.master_node.get_wallet_rpc(self.default_wallet_name)
1083  
1084          wallet = self.create_legacy_wallet("avoidreuse")
1085          wallet.setwalletflag("avoid_reuse", True)
1086  
1087          # Import a pubkey to the test wallet and send some funds to it
1088          reused_imported_addr = def_wallet.getnewaddress()
1089          wallet.importpubkey(def_wallet.getaddressinfo(reused_imported_addr)["pubkey"])
1090          imported_utxos = self.create_outpoints(def_wallet, outputs=[{reused_imported_addr: 2}])
1091          def_wallet.lockunspent(False, imported_utxos)
1092  
1093          # Send funds to the test wallet
1094          reused_addr = wallet.getnewaddress()
1095          def_wallet.sendtoaddress(reused_addr, 2)
1096  
1097          self.generate(self.master_node, 1)
1098  
1099          # Send funds from the test wallet with both its own and the imported
1100          wallet.sendall([def_wallet.getnewaddress()])
1101          def_wallet.sendall(recipients=[def_wallet.getnewaddress()], inputs=imported_utxos)
1102          self.generate(self.master_node, 1)
1103          balances = wallet.getbalances()
1104          assert_equal(balances["mine"]["trusted"], 0)
1105          assert_equal(balances["watchonly"]["trusted"], 0)
1106  
1107          # Reuse the addresses
1108          def_wallet.sendtoaddress(reused_addr, 1)
1109          def_wallet.sendtoaddress(reused_imported_addr, 1)
1110          self.generate(self.master_node, 1)
1111          balances = wallet.getbalances()
1112          assert_equal(balances["mine"]["used"], 1)
1113          # Reused watchonly will not show up in balances
1114          assert_equal(balances["watchonly"]["trusted"], 0)
1115          assert_equal(balances["watchonly"]["untrusted_pending"], 0)
1116          assert_equal(balances["watchonly"]["immature"], 0)
1117  
1118          utxos = wallet.listunspent()
1119          assert_equal(len(utxos), 2)
1120          for utxo in utxos:
1121              assert_equal(utxo["reused"], True)
1122  
1123          # Migrate
1124          _, wallet = self.migrate_and_get_rpc("avoidreuse")
1125          watchonly_wallet = self.master_node.get_wallet_rpc("avoidreuse_watchonly")
1126  
1127          # One utxo in each wallet, marked used
1128          utxos = wallet.listunspent()
1129          assert_equal(len(utxos), 1)
1130          assert_equal(utxos[0]["reused"], True)
1131          watchonly_utxos = watchonly_wallet.listunspent()
1132          assert_equal(len(watchonly_utxos), 1)
1133          assert_equal(watchonly_utxos[0]["reused"], True)
1134  
1135      def test_preserve_tx_extra_info(self):
1136          self.log.info("Test that tx extra data is preserved after migration")
1137          def_wallet = self.master_node.get_wallet_rpc(self.default_wallet_name)
1138  
1139          # Create and fund wallet
1140          wallet = self.create_legacy_wallet("persist_comments")
1141          def_wallet.sendtoaddress(wallet.getnewaddress(), 2)
1142  
1143          self.generate(self.master_node, 6)
1144  
1145          # Create tx and bump it to store 'replaced_by_txid' and 'replaces_txid' data within the transactions.
1146          # Additionally, store an extra comment within the original tx.
1147          extra_comment = "don't discard me"
1148          original_tx_id = wallet.sendtoaddress(address=wallet.getnewaddress(), amount=1, comment=extra_comment)
1149          bumped_tx = wallet.bumpfee(txid=original_tx_id)
1150  
1151          def check_comments():
1152              for record in wallet.listtransactions():
1153                  if record["txid"] == original_tx_id:
1154                      assert_equal(record["replaced_by_txid"], bumped_tx["txid"])
1155                      assert_equal(record['comment'], extra_comment)
1156                  elif record["txid"] == bumped_tx["txid"]:
1157                      assert_equal(record["replaces_txid"], original_tx_id)
1158  
1159          # Pre-migration verification
1160          check_comments()
1161          # Migrate
1162          _, wallet = self.migrate_and_get_rpc("persist_comments")
1163          # Post-migration verification
1164          check_comments()
1165  
1166          wallet.unloadwallet()
1167  
1168      def test_migrate_simple_watch_only(self):
1169          self.log.info("Test migrating a watch-only p2pk script")
1170          wallet = self.create_legacy_wallet("bare_p2pk", blank=True)
1171          _, pubkey = generate_keypair()
1172          p2pk_script = key_to_p2pk_script(pubkey)
1173          wallet.importaddress(address=p2pk_script.hex())
1174          # Migrate wallet in the latest node
1175          res, _ = self.migrate_and_get_rpc("bare_p2pk")
1176          wo_wallet = self.master_node.get_wallet_rpc(res['wallet_name'])
1177          assert_equal(wo_wallet.listdescriptors()['descriptors'][0]['desc'], descsum_create(f'pk({pubkey.hex()})'))
1178          assert_equal(wo_wallet.getwalletinfo()["private_keys_enabled"], False)
1179  
1180          # Ensure that migrating a wallet with watch-only scripts does not create a spendable wallet.
1181          assert_equal('bare_p2pk_watchonly', res['wallet_name'])
1182          assert "bare_p2pk" not in self.master_node.listwallets()
1183          assert "bare_p2pk" not in [w["name"] for w in self.master_node.listwalletdir()["wallets"]]
1184  
1185          wo_wallet.unloadwallet()
1186  
1187      def test_manual_keys_import(self):
1188          self.log.info("Test migrating standalone private keys")
1189          wallet = self.create_legacy_wallet("import_privkeys", blank=True)
1190          privkey, pubkey = generate_keypair(wif=True)
1191          wallet.importprivkey(privkey=privkey, label="hi", rescan=False)
1192  
1193          # Migrate and verify
1194          res, wallet = self.migrate_and_get_rpc("import_privkeys")
1195  
1196          # There should be descriptors containing the imported key for: pk(), pkh(), sh(wpkh()), wpkh()
1197          key_origin = hash160(pubkey)[:4].hex()
1198          pubkey_hex = pubkey.hex()
1199          combo_desc = descsum_create(f"combo([{key_origin}]{pubkey_hex})")
1200  
1201          # Verify all expected descriptors were migrated
1202          migrated_desc = [item['desc'] for item in wallet.listdescriptors()['descriptors'] if pubkey.hex() in item['desc']]
1203          assert_equal([combo_desc], migrated_desc)
1204          wallet.unloadwallet()
1205  
1206          ######################################################
1207          self.log.info("Test migrating standalone public keys")
1208          wallet = self.create_legacy_wallet("import_pubkeys", blank=True)
1209          wallet.importpubkey(pubkey=pubkey_hex, rescan=False)
1210  
1211          res, _ = self.migrate_and_get_rpc("import_pubkeys")
1212  
1213          # Same as before, there should be descriptors in the watch-only wallet for the imported pubkey
1214          wo_wallet = self.nodes[0].get_wallet_rpc(res['wallet_name'])
1215          # Assert this is a watch-only wallet
1216          assert_equal(wo_wallet.getwalletinfo()["private_keys_enabled"], False)
1217          # As we imported the pubkey only, there will be no key origin in the following descriptors
1218          pk_desc = descsum_create(f'pk({pubkey_hex})')
1219          pkh_desc = descsum_create(f'pkh({pubkey_hex})')
1220          sh_wpkh_desc = descsum_create(f'sh(wpkh({pubkey_hex}))')
1221          wpkh_desc = descsum_create(f'wpkh({pubkey_hex})')
1222          expected_descs = [pk_desc, pkh_desc, sh_wpkh_desc, wpkh_desc]
1223  
1224          # Verify all expected descriptors were migrated
1225          migrated_desc = [item['desc'] for item in wo_wallet.listdescriptors()['descriptors']]
1226          assert_equal(expected_descs, migrated_desc)
1227          # Ensure that migrating a wallet with watch-only scripts does not create a spendable wallet.
1228          assert_equal('import_pubkeys_watchonly', res['wallet_name'])
1229          assert "import_pubkeys" not in self.master_node.listwallets()
1230          assert "import_pubkeys" not in [w["name"] for w in self.master_node.listwalletdir()["wallets"]]
1231          wo_wallet.unloadwallet()
1232  
1233      def test_p2wsh(self):
1234          self.log.info("Test that non-multisig P2WSH output scripts are migrated")
1235          def_wallet = self.master_node.get_wallet_rpc(self.default_wallet_name)
1236  
1237          wallet = self.create_legacy_wallet("p2wsh")
1238  
1239          # Craft wsh(pkh(key))
1240          pubkey = wallet.getaddressinfo(wallet.getnewaddress())["pubkey"]
1241          pkh_script = key_to_p2pkh_script(pubkey).hex()
1242          wsh_pkh_script = script_to_p2wsh_script(pkh_script).hex()
1243          wsh_pkh_addr = script_to_p2wsh(pkh_script)
1244  
1245          # Legacy single key scripts (i.e. pkh(key) and pk(key)) are not inserted into mapScripts
1246          # automatically, they need to be imported directly if we want to receive to P2WSH (or P2SH)
1247          # wrappings of such scripts.
1248          wallet.importaddress(address=pkh_script, p2sh=False)
1249          wallet.importaddress(address=wsh_pkh_script, p2sh=False)
1250  
1251          def_wallet.sendtoaddress(wsh_pkh_addr, 5)
1252          self.generate(self.nodes[0], 6)
1253          assert_equal(wallet.getbalances()['mine']['trusted'], 5)
1254  
1255          _, wallet = self.migrate_and_get_rpc("p2wsh")
1256  
1257          assert_equal(wallet.getbalances()['mine']['trusted'], 5)
1258          addr_info = wallet.getaddressinfo(wsh_pkh_addr)
1259          assert_equal(addr_info["ismine"], True)
1260          assert_equal(addr_info["solvable"], True)
1261  
1262          wallet.unloadwallet()
1263  
1264      def test_disallowed_p2wsh(self):
1265          self.log.info("Test that P2WSH output scripts with invalid witnessScripts are not migrated and do not cause migration failure")
1266          def_wallet = self.master_node.get_wallet_rpc(self.default_wallet_name)
1267  
1268          wallet = self.create_legacy_wallet("invalid_p2wsh")
1269  
1270          invalid_addrs = []
1271  
1272          # For a P2WSH output script stored in the legacy wallet's mapScripts, both the native P2WSH
1273          # and the P2SH-P2WSH are detected by IsMine. We need to verify that descriptors for both
1274          # output scripts are added to the resulting descriptor wallet.
1275          # However, this cannot be done using a multisig as wallet migration treats multisigs specially.
1276          # Instead, this is tested by importing a wsh(pkh()) script. But importing this directly will
1277          # insert the wsh() into setWatchOnly which means that the setWatchOnly migration ends up handling
1278          # this case, which we do not want.
1279          # In order to get the wsh(pkh()) into only mapScripts and not setWatchOnly, we need to utilize
1280          # importmulti and wrap the wsh(pkh()) inside of a sh(). This will insert the sh(wsh(pkh())) into
1281          # setWatchOnly but not the wsh(pkh()).
1282          # Furthermore, migration should not migrate the wsh(pkh()) if the key is uncompressed.
1283          comp_wif, comp_pubkey = generate_keypair(compressed=True, wif=True)
1284          comp_pkh_script = key_to_p2pkh_script(comp_pubkey).hex()
1285          comp_wsh_pkh_script = script_to_p2wsh_script(comp_pkh_script).hex()
1286          comp_sh_wsh_pkh_script = script_to_p2sh_script(comp_wsh_pkh_script).hex()
1287          comp_wsh_pkh_addr = script_to_p2wsh(comp_pkh_script)
1288  
1289          uncomp_wif, uncomp_pubkey = generate_keypair(compressed=False, wif=True)
1290          uncomp_pkh_script = key_to_p2pkh_script(uncomp_pubkey).hex()
1291          uncomp_wsh_pkh_script = script_to_p2wsh_script(uncomp_pkh_script).hex()
1292          uncomp_sh_wsh_pkh_script = script_to_p2sh_script(uncomp_wsh_pkh_script).hex()
1293          uncomp_wsh_pkh_addr = script_to_p2wsh(uncomp_pkh_script)
1294          invalid_addrs.append(uncomp_wsh_pkh_addr)
1295  
1296          import_res = wallet.importmulti([
1297              {
1298                  "scriptPubKey": comp_sh_wsh_pkh_script,
1299                  "timestamp": "now",
1300                  "redeemscript": comp_wsh_pkh_script,
1301                  "witnessscript": comp_pkh_script,
1302                  "keys": [
1303                      comp_wif,
1304                  ],
1305              },
1306              {
1307                  "scriptPubKey": uncomp_sh_wsh_pkh_script,
1308                  "timestamp": "now",
1309                  "redeemscript": uncomp_wsh_pkh_script,
1310                  "witnessscript": uncomp_pkh_script,
1311                  "keys": [
1312                      uncomp_wif,
1313                  ],
1314              },
1315          ])
1316          assert_equal(import_res[0]["success"], True)
1317          assert_equal(import_res[1]["success"], True)
1318  
1319          # Create a wsh(sh(pkh())) - P2SH inside of P2WSH is invalid
1320          comp_sh_pkh_script = script_to_p2sh_script(comp_pkh_script).hex()
1321          wsh_sh_pkh_script = script_to_p2wsh_script(comp_sh_pkh_script).hex()
1322          wsh_sh_pkh_addr = script_to_p2wsh(comp_sh_pkh_script)
1323          invalid_addrs.append(wsh_sh_pkh_addr)
1324  
1325          # Import wsh(sh(pkh()))
1326          wallet.importaddress(address=comp_sh_pkh_script, p2sh=False)
1327          wallet.importaddress(address=wsh_sh_pkh_script, p2sh=False)
1328  
1329          # Create a wsh(wsh(pkh())) - P2WSH inside of P2WSH is invalid
1330          wsh_wsh_pkh_script = script_to_p2wsh_script(comp_wsh_pkh_script).hex()
1331          wsh_wsh_pkh_addr = script_to_p2wsh(comp_wsh_pkh_script)
1332          invalid_addrs.append(wsh_wsh_pkh_addr)
1333  
1334          # Import wsh(wsh(pkh()))
1335          wallet.importaddress(address=wsh_wsh_pkh_script, p2sh=False)
1336  
1337          # The wsh(pkh()) with a compressed key is always valid, so we should see that the wallet detects it as ismine, not
1338          # watchonly, and can provide us information about the witnessScript via "embedded"
1339          comp_wsh_pkh_addr_info = wallet.getaddressinfo(comp_wsh_pkh_addr)
1340          assert_equal(comp_wsh_pkh_addr_info["ismine"], True)
1341          assert_equal(comp_wsh_pkh_addr_info["iswatchonly"], False)
1342          assert "embedded" in comp_wsh_pkh_addr_info
1343  
1344          # The invalid addresses are invalid, so the legacy wallet should not detect them as ismine,
1345          # nor consider them watchonly. However, because the legacy wallet has the witnessScripts/redeemScripts,
1346          # we should see information about those in "embedded"
1347          for addr in invalid_addrs:
1348              addr_info = wallet.getaddressinfo(addr)
1349              assert_equal(addr_info["ismine"], False)
1350              assert_equal(addr_info["iswatchonly"], False)
1351              assert "embedded" in addr_info
1352  
1353          # Fund those output scripts, although the invalid addresses will not have any balance.
1354          # This behavior follows as the addresses are not ismine.
1355          def_wallet.send([{comp_wsh_pkh_addr: 1}] + [{k: i + 1} for i, k in enumerate(invalid_addrs)])
1356          self.generate(self.nodes[0], 6)
1357          bal = wallet.getbalances()
1358          assert_equal(bal["mine"]["trusted"], 1)
1359          assert_equal(bal["watchonly"]["trusted"], 0)
1360  
1361          res, wallet = self.migrate_and_get_rpc("invalid_p2wsh")
1362          assert "watchonly_name" not in res
1363          assert "solvables_name" not in res
1364  
1365          assert_equal(wallet.getbalances()["mine"]["trusted"], 1)
1366  
1367          # After migration, the wsh(pkh()) with a compressed key is still valid and the descriptor wallet will have
1368          # information about the witnessScript
1369          comp_wsh_pkh_addr_info = wallet.getaddressinfo(comp_wsh_pkh_addr)
1370          assert_equal(comp_wsh_pkh_addr_info["ismine"], True)
1371          assert "embedded" in comp_wsh_pkh_addr_info
1372  
1373          # After migration, the invalid addresses should still not be detected as ismine and not watchonly.
1374          # The descriptor wallet should not have migrated these at all, so there should additionally be no
1375          # information in "embedded" about the witnessScripts/redeemScripts.
1376          for addr in invalid_addrs:
1377              addr_info = wallet.getaddressinfo(addr)
1378              assert_equal(addr_info["ismine"], False)
1379              assert "embedded" not in addr_info
1380  
1381          wallet.unloadwallet()
1382  
1383      def test_miniscript(self):
1384          # It turns out that due to how signing logic works, legacy wallets that have valid miniscript witnessScripts
1385          # and the private keys for them can still sign and spend them, even though output scripts involving them
1386          # as a witnessScript would not be detected as ISMINE_SPENDABLE.
1387          self.log.info("Test migration of a legacy wallet containing miniscript")
1388          def_wallet = self.master_node.get_wallet_rpc(self.default_wallet_name)
1389          wallet = self.create_legacy_wallet("miniscript")
1390  
1391          privkey, _ = generate_keypair(compressed=True, wif=True)
1392  
1393          # Make a descriptor where we only have some of the keys. This will be migrated to the watchonly wallet.
1394          some_keys_priv_desc = descsum_create(f"wsh(or_b(pk({privkey}),s:pk(029ffbe722b147f3035c87cb1c60b9a5947dd49c774cc31e94773478711a929ac0)))")
1395          some_keys_addr = self.master_node.deriveaddresses(some_keys_priv_desc)[0]
1396  
1397          # Make a descriptor where we have all of the keys. This will stay in the migrated wallet
1398          all_keys_priv_desc = descsum_create(f"wsh(and_v(v:pk({privkey}),1))")
1399          all_keys_addr = self.master_node.deriveaddresses(all_keys_priv_desc)[0]
1400  
1401          imp = wallet.importmulti([
1402              {
1403                  "desc": some_keys_priv_desc,
1404                  "timestamp": "now",
1405              },
1406              {
1407                  "desc": all_keys_priv_desc,
1408                  "timestamp": "now",
1409              }
1410          ])
1411          assert_equal(imp[0]["success"], True)
1412          assert_equal(imp[1]["success"], True)
1413  
1414          def_wallet.sendtoaddress(some_keys_addr, 1)
1415          def_wallet.sendtoaddress(all_keys_addr, 1)
1416          self.generate(self.master_node, 6)
1417          # Check that the miniscript can be spent by the legacy wallet
1418          send_res = wallet.send(outputs=[{some_keys_addr: 1},{all_keys_addr: 0.75}], include_watching=True, change_address=def_wallet.getnewaddress())
1419          assert_equal(send_res["complete"], True)
1420          self.generate(self.old_node, 6)
1421          assert_equal(wallet.getbalances()["watchonly"]["trusted"], 1.75)
1422  
1423          _, wallet = self.migrate_and_get_rpc("miniscript")
1424  
1425          # The miniscript with all keys should be in the migrated wallet
1426          assert_equal(wallet.getbalances()["mine"], {"trusted": 0.75, "untrusted_pending": 0, "immature": 0})
1427          assert_equal(wallet.getaddressinfo(all_keys_addr)["ismine"], True)
1428          assert_equal(wallet.getaddressinfo(some_keys_addr)["ismine"], False)
1429  
1430          # The miniscript with some keys should be in the watchonly wallet
1431          assert "miniscript_watchonly" in self.master_node.listwallets()
1432          watchonly = self.master_node.get_wallet_rpc("miniscript_watchonly")
1433          assert_equal(watchonly.getbalances()["mine"], {"trusted": 1, "untrusted_pending": 0, "immature": 0})
1434          assert_equal(watchonly.getaddressinfo(some_keys_addr)["ismine"], True)
1435          assert_equal(watchonly.getaddressinfo(all_keys_addr)["ismine"], False)
1436  
1437      def test_taproot(self):
1438          # It turns out that due to how signing logic works, legacy wallets that have the private key for a Taproot
1439          # output key will be able to sign and spend those scripts, even though they would not be detected as ISMINE_SPENDABLE.
1440          self.log.info("Test migration of Taproot scripts")
1441          def_wallet = self.master_node.get_wallet_rpc(self.default_wallet_name)
1442          wallet = self.create_legacy_wallet("taproot")
1443  
1444          privkey, _ = generate_keypair(compressed=True, wif=True)
1445  
1446          rawtr_desc = descsum_create(f"rawtr({privkey})")
1447          rawtr_addr = self.master_node.deriveaddresses(rawtr_desc)[0]
1448          rawtr_spk = self.master_node.validateaddress(rawtr_addr)["scriptPubKey"]
1449          tr_desc = descsum_create(f"tr({privkey})")
1450          tr_addr = self.master_node.deriveaddresses(tr_desc)[0]
1451          tr_spk = self.master_node.validateaddress(tr_addr)["scriptPubKey"]
1452          tr_script_desc = descsum_create(f"tr(9ffbe722b147f3035c87cb1c60b9a5947dd49c774cc31e94773478711a929ac0,pk({privkey}))")
1453          tr_script_addr = self.master_node.deriveaddresses(tr_script_desc)[0]
1454          tr_script_spk = self.master_node.validateaddress(tr_script_addr)["scriptPubKey"]
1455  
1456          wallet.importaddress(rawtr_spk)
1457          wallet.importaddress(tr_spk)
1458          wallet.importaddress(tr_script_spk)
1459          wallet.importprivkey(privkey)
1460  
1461          txid = def_wallet.send([{rawtr_addr: 1},{tr_addr: 2}, {tr_script_addr: 3}])["txid"]
1462          rawtr_vout = find_vout_for_address(self.master_node, txid, rawtr_addr)
1463          tr_vout = find_vout_for_address(self.master_node, txid, tr_addr)
1464          tr_script_vout = find_vout_for_address(self.master_node, txid, tr_script_addr)
1465          self.generate(self.master_node, 6)
1466          assert_equal(wallet.getbalances()["watchonly"]["trusted"], 6)
1467  
1468          # Check that the rawtr can be spent by the legacy wallet
1469          send_res = wallet.send(outputs=[{rawtr_addr: 0.5}], include_watching=True, change_address=def_wallet.getnewaddress(), inputs=[{"txid": txid, "vout": rawtr_vout}])
1470          assert_equal(send_res["complete"], True)
1471          self.generate(self.old_node, 6)
1472          assert_equal(wallet.getbalances()["watchonly"]["trusted"], 5.5)
1473          assert_equal(wallet.getbalances()["mine"]["trusted"], 0)
1474  
1475          # Check that the tr() cannot be spent by the legacy wallet
1476          send_res = wallet.send(outputs=[{def_wallet.getnewaddress(): 4}], include_watching=True, inputs=[{"txid": txid, "vout": tr_vout}, {"txid": txid, "vout": tr_script_vout}])
1477          assert_equal(send_res["complete"], False)
1478  
1479          res, wallet = self.migrate_and_get_rpc("taproot")
1480  
1481          # The rawtr should be migrated
1482          assert_equal(wallet.getbalances()["mine"], {"trusted": 0.5, "untrusted_pending": 0, "immature": 0})
1483          assert_equal(wallet.getaddressinfo(rawtr_addr)["ismine"], True)
1484          assert_equal(wallet.getaddressinfo(tr_addr)["ismine"], False)
1485          assert_equal(wallet.getaddressinfo(tr_script_addr)["ismine"], False)
1486  
1487          # The tr() with some keys should be in the watchonly wallet
1488          assert "taproot_watchonly" in self.master_node.listwallets()
1489          watchonly = self.master_node.get_wallet_rpc("taproot_watchonly")
1490          assert_equal(watchonly.getbalances()["mine"], {"trusted": 5, "untrusted_pending": 0, "immature": 0})
1491          assert_equal(watchonly.getaddressinfo(rawtr_addr)["ismine"], False)
1492          assert_equal(watchonly.getaddressinfo(tr_addr)["ismine"], True)
1493          assert_equal(watchonly.getaddressinfo(tr_script_addr)["ismine"], True)
1494  
1495      def test_solvable_no_privs(self):
1496          self.log.info("Test migrating a multisig that we do not have any private keys for")
1497          wallet = self.create_legacy_wallet("multisig_noprivs")
1498  
1499          _, pubkey = generate_keypair(compressed=True, wif=True)
1500  
1501          add_ms_res = wallet.addmultisigaddress(nrequired=1, keys=[pubkey.hex()])
1502          addr = add_ms_res["address"]
1503  
1504          # The multisig address should be ISMINE_NO but we should have the script info
1505          addr_info = wallet.getaddressinfo(addr)
1506          assert_equal(addr_info["ismine"], False)
1507          assert "hex" in addr_info
1508  
1509          migrate_res, wallet = self.migrate_and_get_rpc("multisig_noprivs")
1510          assert_equal(migrate_res["solvables_name"], "multisig_noprivs_solvables")
1511          solvables = self.master_node.get_wallet_rpc(migrate_res["solvables_name"])
1512  
1513          # The multisig should not be in the spendable wallet
1514          addr_info = wallet.getaddressinfo(addr)
1515          assert_equal(addr_info["ismine"], False)
1516          assert "hex" not in addr_info
1517  
1518          # The multisig address should be in the solvables wallet
1519          addr_info = solvables.getaddressinfo(addr)
1520          assert_equal(addr_info["ismine"], True)
1521          assert_equal(addr_info["solvable"], True)
1522          assert "hex" in addr_info
1523  
1524      def test_loading_failure_after_migration(self):
1525          self.log.info("Test that a failed loading of the wallet at the end of migration restores the backup")
1526          self.stop_node(self.old_node.index)
1527          self.old_node.chain = "signet"
1528          self.old_node.replace_in_config([("regtest=", "signet="), ("[regtest]", "[signet]")])
1529          # Disable network sync and prevent disk space warning on small (tmp)fs
1530          self.start_node(self.old_node.index, extra_args=self.old_node.extra_args + ["-maxconnections=0", "-prune=550"])
1531  
1532          wallet_name = "failed_load_after_migrate"
1533          self.create_legacy_wallet(wallet_name)
1534          assert_raises_rpc_error(-4, "Wallet loading failed. Wallet files should not be reused across chains.", lambda: self.migrate_and_get_rpc(wallet_name))
1535  
1536          # Check the wallet we tried to migrate is still BDB
1537          self.assert_is_bdb(wallet_name)
1538  
1539          self.stop_node(self.old_node.index)
1540          self.old_node.chain = "regtest"
1541          self.old_node.replace_in_config([("signet=", "regtest="), ("[signet]", "[regtest]")])
1542          self.start_node(self.old_node.index)
1543          self.connect_nodes(1, 0)
1544  
1545      def run_test(self):
1546          self.master_node = self.nodes[0]
1547          self.old_node = self.nodes[1]
1548  
1549          self.generate(self.master_node, 101)
1550  
1551          # TODO: Test the actual records in the wallet for these tests too. The behavior may be correct, but the data written may not be what we actually want
1552          self.test_basic()
1553          self.test_multisig()
1554          self.test_other_watchonly()
1555          self.test_no_privkeys()
1556          self.test_pk_coinbases()
1557          self.test_encrypted()
1558          self.test_nonexistent()
1559          self.test_unloaded_by_path()
1560          self.test_wallet_with_relative_path()
1561          self.test_wallet_with_path("path/to/mywallet/")
1562          self.test_wallet_with_path("path/that/ends/in/..")
1563          self.test_default_wallet()
1564          self.test_direct_file()
1565          self.test_addressbook()
1566          self.test_migrate_raw_p2sh()
1567          self.test_conflict_txs()
1568          self.test_hybrid_pubkey()
1569          self.test_failed_migration_cleanup()
1570          self.test_failed_migration_cleanup_relative_path()
1571          self.test_avoidreuse()
1572          self.test_preserve_tx_extra_info()
1573          self.test_blank()
1574          self.test_migrate_simple_watch_only()
1575          self.test_manual_keys_import()
1576          self.test_p2wsh()
1577          self.test_disallowed_p2wsh()
1578          self.test_miniscript()
1579          self.test_taproot()
1580          self.test_solvable_no_privs()
1581          self.test_loading_failure_after_migration()
1582  
# Script entry point: build the test with this file's path and run it when executed directly.
if __name__ == '__main__':
    WalletMigrationTest(__file__).main()