wallet_address_types.py
#!/usr/bin/env python3
# Copyright (c) 2017-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test that the wallet can send and receive using all combinations of address types.

There are 5 nodes-under-test:
    - node0 uses legacy addresses
    - node1 uses p2sh/segwit addresses
    - node2 uses p2sh/segwit addresses and bech32 addresses for change
    - node3 uses bech32 addresses
    - node4 uses p2sh/segwit addresses for change

node5 exists to generate new blocks.

## Multisig address test

Test that adding a multisig address with:
    - an uncompressed pubkey always gives a legacy address
    - only compressed pubkeys gives an `-addresstype` address

## Sending to address types test

A series of tests, iterating over node0-node3. In each iteration of the test, one node sends:
    - 10/101th of its balance to itself (using getrawchangeaddress for single key addresses)
    - 20/101th to the next node
    - 30/101th to the node after that
    - 40/101th to the remaining node
    - 1/101th remains as fee+change

Iterate over each node for single key addresses, and then over each node for
multisig addresses.

Repeat test, but with explicit address_type parameters passed to getnewaddress
and getrawchangeaddress:
    - node0 and node3 send to p2sh.
    - node1 sends to bech32.
    - node2 sends to legacy.

As every node sends coins after receiving, this also
verifies that spending coins sent to all these address types works.

## Change type test

Test that the nodes generate the correct change address type:
    - node0 always uses a legacy change address.
    - node1 uses a bech32 address for change if any destination address is bech32.
    - node2 always uses a bech32 address for change
    - node3 always uses a bech32 address for change
    - node4 always uses p2sh/segwit output for change.
