wallet.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2020-2022 The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """A limited-functionality wallet, which may replace a real wallet in tests""" 6 7 from copy import deepcopy 8 from decimal import Decimal 9 from enum import Enum 10 from typing import ( 11 Any, 12 Optional, 13 ) 14 from test_framework.address import ( 15 address_to_scriptpubkey, 16 create_deterministic_address_bcrt1_p2tr_op_true, 17 key_to_p2pkh, 18 key_to_p2sh_p2wpkh, 19 key_to_p2wpkh, 20 output_key_to_p2tr, 21 ) 22 from test_framework.blocktools import COINBASE_MATURITY 23 from test_framework.descriptors import descsum_create 24 from test_framework.key import ( 25 ECKey, 26 compute_xonly_pubkey, 27 ) 28 from test_framework.messages import ( 29 COIN, 30 COutPoint, 31 CTransaction, 32 CTxIn, 33 CTxInWitness, 34 CTxOut, 35 ) 36 from test_framework.script import ( 37 CScript, 38 LEAF_VERSION_TAPSCRIPT, 39 OP_NOP, 40 OP_RETURN, 41 OP_TRUE, 42 sign_input_legacy, 43 taproot_construct, 44 ) 45 from test_framework.script_util import ( 46 key_to_p2pk_script, 47 key_to_p2pkh_script, 48 key_to_p2sh_p2wpkh_script, 49 key_to_p2wpkh_script, 50 ) 51 from test_framework.util import ( 52 assert_equal, 53 assert_greater_than_or_equal, 54 ) 55 from test_framework.wallet_util import generate_keypair 56 57 DEFAULT_FEE = Decimal("0.0001") 58 59 class MiniWalletMode(Enum): 60 """Determines the transaction type the MiniWallet is creating and spending. 61 62 For most purposes, the default mode ADDRESS_OP_TRUE should be sufficient; 63 it simply uses a fixed bech32m P2TR address whose coins are spent with a 64 witness stack of OP_TRUE, i.e. following an anyone-can-spend policy. 65 However, if the transactions need to be modified by the user (e.g. prepending 66 scriptSig for testing opcodes that are activated by a soft-fork), or the txs 67 should contain an actual signature, the raw modes RAW_OP_TRUE and RAW_P2PK 68 can be useful. Summary of modes: 69 70 | output | | tx is | can modify | needs 71 mode | description | address | standard | scriptSig | signing 72 ----------------+-------------------+-----------+----------+------------+---------- 73 ADDRESS_OP_TRUE | anyone-can-spend | bech32m | yes | no | no 74 RAW_OP_TRUE | anyone-can-spend | - (raw) | no | yes | no 75 RAW_P2PK | pay-to-public-key | - (raw) | yes | yes | yes 76 """ 77 ADDRESS_OP_TRUE = 1 78 RAW_OP_TRUE = 2 79 RAW_P2PK = 3 80 81 82 class MiniWallet: 83 def __init__(self, test_node, *, mode=MiniWalletMode.ADDRESS_OP_TRUE): 84 self._test_node = test_node 85 self._utxos = [] 86 self._mode = mode 87 88 assert isinstance(mode, MiniWalletMode) 89 if mode == MiniWalletMode.RAW_OP_TRUE: 90 self._scriptPubKey = bytes(CScript([OP_TRUE])) 91 elif mode == MiniWalletMode.RAW_P2PK: 92 # use simple deterministic private key (k=1) 93 self._priv_key = ECKey() 94 self._priv_key.set((1).to_bytes(32, 'big'), True) 95 pub_key = self._priv_key.get_pubkey() 96 self._scriptPubKey = key_to_p2pk_script(pub_key.get_bytes()) 97 elif mode == MiniWalletMode.ADDRESS_OP_TRUE: 98 self._address, self._internal_key = create_deterministic_address_bcrt1_p2tr_op_true() 99 self._scriptPubKey = address_to_scriptpubkey(self._address) 100 101 # When the pre-mined test framework chain is used, it contains coinbase 102 # outputs to the MiniWallet's default address in blocks 76-100 103 # (see method BitcoinTestFramework._initialize_chain()) 104 # The MiniWallet needs to rescan_utxos() in order to account 105 # for those mature UTXOs, so that all txs spend confirmed coins 106 self.rescan_utxos() 107 108 def _create_utxo(self, *, txid, vout, value, height, coinbase, confirmations): 109 return {"txid": txid, "vout": vout, "value": value, "height": height, "coinbase": coinbase, "confirmations": confirmations} 110 111 def _bulk_tx(self, tx, target_weight): 112 """Pad a transaction with extra outputs until it reaches a target weight (or higher). 113 returns the tx 114 """ 115 tx.vout.append(CTxOut(nValue=0, scriptPubKey=CScript([OP_RETURN, b'a']))) 116 dummy_vbytes = (target_weight - tx.get_weight() + 3) // 4 117 tx.vout[-1].scriptPubKey = CScript([OP_RETURN, b'a' * dummy_vbytes]) 118 # Lower bound should always be off by at most 3 119 assert_greater_than_or_equal(tx.get_weight(), target_weight) 120 # Higher bound should always be off by at most 3 + 12 weight (for encoding the length) 121 assert_greater_than_or_equal(target_weight + 15, tx.get_weight()) 122 123 def get_balance(self): 124 return sum(u['value'] for u in self._utxos) 125 126 def rescan_utxos(self, *, include_mempool=True): 127 """Drop all utxos and rescan the utxo set""" 128 self._utxos = [] 129 res = self._test_node.scantxoutset(action="start", scanobjects=[self.get_descriptor()]) 130 assert_equal(True, res['success']) 131 for utxo in res['unspents']: 132 self._utxos.append( 133 self._create_utxo(txid=utxo["txid"], 134 vout=utxo["vout"], 135 value=utxo["amount"], 136 height=utxo["height"], 137 coinbase=utxo["coinbase"], 138 confirmations=res["height"] - utxo["height"] + 1)) 139 if include_mempool: 140 mempool = self._test_node.getrawmempool(verbose=True) 141 # Sort tx by ancestor count. See BlockAssembler::SortForBlock in src/node/miner.cpp 142 sorted_mempool = sorted(mempool.items(), key=lambda item: (item[1]["ancestorcount"], int(item[0], 16))) 143 for txid, _ in sorted_mempool: 144 self.scan_tx(self._test_node.getrawtransaction(txid=txid, verbose=True)) 145 146 def scan_tx(self, tx): 147 """Scan the tx and adjust the internal list of owned utxos""" 148 for spent in tx["vin"]: 149 # Mark spent. This may happen when the caller has ownership of a 150 # utxo that remained in this wallet. For example, by passing 151 # mark_as_spent=False to get_utxo or by using an utxo returned by a 152 # create_self_transfer* call. 153 try: 154 self.get_utxo(txid=spent["txid"], vout=spent["vout"]) 155 except StopIteration: 156 pass 157 for out in tx['vout']: 158 if out['scriptPubKey']['hex'] == self._scriptPubKey.hex(): 159 self._utxos.append(self._create_utxo(txid=tx["txid"], vout=out["n"], value=out["value"], height=0, coinbase=False, confirmations=0)) 160 161 def scan_txs(self, txs): 162 for tx in txs: 163 self.scan_tx(tx) 164 165 def sign_tx(self, tx, fixed_length=True): 166 if self._mode == MiniWalletMode.RAW_P2PK: 167 # for exact fee calculation, create only signatures with fixed size by default (>49.89% probability): 168 # 65 bytes: high-R val (33 bytes) + low-S val (32 bytes) 169 # with the DER header/skeleton data of 6 bytes added, plus 2 bytes scriptSig overhead 170 # (OP_PUSHn and SIGHASH_ALL), this leads to a scriptSig target size of 73 bytes 171 tx.vin[0].scriptSig = b'' 172 while not len(tx.vin[0].scriptSig) == 73: 173 tx.vin[0].scriptSig = b'' 174 sign_input_legacy(tx, 0, self._scriptPubKey, self._priv_key) 175 if not fixed_length: 176 break 177 elif self._mode == MiniWalletMode.RAW_OP_TRUE: 178 for i in tx.vin: 179 i.scriptSig = CScript([OP_NOP] * 43) # pad to identical size 180 elif self._mode == MiniWalletMode.ADDRESS_OP_TRUE: 181 tx.wit.vtxinwit = [CTxInWitness()] * len(tx.vin) 182 for i in tx.wit.vtxinwit: 183 i.scriptWitness.stack = [CScript([OP_TRUE]), bytes([LEAF_VERSION_TAPSCRIPT]) + self._internal_key] 184 else: 185 assert False 186 187 def generate(self, num_blocks, **kwargs): 188 """Generate blocks with coinbase outputs to the internal address, and call rescan_utxos""" 189 blocks = self._test_node.generatetodescriptor(num_blocks, self.get_descriptor(), **kwargs) 190 # Calling rescan_utxos here makes sure that after a generate the utxo 191 # set is in a clean state. For example, the wallet will update 192 # - if the caller consumed utxos, but never used them 193 # - if the caller sent a transaction that is not mined or got rbf'd 194 # - after block re-orgs 195 # - the utxo height for mined mempool txs 196 # - However, the wallet will not consider remaining mempool txs 197 self.rescan_utxos() 198 return blocks 199 200 def get_scriptPubKey(self): 201 return self._scriptPubKey 202 203 def get_descriptor(self): 204 return descsum_create(f'raw({self._scriptPubKey.hex()})') 205 206 def get_address(self): 207 assert_equal(self._mode, MiniWalletMode.ADDRESS_OP_TRUE) 208 return self._address 209 210 def get_utxo(self, *, txid: str = '', vout: Optional[int] = None, mark_as_spent=True, confirmed_only=False) -> dict: 211 """ 212 Returns a utxo and marks it as spent (pops it from the internal list) 213 214 Args: 215 txid: get the first utxo we find from a specific transaction 216 """ 217 self._utxos = sorted(self._utxos, key=lambda k: (k['value'], -k['height'])) # Put the largest utxo last 218 blocks_height = self._test_node.getblockchaininfo()['blocks'] 219 mature_coins = list(filter(lambda utxo: not utxo['coinbase'] or COINBASE_MATURITY - 1 <= blocks_height - utxo['height'], self._utxos)) 220 if txid: 221 utxo_filter: Any = filter(lambda utxo: txid == utxo['txid'], self._utxos) 222 else: 223 utxo_filter = reversed(mature_coins) # By default the largest utxo 224 if vout is not None: 225 utxo_filter = filter(lambda utxo: vout == utxo['vout'], utxo_filter) 226 if confirmed_only: 227 utxo_filter = filter(lambda utxo: utxo['confirmations'] > 0, utxo_filter) 228 index = self._utxos.index(next(utxo_filter)) 229 if mark_as_spent: 230 return self._utxos.pop(index) 231 else: 232 return self._utxos[index] 233 234 def get_utxos(self, *, include_immature_coinbase=False, mark_as_spent=True, confirmed_only=False): 235 """Returns the list of all utxos and optionally mark them as spent""" 236 if not include_immature_coinbase: 237 blocks_height = self._test_node.getblockchaininfo()['blocks'] 238 utxo_filter = filter(lambda utxo: not utxo['coinbase'] or COINBASE_MATURITY - 1 <= blocks_height - utxo['height'], self._utxos) 239 else: 240 utxo_filter = self._utxos 241 if confirmed_only: 242 utxo_filter = filter(lambda utxo: utxo['confirmations'] > 0, utxo_filter) 243 utxos = deepcopy(list(utxo_filter)) 244 if mark_as_spent: 245 self._utxos = [] 246 return utxos 247 248 def send_self_transfer(self, *, from_node, **kwargs): 249 """Call create_self_transfer and send the transaction.""" 250 tx = self.create_self_transfer(**kwargs) 251 self.sendrawtransaction(from_node=from_node, tx_hex=tx['hex']) 252 return tx 253 254 def send_to(self, *, from_node, scriptPubKey, amount, fee=1000): 255 """ 256 Create and send a tx with an output to a given scriptPubKey/amount, 257 plus a change output to our internal address. To keep things simple, a 258 fixed fee given in Satoshi is used. 259 260 Note that this method fails if there is no single internal utxo 261 available that can cover the cost for the amount and the fixed fee 262 (the utxo with the largest value is taken). 263 """ 264 tx = self.create_self_transfer(fee_rate=0)["tx"] 265 assert_greater_than_or_equal(tx.vout[0].nValue, amount + fee) 266 tx.vout[0].nValue -= (amount + fee) # change output -> MiniWallet 267 tx.vout.append(CTxOut(amount, scriptPubKey)) # arbitrary output -> to be returned 268 txid = self.sendrawtransaction(from_node=from_node, tx_hex=tx.serialize().hex()) 269 return { 270 "sent_vout": 1, 271 "txid": txid, 272 "wtxid": tx.getwtxid(), 273 "hex": tx.serialize().hex(), 274 "tx": tx, 275 } 276 277 def send_self_transfer_multi(self, *, from_node, **kwargs): 278 """Call create_self_transfer_multi and send the transaction.""" 279 tx = self.create_self_transfer_multi(**kwargs) 280 self.sendrawtransaction(from_node=from_node, tx_hex=tx["hex"]) 281 return tx 282 283 def create_self_transfer_multi( 284 self, 285 *, 286 utxos_to_spend: Optional[list[dict]] = None, 287 num_outputs=1, 288 amount_per_output=0, 289 version=2, 290 locktime=0, 291 sequence=0, 292 fee_per_output=1000, 293 target_weight=0, 294 confirmed_only=False, 295 ): 296 """ 297 Create and return a transaction that spends the given UTXOs and creates a 298 certain number of outputs with equal amounts. The output amounts can be 299 set by amount_per_output or automatically calculated with a fee_per_output. 300 """ 301 utxos_to_spend = utxos_to_spend or [self.get_utxo(confirmed_only=confirmed_only)] 302 sequence = [sequence] * len(utxos_to_spend) if type(sequence) is int else sequence 303 assert_equal(len(utxos_to_spend), len(sequence)) 304 305 # calculate output amount 306 inputs_value_total = sum([int(COIN * utxo['value']) for utxo in utxos_to_spend]) 307 outputs_value_total = inputs_value_total - fee_per_output * num_outputs 308 amount_per_output = amount_per_output or (outputs_value_total // num_outputs) 309 assert amount_per_output > 0 310 outputs_value_total = amount_per_output * num_outputs 311 fee = Decimal(inputs_value_total - outputs_value_total) / COIN 312 313 # create tx 314 tx = CTransaction() 315 tx.vin = [CTxIn(COutPoint(int(utxo_to_spend['txid'], 16), utxo_to_spend['vout']), nSequence=seq) for utxo_to_spend, seq in zip(utxos_to_spend, sequence)] 316 tx.vout = [CTxOut(amount_per_output, bytearray(self._scriptPubKey)) for _ in range(num_outputs)] 317 tx.nVersion = version 318 tx.nLockTime = locktime 319 320 self.sign_tx(tx) 321 322 if target_weight: 323 self._bulk_tx(tx, target_weight) 324 325 txid = tx.rehash() 326 return { 327 "new_utxos": [self._create_utxo( 328 txid=txid, 329 vout=i, 330 value=Decimal(tx.vout[i].nValue) / COIN, 331 height=0, 332 coinbase=False, 333 confirmations=0, 334 ) for i in range(len(tx.vout))], 335 "fee": fee, 336 "txid": txid, 337 "wtxid": tx.getwtxid(), 338 "hex": tx.serialize().hex(), 339 "tx": tx, 340 } 341 342 def create_self_transfer( 343 self, 344 *, 345 fee_rate=Decimal("0.003"), 346 fee=Decimal("0"), 347 utxo_to_spend=None, 348 target_weight=0, 349 confirmed_only=False, 350 **kwargs, 351 ): 352 """Create and return a tx with the specified fee. If fee is 0, use fee_rate, where the resulting fee may be exact or at most one satoshi higher than needed.""" 353 utxo_to_spend = utxo_to_spend or self.get_utxo(confirmed_only=confirmed_only) 354 assert fee_rate >= 0 355 assert fee >= 0 356 # calculate fee 357 if self._mode in (MiniWalletMode.RAW_OP_TRUE, MiniWalletMode.ADDRESS_OP_TRUE): 358 vsize = Decimal(104) # anyone-can-spend 359 elif self._mode == MiniWalletMode.RAW_P2PK: 360 vsize = Decimal(168) # P2PK (73 bytes scriptSig + 35 bytes scriptPubKey + 60 bytes other) 361 else: 362 assert False 363 send_value = utxo_to_spend["value"] - (fee or (fee_rate * vsize / 1000)) 364 365 # create tx 366 tx = self.create_self_transfer_multi( 367 utxos_to_spend=[utxo_to_spend], 368 amount_per_output=int(COIN * send_value), 369 target_weight=target_weight, 370 **kwargs, 371 ) 372 if not target_weight: 373 assert_equal(tx["tx"].get_vsize(), vsize) 374 tx["new_utxo"] = tx.pop("new_utxos")[0] 375 376 return tx 377 378 def sendrawtransaction(self, *, from_node, tx_hex, maxfeerate=0, **kwargs): 379 txid = from_node.sendrawtransaction(hexstring=tx_hex, maxfeerate=maxfeerate, **kwargs) 380 self.scan_tx(from_node.decoderawtransaction(tx_hex)) 381 return txid 382 383 def create_self_transfer_chain(self, *, chain_length, utxo_to_spend=None): 384 """ 385 Create a "chain" of chain_length transactions. The nth transaction in 386 the chain is a child of the n-1th transaction and parent of the n+1th transaction. 387 """ 388 chaintip_utxo = utxo_to_spend or self.get_utxo() 389 chain = [] 390 391 for _ in range(chain_length): 392 tx = self.create_self_transfer(utxo_to_spend=chaintip_utxo) 393 chaintip_utxo = tx["new_utxo"] 394 chain.append(tx) 395 396 return chain 397 398 def send_self_transfer_chain(self, *, from_node, **kwargs): 399 """Create and send a "chain" of chain_length transactions. The nth transaction in 400 the chain is a child of the n-1th transaction and parent of the n+1th transaction. 401 402 Returns a list of objects for each tx (see create_self_transfer_multi). 403 """ 404 chain = self.create_self_transfer_chain(**kwargs) 405 for t in chain: 406 self.sendrawtransaction(from_node=from_node, tx_hex=t["hex"]) 407 return chain 408 409 410 def getnewdestination(address_type='bech32m'): 411 """Generate a random destination of the specified type and return the 412 corresponding public key, scriptPubKey and address. Supported types are 413 'legacy', 'p2sh-segwit', 'bech32' and 'bech32m'. Can be used when a random 414 destination is needed, but no compiled wallet is available (e.g. as 415 replacement to the getnewaddress/getaddressinfo RPCs).""" 416 key, pubkey = generate_keypair() 417 if address_type == 'legacy': 418 scriptpubkey = key_to_p2pkh_script(pubkey) 419 address = key_to_p2pkh(pubkey) 420 elif address_type == 'p2sh-segwit': 421 scriptpubkey = key_to_p2sh_p2wpkh_script(pubkey) 422 address = key_to_p2sh_p2wpkh(pubkey) 423 elif address_type == 'bech32': 424 scriptpubkey = key_to_p2wpkh_script(pubkey) 425 address = key_to_p2wpkh(pubkey) 426 elif address_type == 'bech32m': 427 tap = taproot_construct(compute_xonly_pubkey(key.get_bytes())[0]) 428 pubkey = tap.output_pubkey 429 scriptpubkey = tap.scriptPubKey 430 address = output_key_to_p2tr(pubkey) 431 else: 432 assert False 433 return pubkey, scriptpubkey, address