/ test / functional / wallet_signer.py
wallet_signer.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2017-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Test external signer.
  6  
Verify that a bitcoind node can use an external signer command.
  8  See also rpc_signer.py for tests without wallet context.
  9  """
 10  import os
 11  import sys
 12  
 13  from test_framework.test_framework import BitcoinTestFramework
 14  from test_framework.util import (
 15      assert_equal,
 16      assert_greater_than,
 17      assert_raises_rpc_error,
 18  )
 19  
 20  
 21  class WalletSignerTest(BitcoinTestFramework):
 22      def mock_signer_path(self):
 23          path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'mocks', 'signer.py')
 24          return sys.executable + " " + path
 25  
 26      def mock_no_connected_signer_path(self):
 27          path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'mocks', 'no_signer.py')
 28          return sys.executable + " " + path
 29  
 30      def mock_invalid_signer_path(self):
 31          path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'mocks', 'invalid_signer.py')
 32          return sys.executable + " " + path
 33  
 34      def mock_multi_signers_path(self):
 35          path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'mocks', 'multi_signers.py')
 36          return sys.executable + " " + path
 37  
 38      def set_test_params(self):
 39          self.num_nodes = 2
 40  
 41          self.extra_args = [
 42              [],
 43              [f"-signer={self.mock_signer_path()}", '-keypool=10'],
 44          ]
 45  
 46      def skip_test_if_missing_module(self):
 47          self.skip_if_no_external_signer()
 48          self.skip_if_no_wallet()
 49  
 50      def set_mock_result(self, node, res):
 51          with open(os.path.join(node.cwd, "mock_result"), "w") as f:
 52              f.write(res)
 53  
 54      def clear_mock_result(self, node):
 55          os.remove(os.path.join(node.cwd, "mock_result"))
 56  
 57      def run_test(self):
 58          self.test_valid_signer()
 59          self.test_disconnected_signer()
 60          self.restart_node(1, [f"-signer={self.mock_invalid_signer_path()}", "-keypool=10"])
 61          self.test_invalid_signer()
 62          self.restart_node(1, [f"-signer={self.mock_multi_signers_path()}", "-keypool=10"])
 63          self.test_multiple_signers()
 64  
 65      def test_valid_signer(self):
 66          self.log.debug(f"-signer={self.mock_signer_path()}")
 67  
 68          # Create new wallets for an external signer.
 69          # disable_private_keys and descriptors must be true:
 70          assert_raises_rpc_error(-4, "Private keys must be disabled when using an external signer", self.nodes[1].createwallet, wallet_name='not_hww', disable_private_keys=False, external_signer=True)
 71          self.nodes[1].createwallet(wallet_name='hww', disable_private_keys=True, external_signer=True)
 72          hww = self.nodes[1].get_wallet_rpc('hww')
 73          assert_equal(hww.getwalletinfo()["external_signer"], True)
 74  
 75          # Flag can't be set afterwards (could be added later for non-blank descriptor based watch-only wallets)
 76          self.nodes[1].createwallet(wallet_name='not_hww', disable_private_keys=True, external_signer=False)
 77          not_hww = self.nodes[1].get_wallet_rpc('not_hww')
 78          assert_equal(not_hww.getwalletinfo()["external_signer"], False)
 79          assert_raises_rpc_error(-8, "Wallet flag is immutable: external_signer", not_hww.setwalletflag, "external_signer", True)
 80  
 81  
 82          self.set_mock_result(self.nodes[1], '0 {"invalid json"}')
 83          assert_raises_rpc_error(-1, 'Unable to parse JSON',
 84              self.nodes[1].createwallet, wallet_name='hww2', disable_private_keys=True, external_signer=True
 85          )
 86          self.clear_mock_result(self.nodes[1])
 87  
 88          assert_equal(hww.getwalletinfo()["keypoolsize"], 40)
 89  
 90          address1 = hww.getnewaddress(address_type="bech32")
 91          assert_equal(address1, "bcrt1qm90ugl4d48jv8n6e5t9ln6t9zlpm5th68x4f8g")
 92          address_info = hww.getaddressinfo(address1)
 93          assert_equal(address_info['solvable'], True)
 94          assert_equal(address_info['ismine'], True)
 95          assert_equal(address_info['hdkeypath'], "m/84h/1h/0h/0/0")
 96  
 97          address2 = hww.getnewaddress(address_type="p2sh-segwit")
 98          assert_equal(address2, "2N2gQKzjUe47gM8p1JZxaAkTcoHPXV6YyVp")
 99          address_info = hww.getaddressinfo(address2)
100          assert_equal(address_info['solvable'], True)
101          assert_equal(address_info['ismine'], True)
102          assert_equal(address_info['hdkeypath'], "m/49h/1h/0h/0/0")
103  
104          address3 = hww.getnewaddress(address_type="legacy")
105          assert_equal(address3, "n1LKejAadN6hg2FrBXoU1KrwX4uK16mco9")
106          address_info = hww.getaddressinfo(address3)
107          assert_equal(address_info['solvable'], True)
108          assert_equal(address_info['ismine'], True)
109          assert_equal(address_info['hdkeypath'], "m/44h/1h/0h/0/0")
110  
111          address4 = hww.getnewaddress(address_type="bech32m")
112          assert_equal(address4, "bcrt1phw4cgpt6cd30kz9k4wkpwm872cdvhss29jga2xpmftelhqll62ms4e9sqj")
113          address_info = hww.getaddressinfo(address4)
114          assert_equal(address_info['solvable'], True)
115          assert_equal(address_info['ismine'], True)
116          assert_equal(address_info['hdkeypath'], "m/86h/1h/0h/0/0")
117  
118          self.log.info('Test walletdisplayaddress')
119          for address in [address1, address2, address3]:
120              result = hww.walletdisplayaddress(address)
121              assert_equal(result, {"address": address})
122  
123          # Handle error thrown by script
124          self.set_mock_result(self.nodes[1], "2")
125          assert_raises_rpc_error(-1, 'RunCommandParseJSON error',
126              hww.walletdisplayaddress, address1
127          )
128          self.clear_mock_result(self.nodes[1])
129  
130          # Returned address MUST match:
131          address_fail = hww.getnewaddress(address_type="bech32")
132          assert_equal(address_fail, "bcrt1ql7zg7ukh3dwr25ex2zn9jse926f27xy2jz58tm")
133          assert_raises_rpc_error(-1, 'Signer echoed unexpected address wrong_address',
134              hww.walletdisplayaddress, address_fail
135          )
136  
137          self.log.info('Prepare mock PSBT')
138          self.nodes[0].sendtoaddress(address4, 1)
139          self.generate(self.nodes[0], 1)
140  
141          # Load private key into wallet to generate a signed PSBT for the mock
142          self.nodes[1].createwallet(wallet_name="mock", disable_private_keys=False, blank=True)
143          mock_wallet = self.nodes[1].get_wallet_rpc("mock")
144          assert mock_wallet.getwalletinfo()['private_keys_enabled']
145  
146          result = mock_wallet.importdescriptors([{
147              "desc": "tr([00000001/86h/1h/0']tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/0/*)#7ew68cn8",
148              "timestamp": 0,
149              "range": [0,1],
150              "internal": False,
151              "active": True
152          },
153          {
154              "desc": "tr([00000001/86h/1h/0']tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/1/*)#0dtm6drl",
155              "timestamp": 0,
156              "range": [0, 0],
157              "internal": True,
158              "active": True
159          }])
160          assert_equal(result[0], {'success': True})
161          assert_equal(result[1], {'success': True})
162          assert_equal(mock_wallet.getwalletinfo()["txcount"], 1)
163          dest = self.nodes[0].getnewaddress(address_type='bech32')
164          mock_psbt = mock_wallet.walletcreatefundedpsbt([], {dest:0.5}, 0, {'replaceable': True}, True)['psbt']
165          mock_psbt_signed = mock_wallet.walletprocesspsbt(psbt=mock_psbt, sign=True, sighashtype="ALL", bip32derivs=True)
166          mock_tx = mock_psbt_signed["hex"]
167          assert mock_wallet.testmempoolaccept([mock_tx])[0]["allowed"]
168  
169          assert_equal(hww.getwalletinfo()["txcount"], 1)
170  
171          assert hww.testmempoolaccept([mock_tx])[0]["allowed"]
172  
173          with open(os.path.join(self.nodes[1].cwd, "mock_psbt"), "w") as f:
174              f.write(mock_psbt_signed["psbt"])
175  
176          self.log.info('Test send using hww1')
177  
178          # Don't broadcast transaction yet so the RPC returns the raw hex
179          res = hww.send(outputs={dest:0.5},add_to_wallet=False)
180          assert res["complete"]
181          assert_equal(res["hex"], mock_tx)
182  
183          self.log.info('Test sendall using hww1')
184  
185          res = hww.sendall(recipients=[{dest:0.5}, hww.getrawchangeaddress()], add_to_wallet=False)
186          assert res["complete"]
187          assert_equal(res["hex"], mock_tx)
188          # Broadcast transaction so we can bump the fee
189          hww.sendrawtransaction(res["hex"])
190  
191          self.log.info('Prepare fee bumped mock PSBT')
192  
193          # Now that the transaction is broadcast, bump fee in mock wallet:
194          orig_tx_id = res["txid"]
195          mock_psbt_bumped = mock_wallet.psbtbumpfee(orig_tx_id)["psbt"]
196          mock_psbt_bumped_signed = mock_wallet.walletprocesspsbt(psbt=mock_psbt_bumped, sign=True, sighashtype="ALL", bip32derivs=True)
197  
198          with open(os.path.join(self.nodes[1].cwd, "mock_psbt"), "w") as f:
199              f.write(mock_psbt_bumped_signed["psbt"])
200  
201          self.log.info('Test bumpfee using hww1')
202  
203          # Bump fee
204          res = hww.bumpfee(orig_tx_id)
205          assert_greater_than(res["fee"], res["origfee"])
206          assert_equal(res["errors"], [])
207  
208  
209      def test_disconnected_signer(self):
210          self.log.info('Test disconnected external signer')
211  
212          # First create a wallet with the signer connected
213          self.nodes[1].createwallet(wallet_name='hww_disconnect', disable_private_keys=True, external_signer=True)
214          hww = self.nodes[1].get_wallet_rpc('hww_disconnect')
215          assert_equal(hww.getwalletinfo()["external_signer"], True)
216  
217          # Fund wallet
218          self.nodes[0].sendtoaddress(hww.getnewaddress(address_type="bech32m"), 1)
219          self.generate(self.nodes[0], 1)
220  
221          # Restart node with no signer connected
222          self.log.debug(f"-signer={self.mock_no_connected_signer_path()}")
223          self.restart_node(1, [f"-signer={self.mock_no_connected_signer_path()}", "-keypool=10"])
224          self.nodes[1].loadwallet('hww_disconnect')
225          hww = self.nodes[1].get_wallet_rpc('hww_disconnect')
226  
227          # Try to spend
228          dest = hww.getrawchangeaddress()
229          assert_raises_rpc_error(-25, "External signer not found", hww.send, outputs=[{dest:0.5}])
230  
231      def test_invalid_signer(self):
232          self.log.debug(f"-signer={self.mock_invalid_signer_path()}")
233          self.log.info('Test invalid external signer')
234          assert_raises_rpc_error(-1, "Invalid descriptor", self.nodes[1].createwallet, wallet_name='hww_invalid', disable_private_keys=True, external_signer=True)
235  
236      def test_multiple_signers(self):
237          self.log.debug(f"-signer={self.mock_multi_signers_path()}")
238          self.log.info('Test multiple external signers')
239  
240          assert_raises_rpc_error(-1, "More than one external signer found", self.nodes[1].createwallet, wallet_name='multi_hww', disable_private_keys=True, external_signer=True)
241  
if __name__ == '__main__':
    # Script entry point: build the test case for this file and run it.
    signer_test = WalletSignerTest(__file__)
    signer_test.main()