"""

from decimal import Decimal
import itertools

from test_framework.blocktools import COINBASE_MATURITY
from test_framework.test_framework import BitcoinTestFramework
from test_framework.descriptors import (
    descsum_create,
    descsum_check,
)
from test_framework.util import (
    assert_equal,
    assert_greater_than,
    assert_raises_rpc_error,
)

class AddressTypeTest(BitcoinTestFramework):
    def set_test_params(self):
        """Configure six nodes: five with distinct address/change types, one miner."""
        self.num_nodes = 6
        self.extra_args = [
            ["-addresstype=legacy"],
            ["-addresstype=p2sh-segwit"],
            ["-addresstype=p2sh-segwit", "-changetype=bech32"],
            ["-addresstype=bech32"],
            ["-changetype=p2sh-segwit"],
            [],
        ]
        # whitelist peers to speed up tx relay / mempool sync
        self.noban_tx_relay = True

    def skip_test_if_missing_module(self):
        """Skip the test when the wallet is not available."""
        self.skip_if_no_wallet()

    def setup_network(self):
        """Start the nodes and connect every pair of them."""
        self.setup_nodes()

        # Fully mesh-connect nodes for faster mempool sync
        for i, j in itertools.product(range(self.num_nodes), repeat=2):
            if i > j:
                self.connect_nodes(i, j)
        self.sync_all()

    def get_balances(self, key='trusted'):
        """Return a list of balances.

        The list holds the `getbalances()['mine'][key]` value of node0..node3,
        in node order.
        """
        return [self.nodes[i].getbalances()['mine'][key] for i in range(4)]

    def test_address(self, node, address, multisig, typ):
        """Run sanity checks on an address.

        `node` is the index of the node queried with getaddressinfo and
        validateaddress. `multisig` and `typ` select which set of
        getaddressinfo fields is checked. Raises AssertionError when a check
        fails or when the (multisig, typ) combination is not handled.
        """
        info = self.nodes[node].getaddressinfo(address)
        assert self.nodes[node].validateaddress(address)['isvalid']
        assert_equal(info.get('solvable'), True)

        if not multisig and typ == 'legacy':
            # P2PKH
            assert not info['isscript']
            assert not info['iswitness']
            assert 'pubkey' in info
        elif not multisig and typ == 'p2sh-segwit':
            # P2SH-P2WPKH
            assert info['isscript']
            assert not info['iswitness']
            assert_equal(info['script'], 'witness_v0_keyhash')
            assert 'pubkey' in info
        elif not multisig and typ == 'bech32':
            # P2WPKH
            assert not info['isscript']
            assert info['iswitness']
            assert_equal(info['witness_version'], 0)
            assert_equal(len(info['witness_program']), 40)
            assert 'pubkey' in info
        elif not multisig and typ == "bech32m":
            # P2TR single sig
            assert info["isscript"]
            assert info["iswitness"]
            assert_equal(info["witness_version"], 1)
            assert_equal(len(info["witness_program"]), 64)
        elif typ == 'legacy':
            # P2SH-multisig
            assert info['isscript']
            assert_equal(info['script'], 'multisig')
            assert not info['iswitness']
            assert 'pubkeys' in info
        elif typ == 'p2sh-segwit':
            # P2SH-P2WSH-multisig
            assert info['isscript']
            assert_equal(info['script'], 'witness_v0_scripthash')
            assert not info['iswitness']
            assert info['embedded']['isscript']
            assert_equal(info['embedded']['script'], 'multisig')
            assert info['embedded']['iswitness']
            assert_equal(info['embedded']['witness_version'], 0)
            assert_equal(len(info['embedded']['witness_program']), 64)
            assert 'pubkeys' in info['embedded']
        elif typ == 'bech32':
            # P2WSH-multisig
            assert info['isscript']
            assert_equal(info['script'], 'multisig')
            assert info['iswitness']
            assert_equal(info['witness_version'], 0)
            assert_equal(len(info['witness_program']), 64)
            assert 'pubkeys' in info
        else:
            # Unknown type: name it so the failure is diagnosable from the log
            raise AssertionError("Unknown address type: {} (multisig={})".format(typ, multisig))

    def test_desc(self, node, address, multisig, typ, utxo):
        """Run sanity checks on a descriptor reported by getaddressinfo.

        `utxo` is a listunspent entry paying to `address`; its 'desc' must
        match the getaddressinfo descriptor, and its outpoint is used to build
        a PSBT from which the key origin information is read. Raises
        AssertionError when a check fails or when the (multisig, typ)
        combination is not handled.
        """
        info = self.nodes[node].getaddressinfo(address)
        assert 'desc' in info
        assert_equal(info['desc'], utxo['desc'])
        assert self.nodes[node].validateaddress(address)['isvalid']

        # Use a ridiculously roundabout way to find the key origin info through
        # the PSBT logic. However, this does test consistency between the PSBT reported
        # fingerprints/paths and the descriptor logic.
        psbt = self.nodes[node].createpsbt([{'txid':utxo['txid'], 'vout':utxo['vout']}],[{address:0.00010000}])
        psbt = self.nodes[node].walletprocesspsbt(psbt, False, "ALL", True)
        decode = self.nodes[node].decodepsbt(psbt['psbt'])
        key_descs = {}
        for deriv in decode['inputs'][0]['bip32_derivs']:
            assert_equal(len(deriv['master_fingerprint']), 8)
            assert_equal(deriv['path'][0], 'm')
            key_descs[deriv['pubkey']] = '[' + deriv['master_fingerprint'] + deriv['path'][1:].replace("'","h") + ']' + deriv['pubkey']

        # The last 9 characters of the descriptor are stripped as '#' plus an
        # 8-character checksum.
        desc = info['desc']
        desc_no_checksum = desc[:-9]
        checksum = desc[-8:]

        # Verify the descriptor checksum against the Python implementation
        assert descsum_check(desc)
        # Verify that stripping the checksum and recreating it using Python roundtrips
        assert_equal(desc, descsum_create(desc_no_checksum))
        # Verify that stripping the checksum and feeding it to getdescriptorinfo roundtrips
        stripped_info = self.nodes[0].getdescriptorinfo(desc_no_checksum)
        assert_equal(desc, stripped_info['descriptor'])
        assert_equal(checksum, stripped_info['checksum'])
        # Verify that keeping the checksum and feeding it to getdescriptorinfo roundtrips
        full_info = self.nodes[0].getdescriptorinfo(desc)
        assert_equal(desc, full_info['descriptor'])
        assert_equal(checksum, full_info['checksum'])

        if not multisig and typ == 'legacy':
            # P2PKH
            assert_equal(info['desc'], descsum_create("pkh(%s)" % key_descs[info['pubkey']]))
        elif not multisig and typ == 'p2sh-segwit':
            # P2SH-P2WPKH
            assert_equal(info['desc'], descsum_create("sh(wpkh(%s))" % key_descs[info['pubkey']]))
        elif not multisig and typ == 'bech32':
            # P2WPKH
            assert_equal(info['desc'], descsum_create("wpkh(%s)" % key_descs[info['pubkey']]))
        elif typ == 'legacy':
            # P2SH-multisig
            assert_equal(info['desc'], descsum_create("sh(multi(2,%s,%s))" % (key_descs[info['pubkeys'][0]], key_descs[info['pubkeys'][1]])))
        elif typ == 'p2sh-segwit':
            # P2SH-P2WSH-multisig
            assert_equal(info['desc'], descsum_create("sh(wsh(multi(2,%s,%s)))" % (key_descs[info['embedded']['pubkeys'][0]], key_descs[info['embedded']['pubkeys'][1]])))
        elif typ == 'bech32':
            # P2WSH-multisig
            assert_equal(info['desc'], descsum_create("wsh(multi(2,%s,%s))" % (key_descs[info['pubkeys'][0]], key_descs[info['pubkeys'][1]])))
        else:
            # Unknown type: name it so the failure is diagnosable from the log
            raise AssertionError("Unknown address type: {} (multisig={})".format(typ, multisig))

    def test_change_output_type(self, node_sender, destinations, expected_type):
        """Send 0.001 to each destination and check the type of the change output.

        `node_sender` is the index of the sending node, `destinations` a list
        of addresses, and `expected_type` the address type the single change
        output must have according to test_address.
        """
        txid = self.nodes[node_sender].sendmany(dummy="", amounts=dict.fromkeys(destinations, 0.001))
        tx = self.nodes[node_sender].gettransaction(txid=txid, verbose=True)['decoded']

        # Make sure the transaction has change:
        assert_equal(len(tx["vout"]), len(destinations) + 1)

        # Make sure the destinations are included, and remove them:
        output_addresses = [vout['scriptPubKey']['address'] for vout in tx["vout"]]
        change_addresses = [d for d in output_addresses if d not in destinations]
        assert_equal(len(change_addresses), 1)

        self.log.debug("Check if change address " + change_addresses[0] + " is " + expected_type)
        self.test_address(node_sender, change_addresses[0], multisig=False, typ=expected_type)

    def run_test(self):
        """Run the sending-to-address-types test, then the change type test."""
        # Mine 101 blocks on node5 to bring nodes out of IBD and make sure that
        # no coinbases are maturing for the nodes-under-test during the test
        self.generate(self.nodes[5], COINBASE_MATURITY + 1)

        compressed_1 = "0296b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52"
        compressed_2 = "037211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073"

        # Only single key addresses are exercised; the multisig branch below
        # runs only if True is added to this list.
        do_multisigs = [False]

        for explicit_type, multisig, from_node in itertools.product([False, True], do_multisigs, range(4)):
            address_type = None
            if explicit_type and not multisig:
                if from_node == 1:
                    address_type = 'bech32'
                elif from_node == 0 or from_node == 3:
                    address_type = 'p2sh-segwit'
                else:
                    address_type = 'legacy'
            self.log.info("Sending from node {} ({}) with{} multisig using {}".format(from_node, self.extra_args[from_node], "" if multisig else "out", "default" if address_type is None else address_type))
            old_balances = self.get_balances()
            self.log.debug("Old balances are {}".format(old_balances))
            to_send = (old_balances[from_node] / (COINBASE_MATURITY + 1)).quantize(Decimal("0.00000001"))
            sends = {}
            addresses = {}

            self.log.debug("Prepare sends")
            for n, to_node in enumerate(range(from_node, from_node + 4)):
                to_node %= 4
                change = False
                if not multisig:
                    if from_node == to_node:
                        # When sending non-multisig to self, use getrawchangeaddress
                        address = self.nodes[to_node].getrawchangeaddress(address_type=address_type)
                        change = True
                    else:
                        address = self.nodes[to_node].getnewaddress(address_type=address_type)
                else:
                    pubkey1 = self.nodes[to_node].getaddressinfo(self.nodes[to_node].getnewaddress())["pubkey"]
                    pubkey2 = self.nodes[to_node].getaddressinfo(self.nodes[to_node].getnewaddress())["pubkey"]
                    ms = self.nodes[to_node].createmultisig(2, [pubkey1, pubkey2])
                    import_res = self.nodes[to_node].importdescriptors([{"desc": ms["descriptor"], "timestamp": 0}])
                    assert_equal(import_res[0]["success"], True)
                    # The checks and the send below must operate on the multisig
                    # address that was just created, not on a leftover value
                    # from an earlier iteration.
                    # NOTE(review): createmultisig is called without an address
                    # type here, so the address may not match `typ` on every
                    # node - confirm before enabling multisig in do_multisigs.
                    address = ms["address"]

                # Do some sanity checking on the created address
                if address_type is not None:
                    typ = address_type
                elif to_node == 0:
                    typ = 'legacy'
                elif to_node == 1 or (to_node == 2 and not change):
                    typ = 'p2sh-segwit'
                else:
                    typ = 'bech32'
                self.test_address(to_node, address, multisig, typ)

                # Output entry
                sends[address] = to_send * 10 * (1 + n)
                addresses[to_node] = (address, typ)

            self.log.debug("Sending: {}".format(sends))
            self.nodes[from_node].sendmany("", sends)
            self.sync_mempools()

            unconf_balances = self.get_balances('untrusted_pending')
            self.log.debug("Check unconfirmed balances: {}".format(unconf_balances))
            assert_equal(unconf_balances[from_node], 0)
            for n, to_node in enumerate(range(from_node + 1, from_node + 4)):
                to_node %= 4
                assert_equal(unconf_balances[to_node], to_send * 10 * (2 + n))

            # node5 collects fee and block subsidy to keep accounting simple
            self.generate(self.nodes[5], 1)

            # Verify that the receiving wallet contains a UTXO with the expected address, and expected descriptor
            for to_node in range(from_node, from_node + 4):
                to_node %= 4
                expected_address, expected_typ = addresses[to_node]
                for utxo in self.nodes[to_node].listunspent():
                    if utxo['address'] == expected_address:
                        self.test_desc(to_node, expected_address, multisig, expected_typ, utxo)
                        break
                else:
                    raise AssertionError("No UTXO paying to {} found on node {}".format(expected_address, to_node))

            new_balances = self.get_balances()
            self.log.debug("Check new balances: {}".format(new_balances))
            # We don't know what fee was set, so we can only check bounds on the balance of the sending node
            assert_greater_than(new_balances[from_node], to_send * 10)
            assert_greater_than(to_send * 11, new_balances[from_node])
            for n, to_node in enumerate(range(from_node + 1, from_node + 4)):
                to_node %= 4
                assert_equal(new_balances[to_node], old_balances[to_node] + to_send * 10 * (2 + n))

        # Get one p2sh/segwit address from node2 and two bech32 addresses from node3:
        to_address_p2sh = self.nodes[2].getnewaddress()
        to_address_bech32_1 = self.nodes[3].getnewaddress()
        to_address_bech32_2 = self.nodes[3].getnewaddress()

        # Fund node 4:
        self.nodes[5].sendtoaddress(self.nodes[4].getnewaddress(), Decimal("1"))
        self.generate(self.nodes[5], 1)
        assert_equal(self.nodes[4].getbalance(), 1)

        self.log.info("Nodes with addresstype=legacy never use a P2WPKH change output (unless changetype is set otherwise):")
        self.test_change_output_type(0, [to_address_bech32_1], 'legacy')

        self.log.info("Nodes with addresstype=p2sh-segwit match the change output")
        self.test_change_output_type(1, [to_address_p2sh], 'p2sh-segwit')
        self.test_change_output_type(1, [to_address_bech32_1], 'bech32')
        self.test_change_output_type(1, [to_address_p2sh, to_address_bech32_1], 'bech32')
        self.test_change_output_type(1, [to_address_bech32_1, to_address_bech32_2], 'bech32')

        self.log.info("Nodes with change_type=bech32 always use a P2WPKH change output:")
        self.test_change_output_type(2, [to_address_bech32_1], 'bech32')
        self.test_change_output_type(2, [to_address_p2sh], 'bech32')

        self.log.info("Nodes with addresstype=bech32 match the change output (unless changetype is set otherwise):")
        self.test_change_output_type(3, [to_address_bech32_1], 'bech32')
        self.test_change_output_type(3, [to_address_p2sh], 'p2sh-segwit')

        self.log.info('getrawchangeaddress defaults to addresstype if -changetype is not set and argument is absent')
        self.test_address(3, self.nodes[3].getrawchangeaddress(), multisig=False, typ='bech32')

        self.log.info('test invalid address type arguments')
        assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].createmultisig, 2, [compressed_1, compressed_2], address_type="")
        assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].getnewaddress, None, '')
        assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].getrawchangeaddress, '')
        assert_raises_rpc_error(-5, "Unknown address type 'bech23'", self.nodes[3].getrawchangeaddress, 'bech23')
        assert_raises_rpc_error(-5, "Unknown address type 'bech23'", self.nodes[3].createwalletdescriptor, "bech23")

        self.log.info("Nodes with changetype=p2sh-segwit never use a P2WPKH change output")
        self.test_change_output_type(4, [to_address_bech32_1], 'p2sh-segwit')
        self.test_address(4, self.nodes[4].getrawchangeaddress(), multisig=False, typ='p2sh-segwit')
        self.log.info("Except for getrawchangeaddress if specified:")
        self.test_address(4, self.nodes[4].getrawchangeaddress(), multisig=False, typ='p2sh-segwit')
        self.test_address(4, self.nodes[4].getrawchangeaddress('bech32'), multisig=False, typ='bech32')

        self.log.info("Descriptor wallets have bech32m addresses")
        self.test_address(4, self.nodes[4].getnewaddress("", "bech32m"), multisig=False, typ="bech32m")
        self.test_address(4, self.nodes[4].getrawchangeaddress("bech32m"), multisig=False, typ="bech32m")

if __name__ == '__main__':
    AddressTypeTest(__file__).main()