rpc_createmultisig.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2015-2022 The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """Test multisig RPCs""" 6 import decimal 7 import itertools 8 import json 9 import os 10 11 from test_framework.address import address_to_scriptpubkey 12 from test_framework.blocktools import COINBASE_MATURITY 13 from test_framework.authproxy import JSONRPCException 14 from test_framework.descriptors import descsum_create, drop_origins 15 from test_framework.key import ECPubKey 16 from test_framework.test_framework import BitcoinTestFramework 17 from test_framework.util import ( 18 assert_raises_rpc_error, 19 assert_equal, 20 ) 21 from test_framework.wallet_util import generate_keypair 22 from test_framework.wallet import ( 23 MiniWallet, 24 getnewdestination, 25 ) 26 27 class RpcCreateMultiSigTest(BitcoinTestFramework): 28 def add_options(self, parser): 29 self.add_wallet_options(parser) 30 31 def set_test_params(self): 32 self.setup_clean_chain = True 33 self.num_nodes = 3 34 self.supports_cli = False 35 36 def get_keys(self): 37 self.pub = [] 38 self.priv = [] 39 node0, node1, node2 = self.nodes 40 for _ in range(self.nkeys): 41 privkey, pubkey = generate_keypair(wif=True) 42 self.pub.append(pubkey.hex()) 43 self.priv.append(privkey) 44 if self.is_bdb_compiled(): 45 self.final = node2.getnewaddress() 46 else: 47 self.final = getnewdestination('bech32')[2] 48 49 def run_test(self): 50 node0, node1, node2 = self.nodes 51 self.wallet = MiniWallet(test_node=node0) 52 53 if self.is_bdb_compiled(): 54 self.import_deterministic_coinbase_privkeys() 55 self.check_addmultisigaddress_errors() 56 57 self.log.info('Generating blocks ...') 58 self.generate(self.wallet, 149) 59 60 self.moved = 0 61 for self.nkeys in [3, 5]: 62 for self.nsigs in [2, 3]: 63 for self.output_type in ["bech32", "p2sh-segwit", "legacy"]: 64 self.get_keys() 65 self.do_multisig() 66 if self.is_bdb_compiled(): 67 self.checkbalances() 68 69 # Test mixed compressed and uncompressed pubkeys 70 self.log.info('Mixed compressed and uncompressed multisigs are not allowed') 71 pk0, pk1, pk2 = [getnewdestination('bech32')[0].hex() for _ in range(3)] 72 73 # decompress pk2 74 pk_obj = ECPubKey() 75 pk_obj.set(bytes.fromhex(pk2)) 76 pk_obj.compressed = False 77 pk2 = pk_obj.get_bytes().hex() 78 79 if self.is_bdb_compiled(): 80 node0.createwallet(wallet_name='wmulti0', disable_private_keys=True) 81 wmulti0 = node0.get_wallet_rpc('wmulti0') 82 83 # Check all permutations of keys because order matters apparently 84 for keys in itertools.permutations([pk0, pk1, pk2]): 85 # Results should be the same as this legacy one 86 legacy_addr = node0.createmultisig(2, keys, 'legacy')['address'] 87 88 if self.is_bdb_compiled(): 89 result = wmulti0.addmultisigaddress(2, keys, '', 'legacy') 90 assert_equal(legacy_addr, result['address']) 91 assert 'warnings' not in result 92 93 # Generate addresses with the segwit types. These should all make legacy addresses 94 err_msg = ["Unable to make chosen address type, please ensure no uncompressed public keys are present."] 95 96 for addr_type in ['bech32', 'p2sh-segwit']: 97 result = self.nodes[0].createmultisig(nrequired=2, keys=keys, address_type=addr_type) 98 assert_equal(legacy_addr, result['address']) 99 assert_equal(result['warnings'], err_msg) 100 101 if self.is_bdb_compiled(): 102 result = wmulti0.addmultisigaddress(nrequired=2, keys=keys, address_type=addr_type) 103 assert_equal(legacy_addr, result['address']) 104 assert_equal(result['warnings'], err_msg) 105 106 self.log.info('Testing sortedmulti descriptors with BIP 67 test vectors') 107 with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/rpc_bip67.json'), encoding='utf-8') as f: 108 vectors = json.load(f) 109 110 for t in vectors: 111 key_str = ','.join(t['keys']) 112 desc = descsum_create('sh(sortedmulti(2,{}))'.format(key_str)) 113 assert_equal(self.nodes[0].deriveaddresses(desc)[0], t['address']) 114 sorted_key_str = ','.join(t['sorted_keys']) 115 sorted_key_desc = descsum_create('sh(multi(2,{}))'.format(sorted_key_str)) 116 assert_equal(self.nodes[0].deriveaddresses(sorted_key_desc)[0], t['address']) 117 118 # Check that bech32m is currently not allowed 119 assert_raises_rpc_error(-5, "createmultisig cannot create bech32m multisig addresses", self.nodes[0].createmultisig, 2, self.pub, "bech32m") 120 121 def check_addmultisigaddress_errors(self): 122 if self.options.descriptors: 123 return 124 self.log.info('Check that addmultisigaddress fails when the private keys are missing') 125 addresses = [self.nodes[1].getnewaddress(address_type='legacy') for _ in range(2)] 126 assert_raises_rpc_error(-5, 'no full public key for address', lambda: self.nodes[0].addmultisigaddress(nrequired=1, keys=addresses)) 127 for a in addresses: 128 # Importing all addresses should not change the result 129 self.nodes[0].importaddress(a) 130 assert_raises_rpc_error(-5, 'no full public key for address', lambda: self.nodes[0].addmultisigaddress(nrequired=1, keys=addresses)) 131 132 # Bech32m address type is disallowed for legacy wallets 133 pubs = [self.nodes[1].getaddressinfo(addr)["pubkey"] for addr in addresses] 134 assert_raises_rpc_error(-5, "Bech32m multisig addresses cannot be created with legacy wallets", self.nodes[0].addmultisigaddress, 2, pubs, "", "bech32m") 135 136 def checkbalances(self): 137 node0, node1, node2 = self.nodes 138 self.generate(node0, COINBASE_MATURITY) 139 140 bal0 = node0.getbalance() 141 bal1 = node1.getbalance() 142 bal2 = node2.getbalance() 143 balw = self.wallet.get_balance() 144 145 height = node0.getblockchaininfo()["blocks"] 146 assert 150 < height < 350 147 total = 149 * 50 + (height - 149 - 100) * 25 148 assert bal1 == 0 149 assert bal2 == self.moved 150 assert_equal(bal0 + bal1 + bal2 + balw, total) 151 152 def do_multisig(self): 153 node0, node1, node2 = self.nodes 154 155 if self.is_bdb_compiled(): 156 if 'wmulti' not in node1.listwallets(): 157 try: 158 node1.loadwallet('wmulti') 159 except JSONRPCException as e: 160 path = self.nodes[1].wallets_path / "wmulti" 161 if e.error['code'] == -18 and "Wallet file verification failed. Failed to load database path '{}'. Path does not exist.".format(path) in e.error['message']: 162 node1.createwallet(wallet_name='wmulti', disable_private_keys=True) 163 else: 164 raise 165 wmulti = node1.get_wallet_rpc('wmulti') 166 167 # Construct the expected descriptor 168 desc = 'multi({},{})'.format(self.nsigs, ','.join(self.pub)) 169 if self.output_type == 'legacy': 170 desc = 'sh({})'.format(desc) 171 elif self.output_type == 'p2sh-segwit': 172 desc = 'sh(wsh({}))'.format(desc) 173 elif self.output_type == 'bech32': 174 desc = 'wsh({})'.format(desc) 175 desc = descsum_create(desc) 176 177 msig = node2.createmultisig(self.nsigs, self.pub, self.output_type) 178 assert 'warnings' not in msig 179 madd = msig["address"] 180 mredeem = msig["redeemScript"] 181 assert_equal(desc, msig['descriptor']) 182 if self.output_type == 'bech32': 183 assert madd[0:4] == "bcrt" # actually a bech32 address 184 185 if self.is_bdb_compiled(): 186 # compare against addmultisigaddress 187 msigw = wmulti.addmultisigaddress(self.nsigs, self.pub, None, self.output_type) 188 maddw = msigw["address"] 189 mredeemw = msigw["redeemScript"] 190 assert_equal(desc, drop_origins(msigw['descriptor'])) 191 # addmultisigiaddress and createmultisig work the same 192 assert maddw == madd 193 assert mredeemw == mredeem 194 wmulti.unloadwallet() 195 196 spk = address_to_scriptpubkey(madd) 197 txid = self.wallet.send_to(from_node=self.nodes[0], scriptPubKey=spk, amount=1300)["txid"] 198 tx = node0.getrawtransaction(txid, True) 199 vout = [v["n"] for v in tx["vout"] if madd == v["scriptPubKey"]["address"]] 200 assert len(vout) == 1 201 vout = vout[0] 202 scriptPubKey = tx["vout"][vout]["scriptPubKey"]["hex"] 203 value = tx["vout"][vout]["value"] 204 prevtxs = [{"txid": txid, "vout": vout, "scriptPubKey": scriptPubKey, "redeemScript": mredeem, "amount": value}] 205 206 self.generate(node0, 1) 207 208 outval = value - decimal.Decimal("0.00001000") 209 rawtx = node2.createrawtransaction([{"txid": txid, "vout": vout}], [{self.final: outval}]) 210 211 prevtx_err = dict(prevtxs[0]) 212 del prevtx_err["redeemScript"] 213 214 assert_raises_rpc_error(-8, "Missing redeemScript/witnessScript", node2.signrawtransactionwithkey, rawtx, self.priv[0:self.nsigs-1], [prevtx_err]) 215 216 # if witnessScript specified, all ok 217 prevtx_err["witnessScript"] = prevtxs[0]["redeemScript"] 218 node2.signrawtransactionwithkey(rawtx, self.priv[0:self.nsigs-1], [prevtx_err]) 219 220 # both specified, also ok 221 prevtx_err["redeemScript"] = prevtxs[0]["redeemScript"] 222 node2.signrawtransactionwithkey(rawtx, self.priv[0:self.nsigs-1], [prevtx_err]) 223 224 # redeemScript mismatch to witnessScript 225 prevtx_err["redeemScript"] = "6a" # OP_RETURN 226 assert_raises_rpc_error(-8, "redeemScript does not correspond to witnessScript", node2.signrawtransactionwithkey, rawtx, self.priv[0:self.nsigs-1], [prevtx_err]) 227 228 # redeemScript does not match scriptPubKey 229 del prevtx_err["witnessScript"] 230 assert_raises_rpc_error(-8, "redeemScript/witnessScript does not match scriptPubKey", node2.signrawtransactionwithkey, rawtx, self.priv[0:self.nsigs-1], [prevtx_err]) 231 232 # witnessScript does not match scriptPubKey 233 prevtx_err["witnessScript"] = prevtx_err["redeemScript"] 234 del prevtx_err["redeemScript"] 235 assert_raises_rpc_error(-8, "redeemScript/witnessScript does not match scriptPubKey", node2.signrawtransactionwithkey, rawtx, self.priv[0:self.nsigs-1], [prevtx_err]) 236 237 rawtx2 = node2.signrawtransactionwithkey(rawtx, self.priv[0:self.nsigs - 1], prevtxs) 238 rawtx3 = node2.signrawtransactionwithkey(rawtx2["hex"], [self.priv[-1]], prevtxs) 239 240 self.moved += outval 241 tx = node0.sendrawtransaction(rawtx3["hex"], 0) 242 blk = self.generate(node0, 1)[0] 243 assert tx in node0.getblock(blk)["tx"] 244 245 txinfo = node0.getrawtransaction(tx, True, blk) 246 self.log.info("n/m=%d/%d %s size=%d vsize=%d weight=%d" % (self.nsigs, self.nkeys, self.output_type, txinfo["size"], txinfo["vsize"], txinfo["weight"])) 247 248 249 if __name__ == '__main__': 250 RpcCreateMultiSigTest().main()