/ test / functional / test_framework / blocktools.py
blocktools.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2015-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Utilities for manipulating blocks and transactions."""
  6  
  7  import struct
  8  import time
  9  import unittest
 10  
 11  from .address import (
 12      address_to_scriptpubkey,
 13      key_to_p2sh_p2wpkh,
 14      key_to_p2wpkh,
 15      script_to_p2sh_p2wsh,
 16      script_to_p2wsh,
 17  )
 18  from .messages import (
 19      CBlock,
 20      COIN,
 21      COutPoint,
 22      CTransaction,
 23      CTxIn,
 24      CTxInWitness,
 25      CTxOut,
 26      SEQUENCE_FINAL,
 27      hash256,
 28      ser_uint256,
 29      tx_from_hex,
 30      uint256_from_compact,
 31      WITNESS_SCALE_FACTOR,
 32      MAX_SEQUENCE_NONFINAL,
 33  )
 34  from .script import (
 35      CScript,
 36      CScriptNum,
 37      CScriptOp,
 38      OP_0,
 39      OP_RETURN,
 40      OP_TRUE,
 41  )
 42  from .script_util import (
 43      key_to_p2pk_script,
 44      key_to_p2wpkh_script,
 45      keys_to_multisig_script,
 46      script_to_p2wsh_script,
 47  )
 48  from .util import assert_equal
 49  
 50  MAX_BLOCK_SIGOPS = 20000
 51  MAX_BLOCK_SIGOPS_WEIGHT = MAX_BLOCK_SIGOPS * WITNESS_SCALE_FACTOR
 52  MAX_STANDARD_TX_SIGOPS = 4000
 53  MAX_STANDARD_TX_WEIGHT = 400000
 54  
 55  # Genesis block time (regtest)
 56  TIME_GENESIS_BLOCK = 1296688602
 57  
 58  MAX_FUTURE_BLOCK_TIME = 2 * 60 * 60
 59  
 60  # Coinbase transaction outputs can only be spent after this number of new blocks (network rule)
 61  COINBASE_MATURITY = 100
 62  
 63  # From BIP141
 64  WITNESS_COMMITMENT_HEADER = b"\xaa\x21\xa9\xed"
 65  
 66  NULL_OUTPOINT = COutPoint(0, 0xffffffff)
 67  
 68  NORMAL_GBT_REQUEST_PARAMS = {"rules": ["segwit"]}
 69  VERSIONBITS_LAST_OLD_BLOCK_VERSION = 4
 70  MIN_BLOCKS_TO_KEEP = 288
 71  
 72  REGTEST_RETARGET_PERIOD = 150
 73  
 74  REGTEST_N_BITS = 0x207fffff  # difficulty retargeting is disabled in REGTEST chainparams"
 75  REGTEST_TARGET = 0x7fffff0000000000000000000000000000000000000000000000000000000000
 76  assert_equal(uint256_from_compact(REGTEST_N_BITS), REGTEST_TARGET)
 77  
 78  DIFF_1_N_BITS = 0x1d00ffff
 79  DIFF_1_TARGET = 0x00000000ffff0000000000000000000000000000000000000000000000000000
 80  assert_equal(uint256_from_compact(DIFF_1_N_BITS), DIFF_1_TARGET)
 81  
 82  DIFF_4_N_BITS = 0x1c3fffc0
 83  DIFF_4_TARGET = int(DIFF_1_TARGET / 4)
 84  assert_equal(uint256_from_compact(DIFF_4_N_BITS), DIFF_4_TARGET)
 85  
 86  # From BIP325
 87  SIGNET_HEADER = b"\xec\xc7\xda\xa2"
 88  
 89  # Number of blocks to create in temporary blockchain branch for reorg testing
 90  FORK_LENGTH = 10
 91  
 92  def nbits_str(nbits):
 93      return f"{nbits:08x}"
 94  
 95  def target_str(target):
 96      return f"{target:064x}"
 97  
 98  def create_block(hashprev=None, coinbase=None, *, ntime=None, height=None, version=None, tmpl=None, txlist=None):
 99      """Create a block (with regtest difficulty)."""
100      block = CBlock()
101      if tmpl is None:
102          tmpl = {}
103      block.nVersion = version or tmpl.get('version') or VERSIONBITS_LAST_OLD_BLOCK_VERSION
104      block.nTime = ntime or tmpl.get('curtime') or int(time.time() + 600)
105      block.hashPrevBlock = hashprev or int(tmpl['previousblockhash'], 0x10)
106      if tmpl and tmpl.get('bits') is not None:
107          block.nBits = struct.unpack('>I', bytes.fromhex(tmpl['bits']))[0]
108      else:
109          block.nBits = REGTEST_N_BITS
110      if coinbase is None:
111          coinbase = create_coinbase(height=height or tmpl["height"])
112      block.vtx.append(coinbase)
113      if txlist:
114          for tx in txlist:
115              if type(tx) is str:
116                  tx = tx_from_hex(tx)
117              block.vtx.append(tx)
118      block.hashMerkleRoot = block.calc_merkle_root()
119      return block
120  
def create_empty_fork(node, fork_length=FORK_LENGTH):
    '''
        Creates a fork using node's chaintip as the starting point.
        Returns a list of blocks to submit in order.

        Each block is built with create_block() on top of the previous one
        and solved; height and block time both advance by one per block.
        The blocks are only returned, not submitted to the node.
    '''
    # Query the best block hash once so the parent hash and the timestamp
    # are guaranteed to refer to the same block.
    best_hash = node.getbestblockhash()
    tip = int(best_hash, 16)
    height = node.getblockcount()
    block_time = node.getblock(best_hash)['time'] + 1

    blocks = []
    for _ in range(fork_length):
        block = create_block(tip, height=height + 1, ntime=block_time)
        block.solve()
        blocks.append(block)
        tip = block.hash_int
        block_time += 1
        height += 1

    return blocks
140  
def get_witness_script(witness_root, witness_nonce):
    """Return the OP_RETURN script committing to witness_root and witness_nonce.

    The pushed data is WITNESS_COMMITMENT_HEADER followed by the hash256 of
    the serialized root concatenated with the serialized nonce.
    """
    preimage = ser_uint256(witness_root) + ser_uint256(witness_nonce)
    commitment_payload = WITNESS_COMMITMENT_HEADER + hash256(preimage)
    return CScript([OP_RETURN, commitment_payload])
145  
def add_witness_commitment(block, nonce=0):
    """Add a witness commitment to the block's coinbase transaction.

    According to BIP141, blocks with witness rules active must commit to the
    hash of all in-block transactions including witness.

    The block is modified in place: the coinbase witness is replaced by a
    single stack item holding the serialized nonce, a zero-value commitment
    output is appended to the coinbase, and hashMerkleRoot is recomputed."""
    # The witness merkle root is taken before the coinbase is touched.
    witness_root = block.calc_witness_merkle_root()

    # The nonce lives in the witness of the coinbase input.
    coinbase = block.vtx[0]
    coinbase.wit.vtxinwit = [CTxInWitness()]
    coinbase.wit.vtxinwit[0].scriptWitness.stack = [ser_uint256(nonce)]

    # The commitment goes in as the last output of the coinbase.
    commitment_script = get_witness_script(witness_root, nonce)
    coinbase.vout.append(CTxOut(0, commitment_script))

    # The coinbase changed, so the block's merkle root must be refreshed.
    block.hashMerkleRoot = block.calc_merkle_root()
162  
163  
def script_BIP34_coinbase_height(height):
    """Return the coinbase scriptSig that encodes the given block height.

    Heights above 16 are pushed as a CScriptNum. Heights up to 16 use the
    small-integer opcode from CScriptOp.encode_op_n, followed by OP_0.
    """
    if height > 16:
        return CScript([CScriptNum(height)])
    small_int_op = CScriptOp.encode_op_n(height)
    # The trailing OP_0 is a dummy extraNonce that brings the scriptSig size
    # to 2 (see bad-cb-length consensus rule).
    return CScript([small_int_op, OP_0])
170  
171  
def create_coinbase(height, pubkey=None, *, script_pubkey=None, extra_output_script=None, fees=0, nValue=50, halving_period=REGTEST_RETARGET_PERIOD):
    """Create a coinbase transaction.

    The single input spends NULL_OUTPOINT with a scriptSig encoding height
    (see script_BIP34_coinbase_height) and sequence MAX_SEQUENCE_NONFINAL;
    nLockTime is set to height - 1.

    If pubkey is passed in, the coinbase output will be a P2PK output;
    otherwise, if script_pubkey is passed in, that script is used as-is;
    otherwise an anyone-can-spend output. pubkey takes precedence over
    script_pubkey when both are given.

    The output value starts at nValue * COIN. Only when nValue is the
    default 50 is it halved once per halving_period blocks of height and
    increased by fees; for any other nValue, fees is ignored.

    If extra_output_script is given, make a 0-value output to that
    script. This is useful to pad block weight/sigops as needed. """
    coinbase = CTransaction()
    coinbase.nLockTime = height - 1
    coinbase.vin.append(CTxIn(NULL_OUTPOINT, script_BIP34_coinbase_height(height), MAX_SEQUENCE_NONFINAL))
    coinbaseoutput = CTxOut()
    coinbaseoutput.nValue = nValue * COIN
    if nValue == 50:
        # Floor division keeps the halving count an exact integer.
        halvings = height // halving_period
        coinbaseoutput.nValue >>= halvings
        coinbaseoutput.nValue += fees
    if pubkey is not None:
        coinbaseoutput.scriptPubKey = key_to_p2pk_script(pubkey)
    elif script_pubkey is not None:
        coinbaseoutput.scriptPubKey = script_pubkey
    else:
        coinbaseoutput.scriptPubKey = CScript([OP_TRUE])
    coinbase.vout = [coinbaseoutput]
    if extra_output_script is not None:
        coinbaseoutput2 = CTxOut()
        coinbaseoutput2.nValue = 0
        coinbaseoutput2.scriptPubKey = extra_output_script
        coinbase.vout.append(coinbaseoutput2)
    return coinbase
202  
def create_tx_with_script(prevtx, n, script_sig=b"", *, amount, output_script=None):
    """Build a one-input, one-output transaction spending output n of prevtx.

       The input uses script_sig as its scriptSig and SEQUENCE_FINAL as its
       sequence. The output pays amount to output_script, which defaults to
       an empty CScript when not given.

       n must be a valid index into prevtx.vout (checked with an assert).
    """
    spending_script = CScript() if output_script is None else output_script
    spend = CTransaction()
    assert n < len(prevtx.vout)
    outpoint = COutPoint(prevtx.txid_int, n)
    spend.vin.append(CTxIn(outpoint, script_sig, SEQUENCE_FINAL))
    spend.vout.append(CTxOut(amount, spending_script))
    return spend
216  
def get_legacy_sigopcount_block(block, accurate=True):
    """Return the summed legacy sigop count of every transaction in block.vtx."""
    return sum(get_legacy_sigopcount_tx(tx, accurate) for tx in block.vtx)
222  
def get_legacy_sigopcount_tx(tx, accurate=True):
    """Return the legacy sigop count of tx: all output scripts plus all input scripts.

    accurate is forwarded unchanged to each script's GetSigOpCount().
    """
    output_sigops = sum(txout.scriptPubKey.GetSigOpCount(accurate) for txout in tx.vout)
    # scriptSig might be of type bytes, so it is wrapped in CScript before counting
    input_sigops = sum(CScript(txin.scriptSig).GetSigOpCount(accurate) for txin in tx.vin)
    return output_sigops + input_sigops
231  
def witness_script(use_p2wsh, pubkey):
    """Return the hex-encoded scriptPubKey of a pay-to-witness output for pubkey.

    With use_p2wsh set, the script is a P2WSH output wrapping a 1-of-1
    multisig for pubkey; otherwise it is a P2WPKH output for pubkey."""
    if use_p2wsh:
        # 1-of-1 multisig, wrapped in P2WSH
        multisig = keys_to_multisig_script([pubkey])
        return script_to_p2wsh_script(multisig).hex()
    # Plain P2WPKH
    return key_to_p2wpkh_script(pubkey).hex()
246  
def create_witness_tx(node, use_p2wsh, utxo, pubkey, encode_p2sh, amount):
    """Return a raw transaction (in hex) spending utxo to a segwit address.

    use_p2wsh selects a P2WSH 1-of-1 multisig over P2WPKH, and encode_p2sh
    selects the P2SH-wrapped address variant. For the unwrapped variants the
    address is cross-checked against witness_script(). The transaction is
    built by the node's createrawtransaction RPC and is not signed here."""
    if use_p2wsh:
        program = keys_to_multisig_script([pubkey])
        to_address = script_to_p2sh_p2wsh if encode_p2sh else script_to_p2wsh
        addr = to_address(program)
    else:
        to_address = key_to_p2sh_p2wpkh if encode_p2sh else key_to_p2wpkh
        addr = to_address(pubkey)
    if not encode_p2sh:
        expected_script_hex = witness_script(use_p2wsh, pubkey)
        assert_equal(address_to_scriptpubkey(addr).hex(), expected_script_hex)
    outputs = {addr: amount}
    return node.createrawtransaction([utxo], outputs)
