/ test / functional / wallet_address_types.py
wallet_address_types.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2017-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Test that the wallet can send and receive using all combinations of address types.
  6  
  7  There are 5 nodes-under-test:
  8      - node0 uses legacy addresses
  9      - node1 uses p2sh/segwit addresses
 10      - node2 uses p2sh/segwit addresses and bech32 addresses for change
 11      - node3 uses bech32 addresses
 12      - node4 uses p2sh/segwit addresses for change
 13  
 14  node5 exists to generate new blocks.
 15  
 16  ## Multisig address test
 17  
 18  Test that adding a multisig address with:
 19      - an uncompressed pubkey always gives a legacy address
 20      - only compressed pubkeys gives an `-addresstype` address
 21  
 22  ## Sending to address types test
 23  
 24  A series of tests, iterating over node0-node4. In each iteration of the test, one node sends:
 25      - 10/101th of its balance to itself (using getrawchangeaddress for single key addresses)
 26      - 20/101th to the next node
 27      - 30/101th to the node after that
 28      - 40/101th to the remaining node
 29      - 1/101th remains as fee+change
 30  
 31  Iterate over each node for single key addresses, and then over each node for
 32  multisig addresses.
 33  
 34  Repeat test, but with explicit address_type parameters passed to getnewaddress
 35  and getrawchangeaddress:
 36      - node0 and node3 send to p2sh.
 37      - node1 sends to bech32.
 38      - node2 sends to legacy.
 39  
 40  As every node sends coins after receiving, this also
 41  verifies that spending coins sent to all these address types works.
 42  
 43  ## Change type test
 44  
 45  Test that the nodes generate the correct change address type:
 46      - node0 always uses a legacy change address.
 47      - node1 uses a bech32 address for change if any destination address is bech32.
 48      - node2 always uses a bech32 address for change
 49      - node3 always uses a bech32 address for change
 50      - node4 always uses p2sh/segwit output for change.
 51  """
 52  
 53  from decimal import Decimal
 54  import itertools
 55  
 56  from test_framework.blocktools import COINBASE_MATURITY
 57  from test_framework.test_framework import BitcoinTestFramework
 58  from test_framework.descriptors import (
 59      descsum_create,
 60      descsum_check,
 61  )
 62  from test_framework.util import (
 63      assert_equal,
 64      assert_greater_than,
 65      assert_raises_rpc_error,
 66  )
 67  
 68  class AddressTypeTest(BitcoinTestFramework):
 69      def set_test_params(self):
 70          self.num_nodes = 6
 71          self.extra_args = [
 72              ["-addresstype=legacy"],
 73              ["-addresstype=p2sh-segwit"],
 74              ["-addresstype=p2sh-segwit", "-changetype=bech32"],
 75              ["-addresstype=bech32"],
 76              ["-changetype=p2sh-segwit"],
 77              [],
 78          ]
 79          # whitelist peers to speed up tx relay / mempool sync
 80          self.noban_tx_relay = True
 81  
 82      def skip_test_if_missing_module(self):
 83          self.skip_if_no_wallet()
 84  
 85      def setup_network(self):
 86          self.setup_nodes()
 87  
 88          # Fully mesh-connect nodes for faster mempool sync
 89          for i, j in itertools.product(range(self.num_nodes), repeat=2):
 90              if i > j:
 91                  self.connect_nodes(i, j)
 92          self.sync_all()
 93  
 94      def get_balances(self, key='trusted'):
 95          """Return a list of balances."""
 96          return [self.nodes[i].getbalances()['mine'][key] for i in range(4)]
 97  
 98      def test_address(self, node, address, multisig, typ):
 99          """Run sanity checks on an address."""
100          info = self.nodes[node].getaddressinfo(address)
101          assert self.nodes[node].validateaddress(address)['isvalid']
102          assert_equal(info.get('solvable'), True)
103  
104          if not multisig and typ == 'legacy':
105              # P2PKH
106              assert not info['isscript']
107              assert not info['iswitness']
108              assert 'pubkey' in info
109          elif not multisig and typ == 'p2sh-segwit':
110              # P2SH-P2WPKH
111              assert info['isscript']
112              assert not info['iswitness']
113              assert_equal(info['script'], 'witness_v0_keyhash')
114              assert 'pubkey' in info
115          elif not multisig and typ == 'bech32':
116              # P2WPKH
117              assert not info['isscript']
118              assert info['iswitness']
119              assert_equal(info['witness_version'], 0)
120              assert_equal(len(info['witness_program']), 40)
121              assert 'pubkey' in info
122          elif not multisig and typ == "bech32m":
123              # P2TR single sig
124              assert info["isscript"]
125              assert info["iswitness"]
126              assert_equal(info["witness_version"], 1)
127              assert_equal(len(info["witness_program"]), 64)
128          elif typ == 'legacy':
129              # P2SH-multisig
130              assert info['isscript']
131              assert_equal(info['script'], 'multisig')
132              assert not info['iswitness']
133              assert 'pubkeys' in info
134          elif typ == 'p2sh-segwit':
135              # P2SH-P2WSH-multisig
136              assert info['isscript']
137              assert_equal(info['script'], 'witness_v0_scripthash')
138              assert not info['iswitness']
139              assert info['embedded']['isscript']
140              assert_equal(info['embedded']['script'], 'multisig')
141              assert info['embedded']['iswitness']
142              assert_equal(info['embedded']['witness_version'], 0)
143              assert_equal(len(info['embedded']['witness_program']), 64)
144              assert 'pubkeys' in info['embedded']
145          elif typ == 'bech32':
146              # P2WSH-multisig
147              assert info['isscript']
148              assert_equal(info['script'], 'multisig')
149              assert info['iswitness']
150              assert_equal(info['witness_version'], 0)
151              assert_equal(len(info['witness_program']), 64)
152              assert 'pubkeys' in info
153          else:
154              # Unknown type
155              assert False
156  
157      def test_desc(self, node, address, multisig, typ, utxo):
158          """Run sanity checks on a descriptor reported by getaddressinfo."""
159          info = self.nodes[node].getaddressinfo(address)
160          assert 'desc' in info
161          assert_equal(info['desc'], utxo['desc'])
162          assert self.nodes[node].validateaddress(address)['isvalid']
163  
164          # Use a ridiculously roundabout way to find the key origin info through
165          # the PSBT logic. However, this does test consistency between the PSBT reported
166          # fingerprints/paths and the descriptor logic.
167          psbt = self.nodes[node].createpsbt([{'txid':utxo['txid'], 'vout':utxo['vout']}],[{address:0.00010000}])
168          psbt = self.nodes[node].walletprocesspsbt(psbt, False, "ALL", True)
169          decode = self.nodes[node].decodepsbt(psbt['psbt'])
170          key_descs = {}
171          for deriv in decode['inputs'][0]['bip32_derivs']:
172              assert_equal(len(deriv['master_fingerprint']), 8)
173              assert_equal(deriv['path'][0], 'm')
174              key_descs[deriv['pubkey']] = '[' + deriv['master_fingerprint'] + deriv['path'][1:].replace("'","h") + ']' + deriv['pubkey']
175  
176          # Verify the descriptor checksum against the Python implementation
177          assert descsum_check(info['desc'])
178          # Verify that stripping the checksum and recreating it using Python roundtrips
179          assert_equal(info['desc'], descsum_create(info['desc'][:-9]))
180          # Verify that stripping the checksum and feeding it to getdescriptorinfo roundtrips
181          assert_equal(info['desc'], self.nodes[0].getdescriptorinfo(info['desc'][:-9])['descriptor'])
182          assert_equal(info['desc'][-8:], self.nodes[0].getdescriptorinfo(info['desc'][:-9])['checksum'])
183          # Verify that keeping the checksum and feeding it to getdescriptorinfo roundtrips
184          assert_equal(info['desc'], self.nodes[0].getdescriptorinfo(info['desc'])['descriptor'])
185          assert_equal(info['desc'][-8:], self.nodes[0].getdescriptorinfo(info['desc'])['checksum'])
186  
187          if not multisig and typ == 'legacy':
188              # P2PKH
189              assert_equal(info['desc'], descsum_create("pkh(%s)" % key_descs[info['pubkey']]))
190          elif not multisig and typ == 'p2sh-segwit':
191              # P2SH-P2WPKH
192              assert_equal(info['desc'], descsum_create("sh(wpkh(%s))" % key_descs[info['pubkey']]))
193          elif not multisig and typ == 'bech32':
194              # P2WPKH
195              assert_equal(info['desc'], descsum_create("wpkh(%s)" % key_descs[info['pubkey']]))
196          elif typ == 'legacy':
197              # P2SH-multisig
198              assert_equal(info['desc'], descsum_create("sh(multi(2,%s,%s))" % (key_descs[info['pubkeys'][0]], key_descs[info['pubkeys'][1]])))
199          elif typ == 'p2sh-segwit':
200              # P2SH-P2WSH-multisig
201              assert_equal(info['desc'], descsum_create("sh(wsh(multi(2,%s,%s)))" % (key_descs[info['embedded']['pubkeys'][0]], key_descs[info['embedded']['pubkeys'][1]])))
202          elif typ == 'bech32':
203              # P2WSH-multisig
204              assert_equal(info['desc'], descsum_create("wsh(multi(2,%s,%s))" % (key_descs[info['pubkeys'][0]], key_descs[info['pubkeys'][1]])))
205          else:
206              # Unknown type
207              assert False
208  
209      def test_change_output_type(self, node_sender, destinations, expected_type):
210          txid = self.nodes[node_sender].sendmany(dummy="", amounts=dict.fromkeys(destinations, 0.001))
211          tx = self.nodes[node_sender].gettransaction(txid=txid, verbose=True)['decoded']
212  
213          # Make sure the transaction has change:
214          assert_equal(len(tx["vout"]), len(destinations) + 1)
215  
216          # Make sure the destinations are included, and remove them:
217          output_addresses = [vout['scriptPubKey']['address'] for vout in tx["vout"]]
218          change_addresses = [d for d in output_addresses if d not in destinations]
219          assert_equal(len(change_addresses), 1)
220  
221          self.log.debug("Check if change address " + change_addresses[0] + " is " + expected_type)
222          self.test_address(node_sender, change_addresses[0], multisig=False, typ=expected_type)
223  
224      def run_test(self):
225          # Mine 101 blocks on node5 to bring nodes out of IBD and make sure that
226          # no coinbases are maturing for the nodes-under-test during the test
227          self.generate(self.nodes[5], COINBASE_MATURITY + 1)
228  
229          compressed_1 = "0296b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52"
230          compressed_2 = "037211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073"
231  
232          do_multisigs = [False]
233  
234          for explicit_type, multisig, from_node in itertools.product([False, True], do_multisigs, range(4)):
235              address_type = None
236              if explicit_type and not multisig:
237                  if from_node == 1:
238                      address_type = 'bech32'
239                  elif from_node == 0 or from_node == 3:
240                      address_type = 'p2sh-segwit'
241                  else:
242                      address_type = 'legacy'
243              self.log.info("Sending from node {} ({}) with{} multisig using {}".format(from_node, self.extra_args[from_node], "" if multisig else "out", "default" if address_type is None else address_type))
244              old_balances = self.get_balances()
245              self.log.debug("Old balances are {}".format(old_balances))
246              to_send = (old_balances[from_node] / (COINBASE_MATURITY + 1)).quantize(Decimal("0.00000001"))
247              sends = {}
248              addresses = {}
249  
250              self.log.debug("Prepare sends")
251              for n, to_node in enumerate(range(from_node, from_node + 4)):
252                  to_node %= 4
253                  change = False
254                  if not multisig:
255                      if from_node == to_node:
256                          # When sending non-multisig to self, use getrawchangeaddress
257                          address = self.nodes[to_node].getrawchangeaddress(address_type=address_type)
258                          change = True
259                      else:
260                          address = self.nodes[to_node].getnewaddress(address_type=address_type)
261                  else:
262                      pubkey1 = self.nodes[to_node].getaddressinfo(self.nodes[to_node].getnewaddress())["pubkey"]
263                      pubkey2 = self.nodes[to_node].getaddressinfo(self.nodes[to_node].getnewaddress())["pubkey"]
264                      ms = self.nodes[to_node].createmultisig(2, [pubkey1, pubkey2])
265                      import_res = self.nodes[to_node].importdescriptors([{"desc": ms["descriptor"], "timestamp": 0}])
266                      assert_equal(import_res[0]["success"], True)
267  
268                  # Do some sanity checking on the created address
269                  if address_type is not None:
270                      typ = address_type
271                  elif to_node == 0:
272                      typ = 'legacy'
273                  elif to_node == 1 or (to_node == 2 and not change):
274                      typ = 'p2sh-segwit'
275                  else:
276                      typ = 'bech32'
277                  self.test_address(to_node, address, multisig, typ)
278  
279                  # Output entry
280                  sends[address] = to_send * 10 * (1 + n)
281                  addresses[to_node] = (address, typ)
282  
283              self.log.debug("Sending: {}".format(sends))
284              self.nodes[from_node].sendmany("", sends)
285              self.sync_mempools()
286  
287              unconf_balances = self.get_balances('untrusted_pending')
288              self.log.debug("Check unconfirmed balances: {}".format(unconf_balances))
289              assert_equal(unconf_balances[from_node], 0)
290              for n, to_node in enumerate(range(from_node + 1, from_node + 4)):
291                  to_node %= 4
292                  assert_equal(unconf_balances[to_node], to_send * 10 * (2 + n))
293  
294              # node5 collects fee and block subsidy to keep accounting simple
295              self.generate(self.nodes[5], 1)
296  
297              # Verify that the receiving wallet contains a UTXO with the expected address, and expected descriptor
298              for n, to_node in enumerate(range(from_node, from_node + 4)):
299                  to_node %= 4
300                  found = False
301                  for utxo in self.nodes[to_node].listunspent():
302                      if utxo['address'] == addresses[to_node][0]:
303                          found = True
304                          self.test_desc(to_node, addresses[to_node][0], multisig, addresses[to_node][1], utxo)
305                          break
306                  assert found
307  
308              new_balances = self.get_balances()
309              self.log.debug("Check new balances: {}".format(new_balances))
310              # We don't know what fee was set, so we can only check bounds on the balance of the sending node
311              assert_greater_than(new_balances[from_node], to_send * 10)
312              assert_greater_than(to_send * 11, new_balances[from_node])
313              for n, to_node in enumerate(range(from_node + 1, from_node + 4)):
314                  to_node %= 4
315                  assert_equal(new_balances[to_node], old_balances[to_node] + to_send * 10 * (2 + n))
316  
317          # Get one p2sh/segwit address from node2 and two bech32 addresses from node3:
318          to_address_p2sh = self.nodes[2].getnewaddress()
319          to_address_bech32_1 = self.nodes[3].getnewaddress()
320          to_address_bech32_2 = self.nodes[3].getnewaddress()
321  
322          # Fund node 4:
323          self.nodes[5].sendtoaddress(self.nodes[4].getnewaddress(), Decimal("1"))
324          self.generate(self.nodes[5], 1)
325          assert_equal(self.nodes[4].getbalance(), 1)
326  
327          self.log.info("Nodes with addresstype=legacy never use a P2WPKH change output (unless changetype is set otherwise):")
328          self.test_change_output_type(0, [to_address_bech32_1], 'legacy')
329  
330          self.log.info("Nodes with addresstype=p2sh-segwit match the change output")
331          self.test_change_output_type(1, [to_address_p2sh], 'p2sh-segwit')
332          self.test_change_output_type(1, [to_address_bech32_1], 'bech32')
333          self.test_change_output_type(1, [to_address_p2sh, to_address_bech32_1], 'bech32')
334          self.test_change_output_type(1, [to_address_bech32_1, to_address_bech32_2], 'bech32')
335  
336          self.log.info("Nodes with change_type=bech32 always use a P2WPKH change output:")
337          self.test_change_output_type(2, [to_address_bech32_1], 'bech32')
338          self.test_change_output_type(2, [to_address_p2sh], 'bech32')
339  
340          self.log.info("Nodes with addresstype=bech32 match the change output (unless changetype is set otherwise):")
341          self.test_change_output_type(3, [to_address_bech32_1], 'bech32')
342          self.test_change_output_type(3, [to_address_p2sh], 'p2sh-segwit')
343  
344          self.log.info('getrawchangeaddress defaults to addresstype if -changetype is not set and argument is absent')
345          self.test_address(3, self.nodes[3].getrawchangeaddress(), multisig=False, typ='bech32')
346  
347          self.log.info('test invalid address type arguments')
348          assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].createmultisig, 2, [compressed_1, compressed_2], address_type="")
349          assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].getnewaddress, None, '')
350          assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].getrawchangeaddress, '')
351          assert_raises_rpc_error(-5, "Unknown address type 'bech23'", self.nodes[3].getrawchangeaddress, 'bech23')
352          assert_raises_rpc_error(-5, "Unknown address type 'bech23'", self.nodes[3].createwalletdescriptor, "bech23")
353  
354          self.log.info("Nodes with changetype=p2sh-segwit never use a P2WPKH change output")
355          self.test_change_output_type(4, [to_address_bech32_1], 'p2sh-segwit')
356          self.test_address(4, self.nodes[4].getrawchangeaddress(), multisig=False, typ='p2sh-segwit')
357          self.log.info("Except for getrawchangeaddress if specified:")
358          self.test_address(4, self.nodes[4].getrawchangeaddress(), multisig=False, typ='p2sh-segwit')
359          self.test_address(4, self.nodes[4].getrawchangeaddress('bech32'), multisig=False, typ='bech32')
360  
361          self.log.info("Descriptor wallets have bech32m addresses")
362          self.test_address(4, self.nodes[4].getnewaddress("", "bech32m"), multisig=False, typ="bech32m")
363          self.test_address(4, self.nodes[4].getrawchangeaddress("bech32m"), multisig=False, typ="bech32m")
364  
# Run the test only when this file is executed as a script.
if __name__ == "__main__":
    AddressTypeTest(__file__).main()