wallet_musig.py
#!/usr/bin/env python3
# Copyright (c) 2024-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Functional test for musig() descriptors in the wallet.

Each scenario creates one wallet per musig participant, imports a musig
descriptor built from the wallets' keys, funds an address of that descriptor
and then walks a PSBT through the signing rounds (pubnonces, partial
signatures, finalization).
"""

import re

from test_framework.descriptors import descsum_create
from test_framework.key import H_POINT
from test_framework.script import hash160
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_equal,
    assert_greater_than,
)

# Captures everything between "tr(" and the first "/" of a checksummed tr() descriptor.
PRIVKEY_RE = re.compile(r"^tr\((.+?)/.+\)#.{8}$")
# Captures the bracketed key origin plus the key that follows it, up to the next "/".
PUBKEY_RE = re.compile(r"^tr\((\[.+?\].+?)/.+\)#.{8}$")
# Captures the derivation path that follows the 8-character fingerprint inside "[...]".
ORIGIN_PATH_RE = re.compile(r"^\[\w{8}(/.*)\].*$")
# Matches a two-element multipath specifier such as "<0;1>".
MULTIPATH_TWO_RE = re.compile(r"<(\d+);(\d+)>")
# Captures the argument list of each musig(...) expression.
MUSIG_RE = re.compile(r"musig\((.*?)\)")
# Matches a single-digit wallet placeholder such as "$0".
PLACEHOLDER_RE = re.compile(r"\$\d")


class WalletMuSigTest(BitcoinTestFramework):
    # Running counter used to give every wallet created by this test a unique name.
    wallet_num = 0

    def set_test_params(self):
        self.num_nodes = 1

    def skip_test_if_missing_module(self):
        self.skip_if_no_wallet()

    @staticmethod
    def _extract_first_tr_key(descriptors, key_re):
        """Return group 1 of `key_re` matched against the first tr() descriptor.

        `descriptors` is the "descriptors" list of a listdescriptors result.
        Raises AssertionError when no descriptor starts with "tr(" or when the
        first such descriptor does not match `key_re`, so that a missing key
        can never be silently replaced by one left over from another wallet.
        """
        for entry in descriptors:
            desc = entry["desc"]
            if not desc.startswith("tr("):
                continue
            match = key_re.search(desc)
            if match is None:
                raise AssertionError(f"tr() descriptor does not match {key_re.pattern}: {desc}")
            return match.group(1)
        raise AssertionError("Wallet has no tr() descriptor to extract a key from")

    @staticmethod
    def _expected_musig_counts(pat, nosign_wallets):
        """Return (expected_pubnonces, expected_partial_sigs) for `pat`.

        Every participant whose wallet index is not in `nosign_wallets`
        contributes one pubnonce per musig() it appears in. A musig()
        contributes partial signatures (one per participant) only when none of
        its participants is in `nosign_wallets`.
        """
        excluded = nosign_wallets or ()
        pubnonces = 0
        partial_sigs = 0
        for musig in MUSIG_RE.findall(pat):
            participants = [int(placeholder[1:]) for placeholder in PLACEHOLDER_RE.findall(musig)]
            signers = [index for index in participants if index not in excluded]
            pubnonces += len(signers)
            if len(signers) == len(participants):
                partial_sigs += len(participants)
        return pubnonces, partial_sigs

    @staticmethod
    def _assert_aggregate_pubkeys_seen(psbt_input, entries, hash_pubkey, not_in_scripts_msg, no_scripts_msg):
        """Check that each entry's aggregate pubkey appears in the spent output or a leaf script.

        `entries` are decoded "musig2_pubnonces" or "musig2_partial_sigs" items
        of `psbt_input`. When `hash_pubkey` is true the key is replaced by its
        hash160 before searching. Raises AssertionError with
        `not_in_scripts_msg` when leaf scripts exist but none contains the key,
        and with `no_scripts_msg` when the input has no "taproot_scripts".
        """
        spk_hex = psbt_input["witness_utxo"]["scriptPubKey"]["hex"]
        for entry in entries:
            # Drop the first two hex characters (presumably the parity prefix
            # byte of the compressed key) before searching for the key.
            pubkey = entry["aggregate_pubkey"][2:]
            if hash_pubkey:
                pubkey = hash160(bytes.fromhex(pubkey)).hex()
            if pubkey in spk_hex:
                continue
            if "taproot_scripts" not in psbt_input:
                raise AssertionError(no_scripts_msg)
            if not any(pubkey in leaf["script"] for leaf in psbt_input["taproot_scripts"]):
                raise AssertionError(not_in_scripts_msg)

    def _process_and_combine(self, signers, psbt, sighash_type):
        """Run walletprocesspsbt on `psbt` in every signer wallet and combine the results.

        Each individual result is required to be incomplete. Returns the PSBT
        produced by combinepsbt.
        """
        processed = []
        for wallet in signers:
            proc = wallet.walletprocesspsbt(psbt=psbt, sighashtype=sighash_type)
            assert_equal(proc["complete"], False)
            processed.append(proc["psbt"])
        return self.nodes[0].combinepsbt(processed)

    def create_wallets_and_keys_from_pattern(self, pat):
        """Create one wallet per new placeholder in `pat` and extract its keys.

        Returns (wallets, keys) where keys[i] is a (privkey, pubkey) tuple taken
        from the first tr() descriptor of wallets[i]. A placeholder whose index
        is below the number of wallets created so far is treated as already
        handled.
        """
        wallets = []
        keys = []
        node = self.nodes[0]

        for musig in MUSIG_RE.findall(pat):
            for placeholder in PLACEHOLDER_RE.findall(musig):
                wallet_index = int(placeholder[1:])
                if wallet_index < len(wallets):
                    continue

                wallet_name = f"musig_{self.wallet_num}"
                self.wallet_num += 1
                node.createwallet(wallet_name)
                wallet = node.get_wallet_rpc(wallet_name)
                wallets.append(wallet)

                privkey = self._extract_first_tr_key(wallet.listdescriptors(True)["descriptors"], PRIVKEY_RE)
                pubkey = self._extract_first_tr_key(wallet.listdescriptors()["descriptors"], PUBKEY_RE)
                # Since the pubkey is derived from the private key that we have, we need
                # to extract and insert the origin path from the pubkey as well.
                privkey += ORIGIN_PATH_RE.search(pubkey).group(1)
                keys.append((privkey, pubkey))

        return wallets, keys

    def construct_and_import_musig_descriptor_in_wallets(self, pat, wallets, keys, only_one_musig_wallet=False):
        """Import the musig descriptor into each wallet.

        For wallet i the placeholder $i is replaced by that wallet's private
        key and every other placeholder by the corresponding public key. When
        `only_one_musig_wallet` is true only the first wallet imports.
        """
        importing_wallets = wallets[:1] if only_one_musig_wallet else wallets
        for i, wallet in enumerate(importing_wallets):
            desc = pat
            for j, (priv, pub) in enumerate(keys):
                desc = desc.replace(f"${j}", priv if j == i else pub)

            import_descs = [{
                "desc": descsum_create(desc),
                "active": True,
                "timestamp": "now",
            }]

            for result in wallet.importdescriptors(import_descs):
                assert_equal(result["success"], True)

    def setup_musig_scenario(self, pat):
        """Create wallets for `pat`, fund their shared address and build an unsigned PSBT.

        Returns (wallets, psbt).
        """
        wallets, keys = self.create_wallets_and_keys_from_pattern(pat)
        self.construct_and_import_musig_descriptor_in_wallets(pat, wallets, keys, only_one_musig_wallet=False)

        # Fund address
        addr = wallets[0].getnewaddress(address_type="bech32m")
        for wallet in wallets[1:]:
            assert_equal(addr, wallet.getnewaddress(address_type="bech32m"))

        self.def_wallet.sendtoaddress(addr, 10)
        self.generate(self.nodes[0], 1)

        # Create PSBT
        utxo = wallets[0].listunspent()[0]
        psbt = wallets[0].walletcreatefundedpsbt(
            outputs=[{self.def_wallet.getnewaddress(): 5}],
            inputs=[utxo],
            change_type="bech32m",
            changePosition=1
        )["psbt"]

        return wallets, psbt

    def test_failure_case_1(self, comment, pat):
        """Signing must not complete when one participant never provides a nonce."""
        self.log.info(f"Testing {comment}")
        wallets, psbt = self.setup_musig_scenario(pat)

        # Only 2 out of 3 participants provide nonces
        nonce_psbts = [wallet.walletprocesspsbt(psbt=psbt)["psbt"] for wallet in wallets[:2]]
        comb_nonce_psbt = self.nodes[0].combinepsbt(nonce_psbts)

        # Attempt to create partial sigs. This should not complete due to the
        # missing nonce.
        for wallet in wallets[:2]:
            proc = wallet.walletprocesspsbt(psbt=comb_nonce_psbt)
            assert_equal(proc["complete"], False)
            # NOTE(review): the absence of partial sigs is not asserted here;
            # only the number of nonces is checked.
            dec = self.nodes[0].decodepsbt(proc["psbt"])
            # There are still only two nonces
            assert_equal(len(dec["inputs"][0].get("musig2_pubnonces", [])), 2)

    def test_failure_case_2(self, comment, pat):
        """Finalization must fail when one participant never provides a partial sig."""
        self.log.info(f"Testing {comment}")
        wallets, psbt = self.setup_musig_scenario(pat)
        nonce_psbts = [w.walletprocesspsbt(psbt=psbt)["psbt"] for w in wallets]
        comb_nonce_psbt = self.nodes[0].combinepsbt(nonce_psbts)

        # Only 2 out of 3 provide partial sigs
        psig_psbts = [wallet.walletprocesspsbt(psbt=comb_nonce_psbt)["psbt"] for wallet in wallets[:2]]
        comb_psig_psbt = self.nodes[0].combinepsbt(psig_psbts)

        # Finalization fails due to missing partial sig
        finalized = self.nodes[0].finalizepsbt(comb_psig_psbt)
        assert_equal(finalized["complete"], False)

        # Still only two partial sigs in combined PSBT
        dec = self.nodes[0].decodepsbt(comb_psig_psbt)
        assert_equal(len(dec["inputs"][0]["musig2_partial_sigs"]), 2)

    def test_failure_case_3(self, comment, pat):
        """Finalization must fail when the PSBT carries nonces but no partial sigs."""
        self.log.info(f"Testing {comment}")
        wallets, psbt = self.setup_musig_scenario(pat)
        nonce_psbts = [w.walletprocesspsbt(psbt=psbt)["psbt"] for w in wallets]
        comb_nonce_psbt = self.nodes[0].combinepsbt(nonce_psbts)

        finalized = self.nodes[0].finalizepsbt(comb_nonce_psbt)
        assert_equal(finalized["complete"], False)

        dec = self.nodes[0].decodepsbt(comb_nonce_psbt)
        assert "musig2_pubnonces" in dec["inputs"][0]
        assert "musig2_partial_sigs" not in dec["inputs"][0]

    def test_success_case(self, comment, pattern, sighash_type=None, scriptpath=False, nosign_wallets=None, only_one_musig_wallet=False):
        """Run a full, successful musig signing flow for `pattern`.

        comment: text logged to identify the scenario.
        pattern: descriptor template; "$H" is replaced by H_POINT and "$<digit>"
            by the keys of the corresponding wallet.
        sighash_type: forwarded to walletprocesspsbt as `sighashtype`.
        scriptpath: when true the final witness must have more than one
            element, otherwise exactly one.
        nosign_wallets: indexes of wallets that do not take part in signing.
        only_one_musig_wallet: when true only the first wallet imports the
            musig descriptor and is used for the address and UTXO checks.
        """
        self.log.info(f"Testing {comment}")
        node = self.nodes[0]
        has_internal = MULTIPATH_TWO_RE.search(pattern) is not None
        # The pkh()/pk_h() fragments commit to the hash160 of the key rather than the key.
        uses_key_hash = "pkh" in pattern or "pk_h" in pattern

        pat = pattern.replace("$H", H_POINT)
        wallets, keys = self.create_wallets_and_keys_from_pattern(pat)
        self.construct_and_import_musig_descriptor_in_wallets(pat, wallets, keys, only_one_musig_wallet)

        expected_pubnonces, expected_partial_sigs = self._expected_musig_counts(pat, nosign_wallets)

        # Wallets that hold the musig descriptor, and wallets that will sign.
        musig_wallets = wallets[:1] if only_one_musig_wallet else wallets
        signers = [w for i, w in enumerate(wallets) if not (nosign_wallets and i in nosign_wallets)]

        # Check that the wallets agree on the same musig address
        addr = None
        change_addr = None
        for wallet in musig_wallets:
            new_addr = wallet.getnewaddress(address_type="bech32m")
            if addr is None:
                addr = new_addr
            else:
                assert_equal(addr, new_addr)
            if has_internal:
                new_change_addr = wallet.getrawchangeaddress(address_type="bech32m")
                if change_addr is None:
                    change_addr = new_change_addr
                else:
                    assert_equal(change_addr, new_change_addr)

        # Fund that address
        self.def_wallet.sendtoaddress(addr, 10)
        self.generate(node, 1)

        # Spend that UTXO
        utxo = None
        for wallet in musig_wallets:
            first_unspent = wallet.listunspent()[0]
            if utxo is None:
                utxo = first_unspent
            else:
                assert_equal(utxo, first_unspent)
        psbt = wallets[0].walletcreatefundedpsbt(
            outputs=[{self.def_wallet.getnewaddress(): 5}],
            inputs=[utxo],
            change_type="bech32m",
            changePosition=1,
            locktime=node.getblockcount(),
        )["psbt"]

        dec_psbt = node.decodepsbt(psbt)
        musig_count = pattern.count("musig(")
        assert_equal(len(dec_psbt["inputs"]), 1)
        assert_equal(len(dec_psbt["inputs"][0]["musig2_participant_pubkeys"]), musig_count)
        if has_internal:
            assert_equal(len(dec_psbt["outputs"][1]["musig2_participant_pubkeys"]), musig_count)

        # Check all participant pubkeys in the input and change output
        psbt_maps = [dec_psbt["inputs"][0]]
        if has_internal:
            psbt_maps.append(dec_psbt["outputs"][1])
        for psbt_map in psbt_maps:
            part_pks = {
                part_pub[2:]
                for agg in psbt_map["musig2_participant_pubkeys"]
                for part_pub in agg["participant_pubkeys"]
            }
            # Check that there are as many participants as we expected
            assert_equal(len(part_pks), len(keys))
            # Check that each participant has a derivation path
            derived_pks = {deriv_path["pubkey"] for deriv_path in psbt_map["taproot_bip32_derivs"]}
            assert_equal(len(part_pks - derived_pks), 0)

        # Add pubnonces
        comb_nonce_psbt = self._process_and_combine(signers, psbt, sighash_type)

        dec_input = node.decodepsbt(comb_nonce_psbt)["inputs"][0]
        assert_equal(len(dec_input["musig2_pubnonces"]), expected_pubnonces)
        self._assert_aggregate_pubkeys_seen(
            dec_input,
            dec_input["musig2_pubnonces"],
            uses_key_hash,
            "Aggregate pubkey for pubnonce not seen as output key, or in any scripts",
            "Aggregate pubkey for pubnonce not seen as output key or internal key",
        )

        # Add partial sigs
        comb_psig_psbt = self._process_and_combine(signers, comb_nonce_psbt, sighash_type)

        dec_input = node.decodepsbt(comb_psig_psbt)["inputs"][0]
        assert_equal(len(dec_input["musig2_partial_sigs"]), expected_partial_sigs)
        self._assert_aggregate_pubkeys_seen(
            dec_input,
            dec_input["musig2_partial_sigs"],
            uses_key_hash,
            "Aggregate pubkey for partial sig not seen as output key or in any scripts",
            "Aggregate pubkey for partial sig not seen as output key",
        )

        # Non-participant aggregates partial sigs and send
        finalized = node.finalizepsbt(psbt=comb_psig_psbt, extract=False)
        assert_equal(finalized["complete"], True)
        witness = node.decodepsbt(finalized["psbt"])["inputs"][0]["final_scriptwitness"]
        if scriptpath:
            assert_greater_than(len(witness), 1)
        else:
            assert_equal(len(witness), 1)
        finalized = node.finalizepsbt(comb_psig_psbt)
        assert "hex" in finalized
        node.sendrawtransaction(finalized["hex"])

    def run_test(self):
        self.def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)

        self.test_success_case("rawtr(musig(keys/*))", "rawtr(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*))")
        self.test_success_case("rawtr(musig(keys/*)) with ALL|ANYONECANPAY", "rawtr(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*))", "ALL|ANYONECANPAY")
        self.test_success_case("tr(musig(keys/*)) no multipath", "tr(musig($0/0/*,$1/1/*,$2/2/*))")
        self.test_success_case("tr(musig(keys/*)) 2 index multipath", "tr(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*))")
        self.test_success_case("tr(musig(keys/*)) 3 index multipath", "tr(musig($0/<0;1;2>/*,$1/<1;2;3>/*,$2/<2;3;4>/*))")
        self.test_success_case("rawtr(musig/*)", "rawtr(musig($0,$1,$2)/<0;1>/*)")
        self.test_success_case("tr(musig/*)", "tr(musig($0,$1,$2)/<0;1>/*)")
        self.test_success_case("rawtr(musig(keys/*)) without all wallets importing", "rawtr(musig($0/<0;1>/*,$1/<0;1>/*,$2/<0;1>/*))", only_one_musig_wallet=True)
        self.test_success_case("tr(musig(keys/*)) without all wallets importing", "tr(musig($0/<0;1>/*,$1/<0;1>/*,$2/<0;1>/*))", only_one_musig_wallet=True)
        self.test_success_case("tr(H, pk(musig(keys/*)))", "tr($H,pk(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*)))", scriptpath=True)
        self.test_success_case("tr(H,pk(musig/*))", "tr($H,pk(musig($0,$1,$2)/<0;1>/*))", scriptpath=True)
        self.test_success_case("tr(H,{pk(musig/*), pk(musig/*)})", "tr($H,{pk(musig($0,$1,$2)/<0;1>/*),pk(musig($3,$4,$5)/0/*)})", scriptpath=True)
        self.test_success_case("tr(H,{pk(musig/*), pk(same keys different musig/*)})", "tr($H,{pk(musig($0,$1,$2)/<0;1>/*),pk(musig($1,$2)/0/*)})", scriptpath=True)
        self.test_success_case("tr(musig/*,{pk(partial keys diff musig-1/*),pk(partial keys diff musig-2/*)})}", "tr(musig($0,$1,$2)/<3;4>/*,{pk(musig($0,$1)/<5;6>/*),pk(musig($1,$2)/7/*)})")
        self.test_success_case("tr(musig/*,{pk(partial keys diff musig-1/*),pk(partial keys diff musig-2/*)})} script-path", "tr(musig($0,$1,$2)/<3;4>/*,{pk(musig($0,$1)/<5;6>/*),pk(musig($1,$2)/7/*)})", scriptpath=True, nosign_wallets=[0])
        self.test_success_case("tr(H,and(pk(musig/*),after(1)))", "tr($H,and_v(v:pk(musig($0,$1,$2)/<0;1>/*),after(1)))", scriptpath=True)
        self.test_success_case("tr(H,and(pk_k(musig/*),after(1)))", "tr($H,and_v(vc:pk_k(musig($0,$1,$2)/<0;1>/*),after(1)))", scriptpath=True)
        self.test_success_case("tr(H,and(pkh(musig/*),after(1)))", "tr($H,and_v(v:pkh(musig($0,$1,$2)/<0;1>/*),after(1)))", scriptpath=True)
        self.test_success_case("tr(H,and(pk_h(musig/*),after(1)))", "tr($H,and_v(vc:pk_h(musig($0,$1,$2)/<0;1>/*),after(1)))", scriptpath=True)
        self.test_success_case("tr(H,{and(pk(musig/*),after(1)),and(pk(musig/*),after(1))})", "tr($H,{and_v(v:pk(musig($0,$2)/0/*),after(1)),and_v(v:pk(musig($1,$2)/0/*),after(1))})", scriptpath=True)

        self.test_failure_case_1("missing participant nonce", "tr(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*))")
        self.test_failure_case_2("insufficient partial signatures", "rawtr(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*))")
        self.test_failure_case_3("finalize without partial sigs", "rawtr(musig($0/<0;1>/*,$1/<1;2>/*))")


if __name__ == '__main__':
    WalletMuSigTest(__file__).main()