blocktools.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2015-present The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """Utilities for manipulating blocks and transactions.""" 6 7 import struct 8 import time 9 import unittest 10 11 from .address import ( 12 address_to_scriptpubkey, 13 key_to_p2sh_p2wpkh, 14 key_to_p2wpkh, 15 script_to_p2sh_p2wsh, 16 script_to_p2wsh, 17 ) 18 from .messages import ( 19 CBlock, 20 COIN, 21 COutPoint, 22 CTransaction, 23 CTxIn, 24 CTxInWitness, 25 CTxOut, 26 SEQUENCE_FINAL, 27 hash256, 28 ser_uint256, 29 tx_from_hex, 30 uint256_from_compact, 31 WITNESS_SCALE_FACTOR, 32 MAX_SEQUENCE_NONFINAL, 33 ) 34 from .script import ( 35 CScript, 36 CScriptNum, 37 CScriptOp, 38 OP_0, 39 OP_RETURN, 40 OP_TRUE, 41 ) 42 from .script_util import ( 43 key_to_p2pk_script, 44 key_to_p2wpkh_script, 45 keys_to_multisig_script, 46 script_to_p2wsh_script, 47 ) 48 from .util import assert_equal 49 50 MAX_BLOCK_SIGOPS = 20000 51 MAX_BLOCK_SIGOPS_WEIGHT = MAX_BLOCK_SIGOPS * WITNESS_SCALE_FACTOR 52 MAX_STANDARD_TX_SIGOPS = 4000 53 MAX_STANDARD_TX_WEIGHT = 400000 54 55 # Genesis block time (regtest) 56 TIME_GENESIS_BLOCK = 1296688602 57 58 MAX_FUTURE_BLOCK_TIME = 2 * 60 * 60 59 60 # Coinbase transaction outputs can only be spent after this number of new blocks (network rule) 61 COINBASE_MATURITY = 100 62 63 # From BIP141 64 WITNESS_COMMITMENT_HEADER = b"\xaa\x21\xa9\xed" 65 66 NULL_OUTPOINT = COutPoint(0, 0xffffffff) 67 68 NORMAL_GBT_REQUEST_PARAMS = {"rules": ["segwit"]} 69 VERSIONBITS_LAST_OLD_BLOCK_VERSION = 4 70 MIN_BLOCKS_TO_KEEP = 288 71 72 REGTEST_RETARGET_PERIOD = 150 73 74 REGTEST_N_BITS = 0x207fffff # difficulty retargeting is disabled in REGTEST chainparams" 75 REGTEST_TARGET = 0x7fffff0000000000000000000000000000000000000000000000000000000000 76 assert_equal(uint256_from_compact(REGTEST_N_BITS), REGTEST_TARGET) 77 78 DIFF_1_N_BITS = 0x1d00ffff 79 DIFF_1_TARGET = 0x00000000ffff0000000000000000000000000000000000000000000000000000 80 assert_equal(uint256_from_compact(DIFF_1_N_BITS), DIFF_1_TARGET) 81 82 DIFF_4_N_BITS = 0x1c3fffc0 83 DIFF_4_TARGET = int(DIFF_1_TARGET / 4) 84 assert_equal(uint256_from_compact(DIFF_4_N_BITS), DIFF_4_TARGET) 85 86 # From BIP325 87 SIGNET_HEADER = b"\xec\xc7\xda\xa2" 88 89 # Number of blocks to create in temporary blockchain branch for reorg testing 90 FORK_LENGTH = 10 91 92 def nbits_str(nbits): 93 return f"{nbits:08x}" 94 95 def target_str(target): 96 return f"{target:064x}" 97 98 def create_block(hashprev=None, coinbase=None, ntime=None, *, version=None, tmpl=None, txlist=None): 99 """Create a block (with regtest difficulty).""" 100 block = CBlock() 101 if tmpl is None: 102 tmpl = {} 103 block.nVersion = version or tmpl.get('version') or VERSIONBITS_LAST_OLD_BLOCK_VERSION 104 block.nTime = ntime or tmpl.get('curtime') or int(time.time() + 600) 105 block.hashPrevBlock = hashprev or int(tmpl['previousblockhash'], 0x10) 106 if tmpl and tmpl.get('bits') is not None: 107 block.nBits = struct.unpack('>I', bytes.fromhex(tmpl['bits']))[0] 108 else: 109 block.nBits = REGTEST_N_BITS 110 if coinbase is None: 111 coinbase = create_coinbase(height=tmpl['height']) 112 block.vtx.append(coinbase) 113 if txlist: 114 for tx in txlist: 115 if type(tx) is str: 116 tx = tx_from_hex(tx) 117 block.vtx.append(tx) 118 block.hashMerkleRoot = block.calc_merkle_root() 119 return block 120 121 def create_empty_fork(node, fork_length=FORK_LENGTH): 122 ''' 123 Creates a fork using node's chaintip as the starting point. 124 Returns a list of blocks to submit in order. 125 ''' 126 tip = int(node.getbestblockhash(), 16) 127 height = node.getblockcount() 128 block_time = node.getblock(node.getbestblockhash())['time'] + 1 129 130 blocks = [] 131 for _ in range(fork_length): 132 block = create_block(tip, create_coinbase(height + 1), block_time) 133 block.solve() 134 blocks.append(block) 135 tip = block.hash_int 136 block_time += 1 137 height += 1 138 139 return blocks 140 141 def get_witness_script(witness_root, witness_nonce): 142 witness_commitment = hash256(ser_uint256(witness_root) + ser_uint256(witness_nonce)) 143 output_data = WITNESS_COMMITMENT_HEADER + witness_commitment 144 return CScript([OP_RETURN, output_data]) 145 146 def add_witness_commitment(block, nonce=0): 147 """Add a witness commitment to the block's coinbase transaction. 148 149 According to BIP141, blocks with witness rules active must commit to the 150 hash of all in-block transactions including witness.""" 151 # First calculate the merkle root of the block's 152 # transactions, with witnesses. 153 witness_nonce = nonce 154 witness_root = block.calc_witness_merkle_root() 155 # witness_nonce should go to coinbase witness. 156 block.vtx[0].wit.vtxinwit = [CTxInWitness()] 157 block.vtx[0].wit.vtxinwit[0].scriptWitness.stack = [ser_uint256(witness_nonce)] 158 159 # witness commitment is the last OP_RETURN output in coinbase 160 block.vtx[0].vout.append(CTxOut(0, get_witness_script(witness_root, witness_nonce))) 161 block.hashMerkleRoot = block.calc_merkle_root() 162 163 164 def script_BIP34_coinbase_height(height): 165 if height <= 16: 166 res = CScriptOp.encode_op_n(height) 167 # Append dummy extraNonce to increase scriptSig size to 2 (see bad-cb-length consensus rule) 168 return CScript([res, OP_0]) 169 return CScript([CScriptNum(height)]) 170 171 172 def create_coinbase(height, pubkey=None, *, script_pubkey=None, extra_output_script=None, fees=0, nValue=50, halving_period=REGTEST_RETARGET_PERIOD): 173 """Create a coinbase transaction. 174 175 If pubkey is passed in, the coinbase output will be a P2PK output; 176 otherwise an anyone-can-spend output. 177 178 If extra_output_script is given, make a 0-value output to that 179 script. This is useful to pad block weight/sigops as needed. """ 180 coinbase = CTransaction() 181 coinbase.nLockTime = height - 1 182 coinbase.vin.append(CTxIn(NULL_OUTPOINT, script_BIP34_coinbase_height(height), MAX_SEQUENCE_NONFINAL)) 183 coinbaseoutput = CTxOut() 184 coinbaseoutput.nValue = nValue * COIN 185 if nValue == 50: 186 halvings = int(height / halving_period) 187 coinbaseoutput.nValue >>= halvings 188 coinbaseoutput.nValue += fees 189 if pubkey is not None: 190 coinbaseoutput.scriptPubKey = key_to_p2pk_script(pubkey) 191 elif script_pubkey is not None: 192 coinbaseoutput.scriptPubKey = script_pubkey 193 else: 194 coinbaseoutput.scriptPubKey = CScript([OP_TRUE]) 195 coinbase.vout = [coinbaseoutput] 196 if extra_output_script is not None: 197 coinbaseoutput2 = CTxOut() 198 coinbaseoutput2.nValue = 0 199 coinbaseoutput2.scriptPubKey = extra_output_script 200 coinbase.vout.append(coinbaseoutput2) 201 return coinbase 202 203 def create_tx_with_script(prevtx, n, script_sig=b"", *, amount, output_script=None): 204 """Return one-input, one-output transaction object 205 spending the prevtx's n-th output with the given amount. 206 207 Can optionally pass scriptPubKey and scriptSig, default is anyone-can-spend output. 208 """ 209 if output_script is None: 210 output_script = CScript() 211 tx = CTransaction() 212 assert n < len(prevtx.vout) 213 tx.vin.append(CTxIn(COutPoint(prevtx.txid_int, n), script_sig, SEQUENCE_FINAL)) 214 tx.vout.append(CTxOut(amount, output_script)) 215 return tx 216 217 def get_legacy_sigopcount_block(block, accurate=True): 218 count = 0 219 for tx in block.vtx: 220 count += get_legacy_sigopcount_tx(tx, accurate) 221 return count 222 223 def get_legacy_sigopcount_tx(tx, accurate=True): 224 count = 0 225 for i in tx.vout: 226 count += i.scriptPubKey.GetSigOpCount(accurate) 227 for j in tx.vin: 228 # scriptSig might be of type bytes, so convert to CScript for the moment 229 count += CScript(j.scriptSig).GetSigOpCount(accurate) 230 return count 231 232 def witness_script(use_p2wsh, pubkey): 233 """Create a scriptPubKey for a pay-to-witness TxOut. 234 235 This is either a P2WPKH output for the given pubkey, or a P2WSH output of a 236 1-of-1 multisig for the given pubkey. Returns the hex encoding of the 237 scriptPubKey.""" 238 if not use_p2wsh: 239 # P2WPKH instead 240 pkscript = key_to_p2wpkh_script(pubkey) 241 else: 242 # 1-of-1 multisig 243 witness_script = keys_to_multisig_script([pubkey]) 244 pkscript = script_to_p2wsh_script(witness_script) 245 return pkscript.hex() 246 247 def create_witness_tx(node, use_p2wsh, utxo, pubkey, encode_p2sh, amount): 248 """Return a transaction (in hex) that spends the given utxo to a segwit output. 249 250 Optionally wrap the segwit output using P2SH.""" 251 if use_p2wsh: 252 program = keys_to_multisig_script([pubkey]) 253 addr = script_to_p2sh_p2wsh(program) if encode_p2sh else script_to_p2wsh(program) 254 else: 255 addr = key_to_p2sh_p2wpkh(pubkey) if encode_p2sh else key_to_p2wpkh(pubkey) 256 if not encode_p2sh: 257 assert_equal(address_to_scriptpubkey(addr).hex(), witness_script(use_p2wsh, pubkey)) 258 return node.createrawtransaction([utxo], {addr: amount}) 259 260 def send_to_witness(use_p2wsh, node, utxo, pubkey, encode_p2sh, amount, sign=True, insert_redeem_script=""): 261 """Create a transaction spending a given utxo to a segwit output. 262 263 The output corresponds to the given pubkey: use_p2wsh determines whether to 264 use P2WPKH or P2WSH; encode_p2sh determines whether to wrap in P2SH. 265 sign=True will have the given node sign the transaction. 266 insert_redeem_script will be added to the scriptSig, if given.""" 267 tx_to_witness = create_witness_tx(node, use_p2wsh, utxo, pubkey, encode_p2sh, amount) 268 if (sign): 269 signed = node.signrawtransactionwithwallet(tx_to_witness) 270 assert "errors" not in signed or len(["errors"]) == 0 271 return node.sendrawtransaction(signed["hex"]) 272 else: 273 if (insert_redeem_script): 274 tx = tx_from_hex(tx_to_witness) 275 tx.vin[0].scriptSig += CScript([bytes.fromhex(insert_redeem_script)]) 276 tx_to_witness = tx.serialize().hex() 277 278 return node.sendrawtransaction(tx_to_witness) 279 280 class TestFrameworkBlockTools(unittest.TestCase): 281 def test_create_coinbase(self): 282 height = 20 283 coinbase_tx = create_coinbase(height=height) 284 assert_equal(CScriptNum.decode(coinbase_tx.vin[0].scriptSig), height)