example_test.py
#!/usr/bin/env python3
# Copyright (c) 2017-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""An example functional test

The module-level docstring should include a high-level description of
what the test is doing. It's the first thing people see when they open
the file and should give the reader information about *what* the test
is testing and *how* it's being tested
"""
# Imports should be in PEP8 ordering (std library first, then third party
# libraries then local imports).
from collections import defaultdict

# Avoid wildcard * imports
# Use lexicographically sorted multi-line imports
from test_framework.blocktools import (
    create_block,
    create_coinbase,
)
from test_framework.messages import (
    CInv,
    MSG_BLOCK,
)
from test_framework.p2p import (
    P2PInterface,
    msg_block,
    msg_getdata,
    p2p_lock,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_equal,
)


# P2PInterface is a class containing callbacks to be executed when a P2P
# message is received from the node-under-test. Subclass P2PInterface and
# override the on_*() methods if you need custom behaviour.
class BaseNode(P2PInterface):
    """P2P connection that counts how many times each block is received."""

    def __init__(self):
        """Initialize the P2PInterface

        Used to initialize custom properties for the Node that aren't
        included by default in the base class. Be aware that the P2PInterface
        base class already stores a counter for each P2P message type and the
        last received message of each type, which should be sufficient for the
        needs of most tests.

        Call super().__init__() first for standard initialization and then
        initialize custom properties."""
        super().__init__()
        # Maps the hash of each received block to the number of times that
        # block has been received. defaultdict(int) starts every count at 0.
        self.block_receive_map = defaultdict(int)

    def on_block(self, message):
        """Override the standard on_block callback

        Increment the receive count for the hash of the received block."""
        message.block.calc_sha256()
        self.block_receive_map[message.block.sha256] += 1

    def on_inv(self, message):
        """Override the standard on_inv callback"""
        pass


def custom_function():
    """Do some custom behaviour

    If this function is more generally useful for other tests, consider
    moving it to a module in test_framework."""
    # self.log.info("running custom_function")  # Oops! Can't run self.log outside the BitcoinTestFramework
    pass


class ExampleTest(BitcoinTestFramework):
    """Example test: send hand-built blocks over P2P and request them back.

    Ten blocks are built manually and sent to node0 over a P2P connection.
    After node1 and node2 have caught up, the blocks are requested from
    node2 with a getdata message, and the test checks that each block was
    received exactly once."""
    # Each functional test is a subclass of the BitcoinTestFramework class.

    # Override the set_test_params(), skip_test_if_missing_module(), add_options(), setup_chain(), setup_network()
    # and setup_nodes() methods to customize the test setup as required.

    def set_test_params(self):
        """Override test parameters for your individual test.

        This method must be overridden and num_nodes must be explicitly set."""
        # By default every test loads a pre-mined chain of 200 blocks from cache.
        # Set setup_clean_chain to True to skip this and start from the Genesis
        # block.
        self.setup_clean_chain = True
        self.num_nodes = 3
        # Use self.extra_args to change command-line arguments for the nodes
        self.extra_args = [[], ["-logips"], []]

        # self.log.info("I've finished set_test_params")  # Oops! Can't run self.log before run_test()

    # Use skip_test_if_missing_module() to skip the test if your test requires certain modules to be present.
    # This test uses generate which requires wallet to be compiled
    def skip_test_if_missing_module(self):
        """Skip this test when the wallet is not compiled in."""
        self.skip_if_no_wallet()

    # Use add_options() to add specific command-line options for your test.
    # In practice this is not used very much, since the tests are mostly written
    # to be run in automated environments without command-line options.
    # def add_options()
    #     pass

    # Use setup_chain() to customize the node data directories. In practice
    # this is not used very much since the default behaviour is almost always
    # fine
    # def setup_chain():
    #     pass

    def setup_network(self):
        """Setup the test network topology

        Often you won't need to override this, since the standard network topology
        (linear: node0 <-> node1 <-> node2 <-> ...) is fine for most tests.

        If you do override this method, remember to start the nodes, assign
        them to self.nodes, connect them and then sync."""

        self.setup_nodes()

        # In this test, we're not connecting node2 to node0 or node1. Calls to
        # sync_all() should not include node2, since we're not expecting it to
        # sync.
        self.connect_nodes(0, 1)
        self.sync_all(self.nodes[0:2])

    # Use setup_nodes() to customize the node start behaviour (for example if
    # you don't want to start all nodes at the start of the test).
    # def setup_nodes():
    #     pass

    def custom_method(self):
        """Do some custom behaviour for this test

        Define it in a method here because you're going to use it repeatedly.
        If you think it's useful in general, consider moving it to the base
        BitcoinTestFramework class so other tests can use it."""

        self.log.info("Running custom_method")

    def run_test(self):
        """Main test logic"""

        # Creating a P2P connection will wait for a verack to make sure the connection is fully up
        peer_messaging = self.nodes[0].add_p2p_connection(BaseNode())

        # Generating a block on one of the nodes will get us out of IBD.
        # `blocks` collects the hash (as an int) of every block the test
        # expects node2 to serve later on, starting with this one.
        blocks = [int(self.generate(self.nodes[0], sync_fun=lambda: self.sync_all(self.nodes[0:2]), nblocks=1)[0], 16)]

        # Notice above how we called an RPC by calling a method with the same
        # name on the node object. Notice also how we used a keyword argument
        # to specify a named RPC argument. Neither of those are defined on the
        # node object. Instead there's some __getattr__() magic going on under
        # the covers to dispatch unrecognised attribute calls to the RPC
        # interface.

        # Logs are nice. Do plenty of them. They can be used in place of comments for
        # breaking the test into sub-sections.
        self.log.info("Starting test!")

        self.log.info("Calling a custom function")
        custom_function()

        self.log.info("Calling a custom method")
        self.custom_method()

        self.log.info("Create some blocks")
        # Fetch the best block hash once so that the tip and the timestamp
        # derived from it are guaranteed to refer to the same block.
        best_block_hash = self.nodes[0].getbestblockhash()
        self.tip = int(best_block_hash, 16)
        self.block_time = self.nodes[0].getblock(best_block_hash)['time'] + 1

        height = self.nodes[0].getblockcount()

        for _ in range(10):
            # Use the blocktools functionality to manually build a block.
            # Calling the generate() rpc is easier, but this allows us to exactly
            # control the blocks and transactions.
            block = create_block(self.tip, create_coinbase(height+1), self.block_time)
            block.solve()
            block_message = msg_block(block)
            # send_without_ping() sends a P2P message to the node over our P2PInterface
            peer_messaging.send_without_ping(block_message)
            self.tip = block.sha256
            blocks.append(self.tip)
            self.block_time += 1
            height += 1

        # `height` now equals the height of the last block sent above, so wait
        # on it directly instead of repeating a hard-coded number.
        self.log.info("Wait for node1 to reach current tip (height {}) using RPC".format(height))
        self.nodes[1].waitforblockheight(height)

        self.log.info("Connect node2 and node1")
        self.connect_nodes(1, 2)

        self.log.info("Wait for node2 to receive all the blocks from node1")
        self.sync_all()

        self.log.info("Add P2P connection to node2")
        self.nodes[0].disconnect_p2ps()

        peer_receiving = self.nodes[2].add_p2p_connection(BaseNode())

        self.log.info("Test that node2 propagates all the blocks to us")

        getdata_request = msg_getdata()
        for block in blocks:
            getdata_request.inv.append(CInv(MSG_BLOCK, block))
        peer_receiving.send_without_ping(getdata_request)

        # wait_until() will loop until a predicate condition is met. Use it to test properties of the
        # P2PInterface objects. Iterating the map yields its keys (the block hashes), so sorted() can
        # consume it directly.
        peer_receiving.wait_until(lambda: sorted(blocks) == sorted(peer_receiving.block_receive_map), timeout=5)

        self.log.info("Check that each block was received only once")
        # The network thread uses a global lock on data access to the P2PConnection objects when sending and receiving
        # messages. The test thread should acquire the global lock before accessing any P2PConnection data to avoid locking
        # and synchronization issues. Note p2p.wait_until() acquires this global lock internally when testing the predicate.
        with p2p_lock:
            for receive_count in peer_receiving.block_receive_map.values():
                assert_equal(receive_count, 1)


if __name__ == '__main__':
    ExampleTest(__file__).main()