/ test / functional / rpc_createmultisig.py
rpc_createmultisig.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2015-2022 The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Test multisig RPCs"""
  6  import decimal
  7  import itertools
  8  import json
  9  import os
 10  
 11  from test_framework.address import address_to_scriptpubkey
 12  from test_framework.blocktools import COINBASE_MATURITY
 13  from test_framework.authproxy import JSONRPCException
 14  from test_framework.descriptors import descsum_create, drop_origins
 15  from test_framework.key import ECPubKey
 16  from test_framework.test_framework import BitcoinTestFramework
 17  from test_framework.util import (
 18      assert_raises_rpc_error,
 19      assert_equal,
 20  )
 21  from test_framework.wallet_util import generate_keypair
 22  from test_framework.wallet import (
 23      MiniWallet,
 24      getnewdestination,
 25  )
 26  
 27  class RpcCreateMultiSigTest(BitcoinTestFramework):
 28      def add_options(self, parser):
 29          self.add_wallet_options(parser)
 30  
 31      def set_test_params(self):
 32          self.setup_clean_chain = True
 33          self.num_nodes = 3
 34          self.supports_cli = False
 35  
 36      def get_keys(self):
 37          self.pub = []
 38          self.priv = []
 39          node0, node1, node2 = self.nodes
 40          for _ in range(self.nkeys):
 41              privkey, pubkey = generate_keypair(wif=True)
 42              self.pub.append(pubkey.hex())
 43              self.priv.append(privkey)
 44          if self.is_bdb_compiled():
 45              self.final = node2.getnewaddress()
 46          else:
 47              self.final = getnewdestination('bech32')[2]
 48  
 49      def run_test(self):
 50          node0, node1, node2 = self.nodes
 51          self.wallet = MiniWallet(test_node=node0)
 52  
 53          if self.is_bdb_compiled():
 54              self.import_deterministic_coinbase_privkeys()
 55              self.check_addmultisigaddress_errors()
 56  
 57          self.log.info('Generating blocks ...')
 58          self.generate(self.wallet, 149)
 59  
 60          self.moved = 0
 61          for self.nkeys in [3, 5]:
 62              for self.nsigs in [2, 3]:
 63                  for self.output_type in ["bech32", "p2sh-segwit", "legacy"]:
 64                      self.get_keys()
 65                      self.do_multisig()
 66          if self.is_bdb_compiled():
 67              self.checkbalances()
 68  
 69          # Test mixed compressed and uncompressed pubkeys
 70          self.log.info('Mixed compressed and uncompressed multisigs are not allowed')
 71          pk0, pk1, pk2 = [getnewdestination('bech32')[0].hex() for _ in range(3)]
 72  
 73          # decompress pk2
 74          pk_obj = ECPubKey()
 75          pk_obj.set(bytes.fromhex(pk2))
 76          pk_obj.compressed = False
 77          pk2 = pk_obj.get_bytes().hex()
 78  
 79          if self.is_bdb_compiled():
 80              node0.createwallet(wallet_name='wmulti0', disable_private_keys=True)
 81              wmulti0 = node0.get_wallet_rpc('wmulti0')
 82  
 83          # Check all permutations of keys because order matters apparently
 84          for keys in itertools.permutations([pk0, pk1, pk2]):
 85              # Results should be the same as this legacy one
 86              legacy_addr = node0.createmultisig(2, keys, 'legacy')['address']
 87  
 88              if self.is_bdb_compiled():
 89                  result = wmulti0.addmultisigaddress(2, keys, '', 'legacy')
 90                  assert_equal(legacy_addr, result['address'])
 91                  assert 'warnings' not in result
 92  
 93              # Generate addresses with the segwit types. These should all make legacy addresses
 94              err_msg = ["Unable to make chosen address type, please ensure no uncompressed public keys are present."]
 95  
 96              for addr_type in ['bech32', 'p2sh-segwit']:
 97                  result = self.nodes[0].createmultisig(nrequired=2, keys=keys, address_type=addr_type)
 98                  assert_equal(legacy_addr, result['address'])
 99                  assert_equal(result['warnings'], err_msg)
100  
101                  if self.is_bdb_compiled():
102                      result = wmulti0.addmultisigaddress(nrequired=2, keys=keys, address_type=addr_type)
103                      assert_equal(legacy_addr, result['address'])
104                      assert_equal(result['warnings'], err_msg)
105  
106          self.log.info('Testing sortedmulti descriptors with BIP 67 test vectors')
107          with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/rpc_bip67.json'), encoding='utf-8') as f:
108              vectors = json.load(f)
109  
110          for t in vectors:
111              key_str = ','.join(t['keys'])
112              desc = descsum_create('sh(sortedmulti(2,{}))'.format(key_str))
113              assert_equal(self.nodes[0].deriveaddresses(desc)[0], t['address'])
114              sorted_key_str = ','.join(t['sorted_keys'])
115              sorted_key_desc = descsum_create('sh(multi(2,{}))'.format(sorted_key_str))
116              assert_equal(self.nodes[0].deriveaddresses(sorted_key_desc)[0], t['address'])
117  
118          # Check that bech32m is currently not allowed
119          assert_raises_rpc_error(-5, "createmultisig cannot create bech32m multisig addresses", self.nodes[0].createmultisig, 2, self.pub, "bech32m")
120  
121      def check_addmultisigaddress_errors(self):
122          if self.options.descriptors:
123              return
124          self.log.info('Check that addmultisigaddress fails when the private keys are missing')
125          addresses = [self.nodes[1].getnewaddress(address_type='legacy') for _ in range(2)]
126          assert_raises_rpc_error(-5, 'no full public key for address', lambda: self.nodes[0].addmultisigaddress(nrequired=1, keys=addresses))
127          for a in addresses:
128              # Importing all addresses should not change the result
129              self.nodes[0].importaddress(a)
130          assert_raises_rpc_error(-5, 'no full public key for address', lambda: self.nodes[0].addmultisigaddress(nrequired=1, keys=addresses))
131  
132          # Bech32m address type is disallowed for legacy wallets
133          pubs = [self.nodes[1].getaddressinfo(addr)["pubkey"] for addr in addresses]
134          assert_raises_rpc_error(-5, "Bech32m multisig addresses cannot be created with legacy wallets", self.nodes[0].addmultisigaddress, 2, pubs, "", "bech32m")
135  
136      def checkbalances(self):
137          node0, node1, node2 = self.nodes
138          self.generate(node0, COINBASE_MATURITY)
139  
140          bal0 = node0.getbalance()
141          bal1 = node1.getbalance()
142          bal2 = node2.getbalance()
143          balw = self.wallet.get_balance()
144  
145          height = node0.getblockchaininfo()["blocks"]
146          assert 150 < height < 350
147          total = 149 * 50 + (height - 149 - 100) * 25
148          assert bal1 == 0
149          assert bal2 == self.moved
150          assert_equal(bal0 + bal1 + bal2 + balw, total)
151  
152      def do_multisig(self):
153          node0, node1, node2 = self.nodes
154  
155          if self.is_bdb_compiled():
156              if 'wmulti' not in node1.listwallets():
157                  try:
158                      node1.loadwallet('wmulti')
159                  except JSONRPCException as e:
160                      path = self.nodes[1].wallets_path / "wmulti"
161                      if e.error['code'] == -18 and "Wallet file verification failed. Failed to load database path '{}'. Path does not exist.".format(path) in e.error['message']:
162                          node1.createwallet(wallet_name='wmulti', disable_private_keys=True)
163                      else:
164                          raise
165              wmulti = node1.get_wallet_rpc('wmulti')
166  
167          # Construct the expected descriptor
168          desc = 'multi({},{})'.format(self.nsigs, ','.join(self.pub))
169          if self.output_type == 'legacy':
170              desc = 'sh({})'.format(desc)
171          elif self.output_type == 'p2sh-segwit':
172              desc = 'sh(wsh({}))'.format(desc)
173          elif self.output_type == 'bech32':
174              desc = 'wsh({})'.format(desc)
175          desc = descsum_create(desc)
176  
177          msig = node2.createmultisig(self.nsigs, self.pub, self.output_type)
178          assert 'warnings' not in msig
179          madd = msig["address"]
180          mredeem = msig["redeemScript"]
181          assert_equal(desc, msig['descriptor'])
182          if self.output_type == 'bech32':
183              assert madd[0:4] == "bcrt"  # actually a bech32 address
184  
185          if self.is_bdb_compiled():
186              # compare against addmultisigaddress
187              msigw = wmulti.addmultisigaddress(self.nsigs, self.pub, None, self.output_type)
188              maddw = msigw["address"]
189              mredeemw = msigw["redeemScript"]
190              assert_equal(desc, drop_origins(msigw['descriptor']))
191              # addmultisigiaddress and createmultisig work the same
192              assert maddw == madd
193              assert mredeemw == mredeem
194              wmulti.unloadwallet()
195  
196          spk = address_to_scriptpubkey(madd)
197          txid = self.wallet.send_to(from_node=self.nodes[0], scriptPubKey=spk, amount=1300)["txid"]
198          tx = node0.getrawtransaction(txid, True)
199          vout = [v["n"] for v in tx["vout"] if madd == v["scriptPubKey"]["address"]]
200          assert len(vout) == 1
201          vout = vout[0]
202          scriptPubKey = tx["vout"][vout]["scriptPubKey"]["hex"]
203          value = tx["vout"][vout]["value"]
204          prevtxs = [{"txid": txid, "vout": vout, "scriptPubKey": scriptPubKey, "redeemScript": mredeem, "amount": value}]
205  
206          self.generate(node0, 1)
207  
208          outval = value - decimal.Decimal("0.00001000")
209          rawtx = node2.createrawtransaction([{"txid": txid, "vout": vout}], [{self.final: outval}])
210  
211          prevtx_err = dict(prevtxs[0])
212          del prevtx_err["redeemScript"]
213  
214          assert_raises_rpc_error(-8, "Missing redeemScript/witnessScript", node2.signrawtransactionwithkey, rawtx, self.priv[0:self.nsigs-1], [prevtx_err])
215  
216          # if witnessScript specified, all ok
217          prevtx_err["witnessScript"] = prevtxs[0]["redeemScript"]
218          node2.signrawtransactionwithkey(rawtx, self.priv[0:self.nsigs-1], [prevtx_err])
219  
220          # both specified, also ok
221          prevtx_err["redeemScript"] = prevtxs[0]["redeemScript"]
222          node2.signrawtransactionwithkey(rawtx, self.priv[0:self.nsigs-1], [prevtx_err])
223  
224          # redeemScript mismatch to witnessScript
225          prevtx_err["redeemScript"] = "6a" # OP_RETURN
226          assert_raises_rpc_error(-8, "redeemScript does not correspond to witnessScript", node2.signrawtransactionwithkey, rawtx, self.priv[0:self.nsigs-1], [prevtx_err])
227  
228          # redeemScript does not match scriptPubKey
229          del prevtx_err["witnessScript"]
230          assert_raises_rpc_error(-8, "redeemScript/witnessScript does not match scriptPubKey", node2.signrawtransactionwithkey, rawtx, self.priv[0:self.nsigs-1], [prevtx_err])
231  
232          # witnessScript does not match scriptPubKey
233          prevtx_err["witnessScript"] = prevtx_err["redeemScript"]
234          del prevtx_err["redeemScript"]
235          assert_raises_rpc_error(-8, "redeemScript/witnessScript does not match scriptPubKey", node2.signrawtransactionwithkey, rawtx, self.priv[0:self.nsigs-1], [prevtx_err])
236  
237          rawtx2 = node2.signrawtransactionwithkey(rawtx, self.priv[0:self.nsigs - 1], prevtxs)
238          rawtx3 = node2.signrawtransactionwithkey(rawtx2["hex"], [self.priv[-1]], prevtxs)
239  
240          self.moved += outval
241          tx = node0.sendrawtransaction(rawtx3["hex"], 0)
242          blk = self.generate(node0, 1)[0]
243          assert tx in node0.getblock(blk)["tx"]
244  
245          txinfo = node0.getrawtransaction(tx, True, blk)
246          self.log.info("n/m=%d/%d %s size=%d vsize=%d weight=%d" % (self.nsigs, self.nkeys, self.output_type, txinfo["size"], txinfo["vsize"], txinfo["weight"]))
247  
248  
249  if __name__ == '__main__':
250      RpcCreateMultiSigTest().main()