/ test / functional / wallet_musig.py
wallet_musig.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2024-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  
  6  import re
  7  
  8  from test_framework.descriptors import descsum_create
  9  from test_framework.key import H_POINT
 10  from test_framework.script import hash160
 11  from test_framework.test_framework import BitcoinTestFramework
 12  from test_framework.util import (
 13      assert_equal,
 14      assert_greater_than,
 15  )
 16  
 17  PRIVKEY_RE = re.compile(r"^tr\((.+?)/.+\)#.{8}$")
 18  PUBKEY_RE = re.compile(r"^tr\((\[.+?\].+?)/.+\)#.{8}$")
 19  ORIGIN_PATH_RE = re.compile(r"^\[\w{8}(/.*)\].*$")
 20  MULTIPATH_TWO_RE = re.compile(r"<(\d+);(\d+)>")
 21  MUSIG_RE = re.compile(r"musig\((.*?)\)")
 22  PLACEHOLDER_RE = re.compile(r"\$\d")
 23  
 24  class WalletMuSigTest(BitcoinTestFramework):
 25      wallet_num = 0
 26      def set_test_params(self):
 27          self.num_nodes = 1
 28  
 29      def skip_test_if_missing_module(self):
 30          self.skip_if_no_wallet()
 31  
 32      # Create wallets and extract keys
 33      def create_wallets_and_keys_from_pattern(self, pat):
 34          wallets = []
 35          keys = []
 36  
 37          for musig in MUSIG_RE.findall(pat):
 38              for placeholder in PLACEHOLDER_RE.findall(musig):
 39                  wallet_index = int(placeholder[1:])
 40                  if wallet_index < len(wallets):
 41                      continue
 42  
 43                  wallet_name = f"musig_{self.wallet_num}"
 44                  self.wallet_num += 1
 45                  self.nodes[0].createwallet(wallet_name)
 46                  wallet = self.nodes[0].get_wallet_rpc(wallet_name)
 47                  wallets.append(wallet)
 48  
 49                  for priv_desc in wallet.listdescriptors(True)["descriptors"]:
 50                      desc = priv_desc["desc"]
 51                      if not desc.startswith("tr("):
 52                          continue
 53                      privkey = PRIVKEY_RE.search(desc).group(1)
 54                      break
 55                  for pub_desc in wallet.listdescriptors()["descriptors"]:
 56                      desc = pub_desc["desc"]
 57                      if not desc.startswith("tr("):
 58                          continue
 59                      pubkey = PUBKEY_RE.search(desc).group(1)
 60                      # Since the pubkey is derived from the private key that we have, we need
 61                      # to extract and insert the origin path from the pubkey as well.
 62                      privkey += ORIGIN_PATH_RE.search(pubkey).group(1)
 63                      break
 64                  keys.append((privkey, pubkey))
 65  
 66          return wallets, keys
 67  
 68      # Construct and import each wallet's musig descriptor that
 69      # contains the private key from that wallet and pubkeys of the others
 70      def construct_and_import_musig_descriptor_in_wallets(self, pat, wallets, keys, only_one_musig_wallet=False):
 71          for i, wallet in enumerate(wallets):
 72              if only_one_musig_wallet and i > 0:
 73                  continue
 74              desc = pat
 75              for j, (priv, pub) in enumerate(keys):
 76                  if j == i:
 77                      desc = desc.replace(f"${i}", priv)
 78                  else:
 79                      desc = desc.replace(f"${j}", pub)
 80  
 81              import_descs = [{
 82                  "desc": descsum_create(desc),
 83                  "active": True,
 84                  "timestamp": "now",
 85              }]
 86  
 87              res = wallet.importdescriptors(import_descs)
 88              for r in res:
 89                  assert_equal(r["success"], True)
 90  
 91      def setup_musig_scenario(self, pat):
 92          wallets, keys = self.create_wallets_and_keys_from_pattern(pat)
 93          self.construct_and_import_musig_descriptor_in_wallets(pat, wallets, keys, only_one_musig_wallet=False)
 94  
 95          # Fund address
 96          addr = wallets[0].getnewaddress(address_type="bech32m")
 97          for wallet in wallets[1:]:
 98              assert_equal(addr, wallet.getnewaddress(address_type="bech32m"))
 99  
