example_test.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2017-2022 The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """An example functional test 6 7 The module-level docstring should include a high-level description of 8 what the test is doing. It's the first thing people see when they open 9 the file and should give the reader information about *what* the test 10 is testing and *how* it's being tested 11 """ 12 # Imports should be in PEP8 ordering (std library first, then third party 13 # libraries then local imports). 14 from collections import defaultdict 15 16 # Avoid wildcard * imports 17 # Use lexicographically sorted multi-line imports 18 from test_framework.blocktools import ( 19 create_block, 20 create_coinbase, 21 ) 22 from test_framework.messages import ( 23 CInv, 24 MSG_BLOCK, 25 ) 26 from test_framework.p2p import ( 27 P2PInterface, 28 msg_block, 29 msg_getdata, 30 p2p_lock, 31 ) 32 from test_framework.test_framework import BitcoinTestFramework 33 from test_framework.util import ( 34 assert_equal, 35 ) 36 37 # P2PInterface is a class containing callbacks to be executed when a P2P 38 # message is received from the node-under-test. Subclass P2PInterface and 39 # override the on_*() methods if you need custom behaviour. 40 class BaseNode(P2PInterface): 41 def __init__(self): 42 """Initialize the P2PInterface 43 44 Used to initialize custom properties for the Node that aren't 45 included by default in the base class. Be aware that the P2PInterface 46 base class already stores a counter for each P2P message type and the 47 last received message of each type, which should be sufficient for the 48 needs of most tests. 49 50 Call super().__init__() first for standard initialization and then 51 initialize custom properties.""" 52 super().__init__() 53 # Stores a dictionary of all blocks received 54 self.block_receive_map = defaultdict(int) 55 56 def on_block(self, message): 57 """Override the standard on_block callback 58 59 Store the hash of a received block in the dictionary.""" 60 message.block.calc_sha256() 61 self.block_receive_map[message.block.sha256] += 1 62 63 def on_inv(self, message): 64 """Override the standard on_inv callback""" 65 pass 66 67 def custom_function(): 68 """Do some custom behaviour 69 70 If this function is more generally useful for other tests, consider 71 moving it to a module in test_framework.""" 72 # self.log.info("running custom_function") # Oops! Can't run self.log outside the BitcoinTestFramework 73 pass 74 75 76 class ExampleTest(BitcoinTestFramework): 77 # Each functional test is a subclass of the BitcoinTestFramework class. 78 79 # Override the set_test_params(), skip_test_if_missing_module(), add_options(), setup_chain(), setup_network() 80 # and setup_nodes() methods to customize the test setup as required. 81 82 def add_options(self, parser): 83 self.add_wallet_options(parser) 84 85 def set_test_params(self): 86 """Override test parameters for your individual test. 87 88 This method must be overridden and num_nodes must be explicitly set.""" 89 # By default every test loads a pre-mined chain of 200 blocks from cache. 90 # Set setup_clean_chain to True to skip this and start from the Genesis 91 # block. 92 self.setup_clean_chain = True 93 self.num_nodes = 3 94 # Use self.extra_args to change command-line arguments for the nodes 95 self.extra_args = [[], ["-logips"], []] 96 97 # self.log.info("I've finished set_test_params") # Oops! Can't run self.log before run_test() 98 99 # Use skip_test_if_missing_module() to skip the test if your test requires certain modules to be present. 100 # This test uses generate which requires wallet to be compiled 101 def skip_test_if_missing_module(self): 102 self.skip_if_no_wallet() 103 104 # Use add_options() to add specific command-line options for your test. 105 # In practice this is not used very much, since the tests are mostly written 106 # to be run in automated environments without command-line options. 107 # def add_options() 108 # pass 109 110 # Use setup_chain() to customize the node data directories. In practice 111 # this is not used very much since the default behaviour is almost always 112 # fine 113 # def setup_chain(): 114 # pass 115 116 def setup_network(self): 117 """Setup the test network topology 118 119 Often you won't need to override this, since the standard network topology 120 (linear: node0 <-> node1 <-> node2 <-> ...) is fine for most tests. 121 122 If you do override this method, remember to start the nodes, assign 123 them to self.nodes, connect them and then sync.""" 124 125 self.setup_nodes() 126 127 # In this test, we're not connecting node2 to node0 or node1. Calls to 128 # sync_all() should not include node2, since we're not expecting it to 129 # sync. 130 self.connect_nodes(0, 1) 131 self.sync_all(self.nodes[0:2]) 132 133 # Use setup_nodes() to customize the node start behaviour (for example if 134 # you don't want to start all nodes at the start of the test). 135 # def setup_nodes(): 136 # pass 137 138 def custom_method(self): 139 """Do some custom behaviour for this test 140 141 Define it in a method here because you're going to use it repeatedly. 142 If you think it's useful in general, consider moving it to the base 143 BitcoinTestFramework class so other tests can use it.""" 144 145 self.log.info("Running custom_method") 146 147 def run_test(self): 148 """Main test logic""" 149 150 # Create P2P connections will wait for a verack to make sure the connection is fully up 151 peer_messaging = self.nodes[0].add_p2p_connection(BaseNode()) 152 153 # Generating a block on one of the nodes will get us out of IBD 154 blocks = [int(self.generate(self.nodes[0], sync_fun=lambda: self.sync_all(self.nodes[0:2]), nblocks=1)[0], 16)] 155 156 # Notice above how we called an RPC by calling a method with the same 157 # name on the node object. Notice also how we used a keyword argument 158 # to specify a named RPC argument. Neither of those are defined on the 159 # node object. Instead there's some __getattr__() magic going on under 160 # the covers to dispatch unrecognised attribute calls to the RPC 161 # interface. 162 163 # Logs are nice. Do plenty of them. They can be used in place of comments for 164 # breaking the test into sub-sections. 165 self.log.info("Starting test!") 166 167 self.log.info("Calling a custom function") 168 custom_function() 169 170 self.log.info("Calling a custom method") 171 self.custom_method() 172 173 self.log.info("Create some blocks") 174 self.tip = int(self.nodes[0].getbestblockhash(), 16) 175 self.block_time = self.nodes[0].getblock(self.nodes[0].getbestblockhash())['time'] + 1 176 177 height = self.nodes[0].getblockcount() 178 179 for _ in range(10): 180 # Use the blocktools functionality to manually build a block. 181 # Calling the generate() rpc is easier, but this allows us to exactly 182 # control the blocks and transactions. 183 block = create_block(self.tip, create_coinbase(height+1), self.block_time) 184 block.solve() 185 block_message = msg_block(block) 186 # Send message is used to send a P2P message to the node over our P2PInterface 187 peer_messaging.send_message(block_message) 188 self.tip = block.sha256 189 blocks.append(self.tip) 190 self.block_time += 1 191 height += 1 192 193 self.log.info("Wait for node1 to reach current tip (height 11) using RPC") 194 self.nodes[1].waitforblockheight(11) 195 196 self.log.info("Connect node2 and node1") 197 self.connect_nodes(1, 2) 198 199 self.log.info("Wait for node2 to receive all the blocks from node1") 200 self.sync_all() 201 202 self.log.info("Add P2P connection to node2") 203 self.nodes[0].disconnect_p2ps() 204 205 peer_receiving = self.nodes[2].add_p2p_connection(BaseNode()) 206 207 self.log.info("Test that node2 propagates all the blocks to us") 208 209 getdata_request = msg_getdata() 210 for block in blocks: 211 getdata_request.inv.append(CInv(MSG_BLOCK, block)) 212 peer_receiving.send_message(getdata_request) 213 214 # wait_until() will loop until a predicate condition is met. Use it to test properties of the 215 # P2PInterface objects. 216 peer_receiving.wait_until(lambda: sorted(blocks) == sorted(list(peer_receiving.block_receive_map.keys())), timeout=5) 217 218 self.log.info("Check that each block was received only once") 219 # The network thread uses a global lock on data access to the P2PConnection objects when sending and receiving 220 # messages. The test thread should acquire the global lock before accessing any P2PConnection data to avoid locking 221 # and synchronization issues. Note p2p.wait_until() acquires this global lock internally when testing the predicate. 222 with p2p_lock: 223 for block in peer_receiving.block_receive_map.values(): 224 assert_equal(block, 1) 225 226 227 if __name__ == '__main__': 228 ExampleTest().main()