/ test / functional / p2p_message_capture.py
p2p_message_capture.py
 1  #!/usr/bin/env python3
 2  # Copyright (c) 2020-2022 The Bitcoin Core developers
 3  # Distributed under the MIT software license, see the accompanying
 4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
 5  """Test per-peer message capture capability.
 6  
 7  Additionally, the output of contrib/message-capture/message-capture-parser.py should be verified manually.
 8  """
 9  
10  import glob
11  from io import BytesIO
12  import os
13  
14  from test_framework.p2p import P2PDataStore, MESSAGEMAP
15  from test_framework.test_framework import BitcoinTestFramework
16  from test_framework.util import assert_equal
17  
# Sizes, in bytes, of the fixed-width header fields that mini_parser() reads
# in front of every captured message payload.
TIME_SIZE = 8  # timestamp (skipped by the parser)
LENGTH_SIZE = 4  # payload length, decoded as a little-endian integer
MSGTYPE_SIZE = 12  # message type, stripped of trailing NUL bytes
21  
def mini_parser(dat_file: str) -> None:
    """Parse a data file created by CaptureMessageToFile.

    From the data file we'll only check the structure: every record must be a
    complete fixed-size header (timestamp, message type, payload length)
    followed by exactly as many payload bytes as the header announces, and the
    message type must be a key of MESSAGEMAP.

    We won't care about things like:
    - Deserializing the payload of the message
        - This is managed by the deserialize methods in test_framework.messages
    - The order of the messages
        - There's no reason why we can't, say, change the order of the messages in the handshake
    - Which message types appear
        - We can add new message types

    We're ignoring these because they're simply too brittle to test here.

    Fails an assertion if the file is smaller than one header, if a header or
    payload is truncated, or if a message type is not in MESSAGEMAP.
    """
    header_size = TIME_SIZE + MSGTYPE_SIZE + LENGTH_SIZE
    with open(dat_file, 'rb') as f_in:
        # This should have at least one message in it
        assert os.fstat(f_in.fileno()).st_size >= header_size
        while True:
            tmp_header_raw = f_in.read(header_size)
            if not tmp_header_raw:
                # Clean end of file: the previous record ended exactly here.
                break
            # A short read means the file ends in the middle of a header.
            # Without this check the missing length bytes would decode as 0
            # and the truncated record would be silently accepted.
            assert_equal(len(tmp_header_raw), header_size)
            tmp_header = BytesIO(tmp_header_raw)
            tmp_header.read(TIME_SIZE)  # skip the timestamp field
            # The message type is padded with NUL bytes up to MSGTYPE_SIZE.
            msgtype = tmp_header.read(MSGTYPE_SIZE).rstrip(b'\x00')
            assert msgtype in MESSAGEMAP
            length: int = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little")
            # The payload itself is not deserialized, only its size is checked.
            data = f_in.read(length)
            assert_equal(len(data), length)
51  
52  
53  
class MessageCaptureTest(BitcoinTestFramework):
    """Start one node with -capturemessages and sanity-check its capture files."""

    def set_test_params(self):
        self.setup_clean_chain = True
        self.num_nodes = 1
        self.extra_args = [["-capturemessages"]]

    def run_test(self):
        node = self.nodes[0]
        capturedir = node.chain_path / "message_capture"
        # Open and then close a P2P connection so that the handshake messages
        # get exchanged with the node.
        node.add_p2p_connection(P2PDataStore())
        node.disconnect_p2ps()
        # Check the received-messages file first, then the sent-messages file;
        # in each case the first glob match is the one that gets parsed.
        for pattern in ("*/msgs_recv.dat", "*/msgs_sent.dat"):
            matches = glob.glob(os.path.join(capturedir, pattern))
            mini_parser(matches[0])
69  
70  
# Run the test when this file is executed as a script.
if __name__ == '__main__':
    MessageCaptureTest().main()