p2p_message_capture.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2020-2022 The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """Test per-peer message capture capability. 6 7 Additionally, the output of contrib/message-capture/message-capture-parser.py should be verified manually. 8 """ 9 10 import glob 11 from io import BytesIO 12 import os 13 14 from test_framework.p2p import P2PDataStore, MESSAGEMAP 15 from test_framework.test_framework import BitcoinTestFramework 16 from test_framework.util import assert_equal 17 18 TIME_SIZE = 8 19 LENGTH_SIZE = 4 20 MSGTYPE_SIZE = 12 21 22 def mini_parser(dat_file: str) -> None: 23 """Parse a data file created by CaptureMessageToFile. 24 25 From the data file we'll only check the structure. 26 27 We won't care about things like: 28 - Deserializing the payload of the message 29 - This is managed by the deserialize methods in test_framework.messages 30 - The order of the messages 31 - There's no reason why we can't, say, change the order of the messages in the handshake 32 - Message Type 33 - We can add new message types 34 35 We're ignoring these because they're simply too brittle to test here. 36 """ 37 with open(dat_file, 'rb') as f_in: 38 # This should have at least one message in it 39 assert os.fstat(f_in.fileno()).st_size >= TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE 40 while True: 41 tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE) 42 if not tmp_header_raw: 43 break 44 tmp_header = BytesIO(tmp_header_raw) 45 tmp_header.read(TIME_SIZE) # skip the timestamp field 46 msgtype = tmp_header.read(MSGTYPE_SIZE).rstrip(b'\x00') 47 assert msgtype in MESSAGEMAP 48 length: int = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little") 49 data = f_in.read(length) 50 assert_equal(len(data), length) 51 52 53 54 class MessageCaptureTest(BitcoinTestFramework): 55 def set_test_params(self): 56 self.num_nodes = 1 57 self.extra_args = [["-capturemessages"]] 58 self.setup_clean_chain = True 59 60 def run_test(self): 61 capturedir = self.nodes[0].chain_path / "message_capture" 62 # Connect a node so that the handshake occurs 63 self.nodes[0].add_p2p_connection(P2PDataStore()) 64 self.nodes[0].disconnect_p2ps() 65 recv_file = glob.glob(os.path.join(capturedir, "*/msgs_recv.dat"))[0] 66 mini_parser(recv_file) 67 sent_file = glob.glob(os.path.join(capturedir, "*/msgs_sent.dat"))[0] 68 mini_parser(sent_file) 69 70 71 if __name__ == '__main__': 72 MessageCaptureTest().main()