259  
def send_to_witness(use_p2wsh, node, utxo, pubkey, encode_p2sh, amount, sign=True, insert_redeem_script=""):
    """Create and broadcast a transaction spending utxo to a segwit output.

    The output corresponds to the given pubkey: use_p2wsh picks P2WSH over
    P2WPKH, and encode_p2sh wraps the output in P2SH.
    With sign=True the node's wallet signs the transaction before it is sent.
    With sign=False the unsigned transaction is sent; if insert_redeem_script
    (hex) is given, it is first pushed onto the first input's scriptSig.
    Returns the result of the node's sendrawtransaction call."""
    raw_tx = create_witness_tx(node, use_p2wsh, utxo, pubkey, encode_p2sh, amount)

    if sign:
        signing_result = node.signrawtransactionwithwallet(raw_tx)
        assert "errors" not in signing_result
        return node.sendrawtransaction(signing_result["hex"])

    if insert_redeem_script:
        unsigned_tx = tx_from_hex(raw_tx)
        unsigned_tx.vin[0].scriptSig += CScript([bytes.fromhex(insert_redeem_script)])
        raw_tx = unsigned_tx.serialize().hex()

    return node.sendrawtransaction(raw_tx)
279  
class TestFrameworkBlockTools(unittest.TestCase):
    """Unit tests for the block-construction helpers in this module."""

    def test_create_block_prefers_explicit_height(self):
        """An explicit height wins over the height stored in the template."""
        explicit_height = 200
        block = create_block(hashprev=1, height=explicit_height, tmpl={"height": 100})
        coinbase_script_sig = block.vtx[0].vin[0].scriptSig
        assert_equal(CScriptNum.decode(coinbase_script_sig), explicit_height)

    def test_create_coinbase(self):
        """The coinbase scriptSig decodes back to the requested height."""
        height = 20
        script_sig = create_coinbase(height=height).vin[0].scriptSig
        assert_equal(CScriptNum.decode(script_sig), height)