wallet_address_types.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2017-2022 The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """Test that the wallet can send and receive using all combinations of address types. 6 7 There are 5 nodes-under-test: 8 - node0 uses legacy addresses 9 - node1 uses p2sh/segwit addresses 10 - node2 uses p2sh/segwit addresses and bech32 addresses for change 11 - node3 uses bech32 addresses 12 - node4 uses a p2sh/segwit addresses for change 13 14 node5 exists to generate new blocks. 15 16 ## Multisig address test 17 18 Test that adding a multisig address with: 19 - an uncompressed pubkey always gives a legacy address 20 - only compressed pubkeys gives the an `-addresstype` address 21 22 ## Sending to address types test 23 24 A series of tests, iterating over node0-node4. In each iteration of the test, one node sends: 25 - 10/101th of its balance to itself (using getrawchangeaddress for single key addresses) 26 - 20/101th to the next node 27 - 30/101th to the node after that 28 - 40/101th to the remaining node 29 - 1/101th remains as fee+change 30 31 Iterate over each node for single key addresses, and then over each node for 32 multisig addresses. 33 34 Repeat test, but with explicit address_type parameters passed to getnewaddress 35 and getrawchangeaddress: 36 - node0 and node3 send to p2sh. 37 - node1 sends to bech32. 38 - node2 sends to legacy. 39 40 As every node sends coins after receiving, this also 41 verifies that spending coins sent to all these address types works. 42 43 ## Change type test 44 45 Test that the nodes generate the correct change address type: 46 - node0 always uses a legacy change address. 47 - node1 uses a bech32 addresses for change if any destination address is bech32. 48 - node2 always uses a bech32 address for change 49 - node3 always uses a bech32 address for change 50 - node4 always uses p2sh/segwit output for change. 51 """ 52 53 from decimal import Decimal 54 import itertools 55 56 from test_framework.blocktools import COINBASE_MATURITY 57 from test_framework.test_framework import BitcoinTestFramework 58 from test_framework.descriptors import ( 59 descsum_create, 60 descsum_check, 61 ) 62 from test_framework.util import ( 63 assert_equal, 64 assert_greater_than, 65 assert_raises_rpc_error, 66 ) 67 68 class AddressTypeTest(BitcoinTestFramework): 69 def add_options(self, parser): 70 self.add_wallet_options(parser) 71 72 def set_test_params(self): 73 self.num_nodes = 6 74 self.extra_args = [ 75 ["-addresstype=legacy"], 76 ["-addresstype=p2sh-segwit"], 77 ["-addresstype=p2sh-segwit", "-changetype=bech32"], 78 ["-addresstype=bech32"], 79 ["-changetype=p2sh-segwit"], 80 [], 81 ] 82 # whitelist peers to speed up tx relay / mempool sync 83 self.noban_tx_relay = True 84 self.supports_cli = False 85 86 def skip_test_if_missing_module(self): 87 self.skip_if_no_wallet() 88 89 def setup_network(self): 90 self.setup_nodes() 91 92 # Fully mesh-connect nodes for faster mempool sync 93 for i, j in itertools.product(range(self.num_nodes), repeat=2): 94 if i > j: 95 self.connect_nodes(i, j) 96 self.sync_all() 97 98 def get_balances(self, key='trusted'): 99 """Return a list of balances.""" 100 return [self.nodes[i].getbalances()['mine'][key] for i in range(4)] 101 102 def test_address(self, node, address, multisig, typ): 103 """Run sanity checks on an address.""" 104 info = self.nodes[node].getaddressinfo(address) 105 assert self.nodes[node].validateaddress(address)['isvalid'] 106 assert_equal(info.get('solvable'), True) 107 108 if not multisig and typ == 'legacy': 109 # P2PKH 110 assert not info['isscript'] 111 assert not info['iswitness'] 112 assert 'pubkey' in info 113 elif not multisig and typ == 'p2sh-segwit': 114 # P2SH-P2WPKH 115 assert info['isscript'] 116 assert not info['iswitness'] 117 assert_equal(info['script'], 'witness_v0_keyhash') 118 assert 'pubkey' in info 119 elif not multisig and typ == 'bech32': 120 # P2WPKH 121 assert not info['isscript'] 122 assert info['iswitness'] 123 assert_equal(info['witness_version'], 0) 124 assert_equal(len(info['witness_program']), 40) 125 assert 'pubkey' in info 126 elif not multisig and typ == "bech32m": 127 # P2TR single sig 128 assert info["isscript"] 129 assert info["iswitness"] 130 assert_equal(info["witness_version"], 1) 131 assert_equal(len(info["witness_program"]), 64) 132 elif typ == 'legacy': 133 # P2SH-multisig 134 assert info['isscript'] 135 assert_equal(info['script'], 'multisig') 136 assert not info['iswitness'] 137 assert 'pubkeys' in info 138 elif typ == 'p2sh-segwit': 139 # P2SH-P2WSH-multisig 140 assert info['isscript'] 141 assert_equal(info['script'], 'witness_v0_scripthash') 142 assert not info['iswitness'] 143 assert info['embedded']['isscript'] 144 assert_equal(info['embedded']['script'], 'multisig') 145 assert info['embedded']['iswitness'] 146 assert_equal(info['embedded']['witness_version'], 0) 147 assert_equal(len(info['embedded']['witness_program']), 64) 148 assert 'pubkeys' in info['embedded'] 149 elif typ == 'bech32': 150 # P2WSH-multisig 151 assert info['isscript'] 152 assert_equal(info['script'], 'multisig') 153 assert info['iswitness'] 154 assert_equal(info['witness_version'], 0) 155 assert_equal(len(info['witness_program']), 64) 156 assert 'pubkeys' in info 157 else: 158 # Unknown type 159 assert False 160 161 def test_desc(self, node, address, multisig, typ, utxo): 162 """Run sanity checks on a descriptor reported by getaddressinfo.""" 163 info = self.nodes[node].getaddressinfo(address) 164 assert 'desc' in info 165 assert_equal(info['desc'], utxo['desc']) 166 assert self.nodes[node].validateaddress(address)['isvalid'] 167 168 # Use a ridiculously roundabout way to find the key origin info through 169 # the PSBT logic. However, this does test consistency between the PSBT reported 170 # fingerprints/paths and the descriptor logic. 171 psbt = self.nodes[node].createpsbt([{'txid':utxo['txid'], 'vout':utxo['vout']}],[{address:0.00010000}]) 172 psbt = self.nodes[node].walletprocesspsbt(psbt, False, "ALL", True) 173 decode = self.nodes[node].decodepsbt(psbt['psbt']) 174 key_descs = {} 175 for deriv in decode['inputs'][0]['bip32_derivs']: 176 assert_equal(len(deriv['master_fingerprint']), 8) 177 assert_equal(deriv['path'][0], 'm') 178 key_descs[deriv['pubkey']] = '[' + deriv['master_fingerprint'] + deriv['path'][1:].replace("'","h") + ']' + deriv['pubkey'] 179 180 # Verify the descriptor checksum against the Python implementation 181 assert descsum_check(info['desc']) 182 # Verify that stripping the checksum and recreating it using Python roundtrips 183 assert info['desc'] == descsum_create(info['desc'][:-9]) 184 # Verify that stripping the checksum and feeding it to getdescriptorinfo roundtrips 185 assert info['desc'] == self.nodes[0].getdescriptorinfo(info['desc'][:-9])['descriptor'] 186 assert_equal(info['desc'][-8:], self.nodes[0].getdescriptorinfo(info['desc'][:-9])['checksum']) 187 # Verify that keeping the checksum and feeding it to getdescriptorinfo roundtrips 188 assert info['desc'] == self.nodes[0].getdescriptorinfo(info['desc'])['descriptor'] 189 assert_equal(info['desc'][-8:], self.nodes[0].getdescriptorinfo(info['desc'])['checksum']) 190 191 if not multisig and typ == 'legacy': 192 # P2PKH 193 assert_equal(info['desc'], descsum_create("pkh(%s)" % key_descs[info['pubkey']])) 194 elif not multisig and typ == 'p2sh-segwit': 195 # P2SH-P2WPKH 196 assert_equal(info['desc'], descsum_create("sh(wpkh(%s))" % key_descs[info['pubkey']])) 197 elif not multisig and typ == 'bech32': 198 # P2WPKH 199 assert_equal(info['desc'], descsum_create("wpkh(%s)" % key_descs[info['pubkey']])) 200 elif typ == 'legacy': 201 # P2SH-multisig 202 assert_equal(info['desc'], descsum_create("sh(multi(2,%s,%s))" % (key_descs[info['pubkeys'][0]], key_descs[info['pubkeys'][1]]))) 203 elif typ == 'p2sh-segwit': 204 # P2SH-P2WSH-multisig 205 assert_equal(info['desc'], descsum_create("sh(wsh(multi(2,%s,%s)))" % (key_descs[info['embedded']['pubkeys'][0]], key_descs[info['embedded']['pubkeys'][1]]))) 206 elif typ == 'bech32': 207 # P2WSH-multisig 208 assert_equal(info['desc'], descsum_create("wsh(multi(2,%s,%s))" % (key_descs[info['pubkeys'][0]], key_descs[info['pubkeys'][1]]))) 209 else: 210 # Unknown type 211 assert False 212 213 def test_change_output_type(self, node_sender, destinations, expected_type): 214 txid = self.nodes[node_sender].sendmany(dummy="", amounts=dict.fromkeys(destinations, 0.001)) 215 tx = self.nodes[node_sender].gettransaction(txid=txid, verbose=True)['decoded'] 216 217 # Make sure the transaction has change: 218 assert_equal(len(tx["vout"]), len(destinations) + 1) 219 220 # Make sure the destinations are included, and remove them: 221 output_addresses = [vout['scriptPubKey']['address'] for vout in tx["vout"]] 222 change_addresses = [d for d in output_addresses if d not in destinations] 223 assert_equal(len(change_addresses), 1) 224 225 self.log.debug("Check if change address " + change_addresses[0] + " is " + expected_type) 226 self.test_address(node_sender, change_addresses[0], multisig=False, typ=expected_type) 227 228 def run_test(self): 229 # Mine 101 blocks on node5 to bring nodes out of IBD and make sure that 230 # no coinbases are maturing for the nodes-under-test during the test 231 self.generate(self.nodes[5], COINBASE_MATURITY + 1) 232 233 uncompressed_1 = "0496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858ee" 234 uncompressed_2 = "047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77" 235 compressed_1 = "0296b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52" 236 compressed_2 = "037211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073" 237 238 if not self.options.descriptors: 239 # Tests for addmultisigaddress's address type behavior is only for legacy wallets. 240 # Descriptor wallets do not have addmultsigaddress so these tests are not needed for those. 241 # addmultisigaddress with at least 1 uncompressed key should return a legacy address. 242 for node in range(4): 243 self.test_address(node, self.nodes[node].addmultisigaddress(2, [uncompressed_1, uncompressed_2])['address'], True, 'legacy') 244 self.test_address(node, self.nodes[node].addmultisigaddress(2, [compressed_1, uncompressed_2])['address'], True, 'legacy') 245 self.test_address(node, self.nodes[node].addmultisigaddress(2, [uncompressed_1, compressed_2])['address'], True, 'legacy') 246 # addmultisigaddress with all compressed keys should return the appropriate address type (even when the keys are not ours). 247 self.test_address(0, self.nodes[0].addmultisigaddress(2, [compressed_1, compressed_2])['address'], True, 'legacy') 248 self.test_address(1, self.nodes[1].addmultisigaddress(2, [compressed_1, compressed_2])['address'], True, 'p2sh-segwit') 249 self.test_address(2, self.nodes[2].addmultisigaddress(2, [compressed_1, compressed_2])['address'], True, 'p2sh-segwit') 250 self.test_address(3, self.nodes[3].addmultisigaddress(2, [compressed_1, compressed_2])['address'], True, 'bech32') 251 252 do_multisigs = [False] 253 if not self.options.descriptors: 254 do_multisigs.append(True) 255 256 for explicit_type, multisig, from_node in itertools.product([False, True], do_multisigs, range(4)): 257 address_type = None 258 if explicit_type and not multisig: 259 if from_node == 1: 260 address_type = 'bech32' 261 elif from_node == 0 or from_node == 3: 262 address_type = 'p2sh-segwit' 263 else: 264 address_type = 'legacy' 265 self.log.info("Sending from node {} ({}) with{} multisig using {}".format(from_node, self.extra_args[from_node], "" if multisig else "out", "default" if address_type is None else address_type)) 266 old_balances = self.get_balances() 267 self.log.debug("Old balances are {}".format(old_balances)) 268 to_send = (old_balances[from_node] / (COINBASE_MATURITY + 1)).quantize(Decimal("0.00000001")) 269 sends = {} 270 addresses = {} 271 272 self.log.debug("Prepare sends") 273 for n, to_node in enumerate(range(from_node, from_node + 4)): 274 to_node %= 4 275 change = False 276 if not multisig: 277 if from_node == to_node: 278 # When sending non-multisig to self, use getrawchangeaddress 279 address = self.nodes[to_node].getrawchangeaddress(address_type=address_type) 280 change = True 281 else: 282 address = self.nodes[to_node].getnewaddress(address_type=address_type) 283 else: 284 addr1 = self.nodes[to_node].getnewaddress() 285 addr2 = self.nodes[to_node].getnewaddress() 286 address = self.nodes[to_node].addmultisigaddress(2, [addr1, addr2])['address'] 287 288 # Do some sanity checking on the created address 289 if address_type is not None: 290 typ = address_type 291 elif to_node == 0: 292 typ = 'legacy' 293 elif to_node == 1 or (to_node == 2 and not change): 294 typ = 'p2sh-segwit' 295 else: 296 typ = 'bech32' 297 self.test_address(to_node, address, multisig, typ) 298 299 # Output entry 300 sends[address] = to_send * 10 * (1 + n) 301 addresses[to_node] = (address, typ) 302 303 self.log.debug("Sending: {}".format(sends)) 304 self.nodes[from_node].sendmany("", sends) 305 self.sync_mempools() 306 307 unconf_balances = self.get_balances('untrusted_pending') 308 self.log.debug("Check unconfirmed balances: {}".format(unconf_balances)) 309 assert_equal(unconf_balances[from_node], 0) 310 for n, to_node in enumerate(range(from_node + 1, from_node + 4)): 311 to_node %= 4 312 assert_equal(unconf_balances[to_node], to_send * 10 * (2 + n)) 313 314 # node5 collects fee and block subsidy to keep accounting simple 315 self.generate(self.nodes[5], 1) 316 317 # Verify that the receiving wallet contains a UTXO with the expected address, and expected descriptor 318 for n, to_node in enumerate(range(from_node, from_node + 4)): 319 to_node %= 4 320 found = False 321 for utxo in self.nodes[to_node].listunspent(): 322 if utxo['address'] == addresses[to_node][0]: 323 found = True 324 self.test_desc(to_node, addresses[to_node][0], multisig, addresses[to_node][1], utxo) 325 break 326 assert found 327 328 new_balances = self.get_balances() 329 self.log.debug("Check new balances: {}".format(new_balances)) 330 # We don't know what fee was set, so we can only check bounds on the balance of the sending node 331 assert_greater_than(new_balances[from_node], to_send * 10) 332 assert_greater_than(to_send * 11, new_balances[from_node]) 333 for n, to_node in enumerate(range(from_node + 1, from_node + 4)): 334 to_node %= 4 335 assert_equal(new_balances[to_node], old_balances[to_node] + to_send * 10 * (2 + n)) 336 337 # Get one p2sh/segwit address from node2 and two bech32 addresses from node3: 338 to_address_p2sh = self.nodes[2].getnewaddress() 339 to_address_bech32_1 = self.nodes[3].getnewaddress() 340 to_address_bech32_2 = self.nodes[3].getnewaddress() 341 342 # Fund node 4: 343 self.nodes[5].sendtoaddress(self.nodes[4].getnewaddress(), Decimal("1")) 344 self.generate(self.nodes[5], 1) 345 assert_equal(self.nodes[4].getbalance(), 1) 346 347 self.log.info("Nodes with addresstype=legacy never use a P2WPKH change output (unless changetype is set otherwise):") 348 self.test_change_output_type(0, [to_address_bech32_1], 'legacy') 349 350 self.log.info("Nodes with addresstype=p2sh-segwit match the change output") 351 self.test_change_output_type(1, [to_address_p2sh], 'p2sh-segwit') 352 self.test_change_output_type(1, [to_address_bech32_1], 'bech32') 353 self.test_change_output_type(1, [to_address_p2sh, to_address_bech32_1], 'bech32') 354 self.test_change_output_type(1, [to_address_bech32_1, to_address_bech32_2], 'bech32') 355 356 self.log.info("Nodes with change_type=bech32 always use a P2WPKH change output:") 357 self.test_change_output_type(2, [to_address_bech32_1], 'bech32') 358 self.test_change_output_type(2, [to_address_p2sh], 'bech32') 359 360 self.log.info("Nodes with addresstype=bech32 match the change output (unless changetype is set otherwise):") 361 self.test_change_output_type(3, [to_address_bech32_1], 'bech32') 362 self.test_change_output_type(3, [to_address_p2sh], 'p2sh-segwit') 363 364 self.log.info('getrawchangeaddress defaults to addresstype if -changetype is not set and argument is absent') 365 self.test_address(3, self.nodes[3].getrawchangeaddress(), multisig=False, typ='bech32') 366 367 self.log.info('test invalid address type arguments') 368 assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].addmultisigaddress, 2, [compressed_1, compressed_2], None, '') 369 assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].getnewaddress, None, '') 370 assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].getrawchangeaddress, '') 371 assert_raises_rpc_error(-5, "Unknown address type 'bech23'", self.nodes[3].getrawchangeaddress, 'bech23') 372 373 self.log.info("Nodes with changetype=p2sh-segwit never use a P2WPKH change output") 374 self.test_change_output_type(4, [to_address_bech32_1], 'p2sh-segwit') 375 self.test_address(4, self.nodes[4].getrawchangeaddress(), multisig=False, typ='p2sh-segwit') 376 self.log.info("Except for getrawchangeaddress if specified:") 377 self.test_address(4, self.nodes[4].getrawchangeaddress(), multisig=False, typ='p2sh-segwit') 378 self.test_address(4, self.nodes[4].getrawchangeaddress('bech32'), multisig=False, typ='bech32') 379 380 if self.options.descriptors: 381 self.log.info("Descriptor wallets have bech32m addresses") 382 self.test_address(4, self.nodes[4].getnewaddress("", "bech32m"), multisig=False, typ="bech32m") 383 self.test_address(4, self.nodes[4].getrawchangeaddress("bech32m"), multisig=False, typ="bech32m") 384 else: 385 self.log.info("Legacy wallets cannot make bech32m addresses") 386 assert_raises_rpc_error(-8, "Legacy wallets cannot provide bech32m addresses", self.nodes[0].getnewaddress, "", "bech32m") 387 assert_raises_rpc_error(-8, "Legacy wallets cannot provide bech32m addresses", self.nodes[0].getrawchangeaddress, "bech32m") 388 389 if __name__ == '__main__': 390 AddressTypeTest().main()