100          self.def_wallet.sendtoaddress(addr, 10)
101          self.generate(self.nodes[0], 1)
102  
103          # Create PSBT
104          utxo = wallets[0].listunspent()[0]
105          psbt = wallets[0].walletcreatefundedpsbt(
106              outputs=[{self.def_wallet.getnewaddress(): 5}],
107              inputs=[utxo],
108              change_type="bech32m",
109              changePosition=1
110          )["psbt"]
111  
112          return wallets, psbt
113  
114      def test_failure_case_1(self, comment, pat):
115          self.log.info(f"Testing {comment}")
116          wallets, psbt = self.setup_musig_scenario(pat)
117  
118          # Only 2 out of 3 participants provide nonces
119          nonce_psbts = []
120          for i in range(2):
121              proc = wallets[i].walletprocesspsbt(psbt=psbt)
122              nonce_psbts.append(proc["psbt"])
123  
124          comb_nonce_psbt = self.nodes[0].combinepsbt(nonce_psbts)
125  
126          # Attempt to create partial sigs. This should not complete due to the
127          # missing nonce.
128          for wallet in wallets[:2]:
129              proc = wallet.walletprocesspsbt(psbt=comb_nonce_psbt)
130              assert_equal(proc["complete"], False)
131              # No partial sigs are created
132              dec = self.nodes[0].decodepsbt(proc["psbt"])
133              # There are still only two nonces
134              assert_equal(len(dec["inputs"][0].get("musig2_pubnonces", [])), 2)
135  
136      def test_failure_case_2(self, comment, pat):
137          self.log.info(f"Testing {comment}")
138          wallets, psbt = self.setup_musig_scenario(pat)
139          nonce_psbts = [w.walletprocesspsbt(psbt=psbt)["psbt"] for w in wallets]
140          comb_nonce_psbt = self.nodes[0].combinepsbt(nonce_psbts)
141  
142          # Only 2 out of 3 provide partial sigs
143          psig_psbts = []
144          for i in range(2):
145              proc = wallets[i].walletprocesspsbt(psbt=comb_nonce_psbt)
146              psig_psbts.append(proc["psbt"])
147  
148          comb_psig_psbt = self.nodes[0].combinepsbt(psig_psbts)
149  
150          # Finalization fails due to missing partial sig
151          finalized = self.nodes[0].finalizepsbt(comb_psig_psbt)
152          assert_equal(finalized["complete"], False)
153  
154          # Still only two partial sigs in combined PSBT
155          dec = self.nodes[0].decodepsbt(comb_psig_psbt)
156          assert_equal(len(dec["inputs"][0]["musig2_partial_sigs"]), 2)
157  
158      def test_failure_case_3(self, comment, pat):
159          self.log.info(f"Testing {comment}")
160          wallets, psbt = self.setup_musig_scenario(pat)
161          nonce_psbts = [w.walletprocesspsbt(psbt=psbt)["psbt"] for w in wallets]
162          comb_nonce_psbt = self.nodes[0].combinepsbt(nonce_psbts)
163  
164          finalized = self.nodes[0].finalizepsbt(comb_nonce_psbt)
165          assert_equal(finalized["complete"], False)
166  
167          dec = self.nodes[0].decodepsbt(comb_nonce_psbt)
168          assert "musig2_pubnonces" in dec["inputs"][0]
169          assert "musig2_partial_sigs" not in dec["inputs"][0]
170  
171      def test_success_case(self, comment, pattern, sighash_type=None, scriptpath=False, nosign_wallets=None, only_one_musig_wallet=False):
172          self.log.info(f"Testing {comment}")
173          has_internal = MULTIPATH_TWO_RE.search(pattern) is not None
174  
175          pat = pattern.replace("$H", H_POINT)
176          wallets, keys = self.create_wallets_and_keys_from_pattern(pat)
177          self.construct_and_import_musig_descriptor_in_wallets(pat, wallets, keys, only_one_musig_wallet)
178  
179          expected_pubnonces = 0
180          expected_partial_sigs = 0
181          for musig in MUSIG_RE.findall(pat):
182              musig_partial_sigs = 0
183              for placeholder in PLACEHOLDER_RE.findall(musig):
184                  wallet_index = int(placeholder[1:])
185                  if nosign_wallets is None or wallet_index not in nosign_wallets:
186                      expected_pubnonces += 1
187                  else:
188                      musig_partial_sigs = None
189                  if musig_partial_sigs is not None:
190                      musig_partial_sigs += 1
191                  if wallet_index < len(wallets):
192                      continue
193              if musig_partial_sigs is not None:
194                  expected_partial_sigs += musig_partial_sigs
195  
196          # Check that the wallets agree on the same musig address
197          addr = None
198          change_addr = None
199          for i, wallet in enumerate(wallets):
200              if only_one_musig_wallet and i > 0:
201                  continue
202              if addr is None:
203                  addr = wallet.getnewaddress(address_type="bech32m")
204              else:
205                  assert_equal(addr, wallet.getnewaddress(address_type="bech32m"))
206              if has_internal:
207                  if change_addr is None:
208                      change_addr = wallet.getrawchangeaddress(address_type="bech32m")
209                  else:
210                      assert_equal(change_addr, wallet.getrawchangeaddress(address_type="bech32m"))
211  
212          # Fund that address
213          self.def_wallet.sendtoaddress(addr, 10)
214          self.generate(self.nodes[0], 1)
215  
216          # Spend that UTXO
217          utxo = None
218          for i, wallet in enumerate(wallets):
219              if only_one_musig_wallet and i > 0:
220                  continue
221              if utxo is None:
222                  utxo = wallet.listunspent()[0]
223              else:
224                  assert_equal(utxo, wallet.listunspent()[0])
225          psbt = wallets[0].walletcreatefundedpsbt(outputs=[{self.def_wallet.getnewaddress(): 5}], inputs=[utxo], change_type="bech32m", changePosition=1, locktime=self.nodes[0].getblockcount())["psbt"]
226  
227          dec_psbt = self.nodes[0].decodepsbt(psbt)
228          assert_equal(len(dec_psbt["inputs"]), 1)
229          assert_equal(len(dec_psbt["inputs"][0]["musig2_participant_pubkeys"]), pattern.count("musig("))
230          if has_internal:
231              assert_equal(len(dec_psbt["outputs"][1]["musig2_participant_pubkeys"]), pattern.count("musig("))
232  
233          # Check all participant pubkeys in the input and change output
234          psbt_maps = [dec_psbt["inputs"][0]]
235          if has_internal:
236              psbt_maps.append(dec_psbt["outputs"][1])
237          for psbt_map in psbt_maps:
238              part_pks = set()
239              for agg in psbt_map["musig2_participant_pubkeys"]:
240                  for part_pub in agg["participant_pubkeys"]:
241                      part_pks.add(part_pub[2:])
242              # Check that there are as many participants as we expected
243              assert_equal(len(part_pks), len(keys))
244              # Check that each participant has a derivation path
245              for deriv_path in psbt_map["taproot_bip32_derivs"]:
246                  if deriv_path["pubkey"] in part_pks:
247                      part_pks.remove(deriv_path["pubkey"])
248              assert_equal(len(part_pks), 0)
249  
250          # Add pubnonces
251          nonce_psbts = []
252          for i, wallet in enumerate(wallets):
253              if nosign_wallets and i in nosign_wallets:
254                  continue
255              proc = wallet.walletprocesspsbt(psbt=psbt, sighashtype=sighash_type)
256              assert_equal(proc["complete"], False)
257              nonce_psbts.append(proc["psbt"])
258  
259          comb_nonce_psbt = self.nodes[0].combinepsbt(nonce_psbts)
260  
261          dec_psbt = self.nodes[0].decodepsbt(comb_nonce_psbt)
262          assert_equal(len(dec_psbt["inputs"][0]["musig2_pubnonces"]), expected_pubnonces)
263          for pn in dec_psbt["inputs"][0]["musig2_pubnonces"]:
264              pubkey = pn["aggregate_pubkey"][2:]
265              if "pkh" in pattern or "pk_h" in pattern:
266                  pubkey = hash160(bytes.fromhex(pubkey)).hex()
267              if pubkey in dec_psbt["inputs"][0]["witness_utxo"]["scriptPubKey"]["hex"]:
268                  continue
269              elif "taproot_scripts" in dec_psbt["inputs"][0]:
270                  for leaf_scripts in dec_psbt["inputs"][0]["taproot_scripts"]:
271                      if pubkey in leaf_scripts["script"]:
272                          break
273                  else:
274                      assert False, "Aggregate pubkey for pubnonce not seen as output key, or in any scripts"
275              else:
276                  assert False, "Aggregate pubkey for pubnonce not seen as output key or internal key"
277  
278          # Add partial sigs
279          psig_psbts = []
280          for i, wallet in enumerate(wallets):
281              if nosign_wallets and i in nosign_wallets:
282                  continue
283              proc = wallet.walletprocesspsbt(psbt=comb_nonce_psbt, sighashtype=sighash_type)
284              assert_equal(proc["complete"], False)
285              psig_psbts.append(proc["psbt"])
286  
287          comb_psig_psbt = self.nodes[0].combinepsbt(psig_psbts)
288  
289          dec_psbt = self.nodes[0].decodepsbt(comb_psig_psbt)
290          assert_equal(len(dec_psbt["inputs"][0]["musig2_partial_sigs"]), expected_partial_sigs)
291          for ps in dec_psbt["inputs"][0]["musig2_partial_sigs"]:
292              pubkey = ps["aggregate_pubkey"][2:]
293              if "pkh" in pattern or "pk_h" in pattern:
294                  pubkey = hash160(bytes.fromhex(pubkey)).hex()
295              if pubkey in dec_psbt["inputs"][0]["witness_utxo"]["scriptPubKey"]["hex"]:
296                  continue
297              elif "taproot_scripts" in dec_psbt["inputs"][0]:
298                  for leaf_scripts in dec_psbt["inputs"][0]["taproot_scripts"]:
299                      if pubkey in leaf_scripts["script"]:
300                          break
301                  else:
302                      assert False, "Aggregate pubkey for partial sig not seen as output key or in any scripts"
303              else:
304                  assert False, "Aggregate pubkey for partial sig not seen as output key"
305  
306          # Non-participant aggregates partial sigs and send
307          finalized = self.nodes[0].finalizepsbt(psbt=comb_psig_psbt, extract=False)
308          assert_equal(finalized["complete"], True)
309          witness = self.nodes[0].decodepsbt(finalized["psbt"])["inputs"][0]["final_scriptwitness"]
310          if scriptpath:
311              assert_greater_than(len(witness), 1)
312          else:
313              assert_equal(len(witness), 1)
314          finalized = self.nodes[0].finalizepsbt(comb_psig_psbt)
315          assert "hex" in finalized
316          self.nodes[0].sendrawtransaction(finalized["hex"])
317  
318      def run_test(self):
319          self.def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
320  
321          self.test_success_case("rawtr(musig(keys/*))", "rawtr(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*))")
322          self.test_success_case("rawtr(musig(keys/*)) with ALL|ANYONECANPAY", "rawtr(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*))", "ALL|ANYONECANPAY")
323          self.test_success_case("tr(musig(keys/*)) no multipath", "tr(musig($0/0/*,$1/1/*,$2/2/*))")
324          self.test_success_case("tr(musig(keys/*)) 2 index multipath", "tr(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*))")
325          self.test_success_case("tr(musig(keys/*)) 3 index multipath", "tr(musig($0/<0;1;2>/*,$1/<1;2;3>/*,$2/<2;3;4>/*))")
326          self.test_success_case("rawtr(musig/*)", "rawtr(musig($0,$1,$2)/<0;1>/*)")
327          self.test_success_case("tr(musig/*)", "tr(musig($0,$1,$2)/<0;1>/*)")
328          self.test_success_case("rawtr(musig(keys/*)) without all wallets importing", "rawtr(musig($0/<0;1>/*,$1/<0;1>/*,$2/<0;1>/*))", only_one_musig_wallet=True)
329          self.test_success_case("tr(musig(keys/*)) without all wallets importing", "tr(musig($0/<0;1>/*,$1/<0;1>/*,$2/<0;1>/*))", only_one_musig_wallet=True)
330          self.test_success_case("tr(H, pk(musig(keys/*)))", "tr($H,pk(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*)))", scriptpath=True)
331          self.test_success_case("tr(H,pk(musig/*))", "tr($H,pk(musig($0,$1,$2)/<0;1>/*))", scriptpath=True)
332          self.test_success_case("tr(H,{pk(musig/*), pk(musig/*)})", "tr($H,{pk(musig($0,$1,$2)/<0;1>/*),pk(musig($3,$4,$5)/0/*)})", scriptpath=True)
333          self.test_success_case("tr(H,{pk(musig/*), pk(same keys different musig/*)})", "tr($H,{pk(musig($0,$1,$2)/<0;1>/*),pk(musig($1,$2)/0/*)})", scriptpath=True)
334          self.test_success_case("tr(musig/*,{pk(partial keys diff musig-1/*),pk(partial keys diff musig-2/*)})}", "tr(musig($0,$1,$2)/<3;4>/*,{pk(musig($0,$1)/<5;6>/*),pk(musig($1,$2)/7/*)})")
335          self.test_success_case("tr(musig/*,{pk(partial keys diff musig-1/*),pk(partial keys diff musig-2/*)})} script-path", "tr(musig($0,$1,$2)/<3;4>/*,{pk(musig($0,$1)/<5;6>/*),pk(musig($1,$2)/7/*)})", scriptpath=True, nosign_wallets=[0])
336          self.test_success_case("tr(H,and(pk(musig/*),after(1)))", "tr($H,and_v(v:pk(musig($0,$1,$2)/<0;1>/*),after(1)))", scriptpath=True)
337          self.test_success_case("tr(H,and(pk_k(musig/*),after(1)))", "tr($H,and_v(vc:pk_k(musig($0,$1,$2)/<0;1>/*),after(1)))", scriptpath=True)
338          self.test_success_case("tr(H,and(pkh(musig/*),after(1)))", "tr($H,and_v(v:pkh(musig($0,$1,$2)/<0;1>/*),after(1)))", scriptpath=True)
339          self.test_success_case("tr(H,and(pk_h(musig/*),after(1)))", "tr($H,and_v(vc:pk_h(musig($0,$1,$2)/<0;1>/*),after(1)))", scriptpath=True)
340          self.test_success_case("tr(H,{and(pk(musig/*),after(1)),and(pk(musig/*),after(1))})", "tr($H,{and_v(v:pk(musig($0,$2)/0/*),after(1)),and_v(v:pk(musig($1,$2)/0/*),after(1))})", scriptpath=True)
341  
342          self.test_failure_case_1("missing participant nonce", "tr(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*))")
343          self.test_failure_case_2("insufficient partial signatures", "rawtr(musig($0/<0;1>/*,$1/<1;2>/*,$2/<2;3>/*))")
344          self.test_failure_case_3("finalize without partial sigs", "rawtr(musig($0/<0;1>/*,$1/<1;2>/*))")
345  
346  if __name__ == '__main__':
347      WalletMuSigTest(__file__).main()