/ contrib / message-capture / message-capture-parser.py
message-capture-parser.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2020-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Parse message capture binary files.  To be used in conjunction with -capturemessages."""
  6  
  7  import argparse
  8  import os
  9  import shutil
 10  import sys
 11  from io import BytesIO
 12  import json
 13  from pathlib import Path
 14  from typing import Any, Optional
 15  
 16  sys.path.append(os.path.join(os.path.dirname(__file__), '../../test/functional'))
 17  
 18  from test_framework.messages import ser_uint256     # noqa: E402
 19  from test_framework.p2p import MESSAGEMAP           # noqa: E402
 20  
# Sizes, in bytes, of the three fields that make up the header of every
# captured message.  process_file() reads them in this order:
# timestamp, message type, body length.
TIME_SIZE = 8       # little-endian integer timestamp
LENGTH_SIZE = 4     # little-endian integer length of the message body
MSGTYPE_SIZE = 12   # message type; everything from the first NUL byte on is discarded

# The test framework classes store hashes as large ints in many cases.
# These are variables of type uint256 in core.
# There isn't a way to distinguish between a large int and a large int that is actually a blob of bytes.
# As such, they are itemized here.
# Any variables with these names that are of type int are actually uint256 variables.
# (These can be easily found by looking for calls to deser_uint256, deser_uint256_vector, and uint256_from_str in messages.py)
HASH_INTS = [
    "blockhash",
    "block_hash",
    "hash",
    "hashMerkleRoot",
    "hashPrevBlock",
    "hashstop",
    "prev_header",
    "sha256",
    "stop_hash",
]

# Names of attributes that hold a sequence of uint256 values stored as ints.
# to_jsonable() only treats such an attribute as hashes when every element is an int.
HASH_INT_VECTORS = [
    "hashes",
    "headers",
    "vHave",
    "vHash",
]
 49  
 50  
 51  class ProgressBar:
 52      def __init__(self, total: float):
 53          self.total = total
 54          self.running = 0
 55  
 56      def set_progress(self, progress: float):
 57          cols = shutil.get_terminal_size()[0]
 58          if cols <= 12:
 59              return
 60          max_blocks = cols - 9
 61          num_blocks = int(max_blocks * progress)
 62          print('\r[ {}{} ] {:3.0f}%'
 63                .format('#' * num_blocks,
 64                        ' ' * (max_blocks - num_blocks),
 65                        progress * 100),
 66                end ='')
 67  
 68      def update(self, more: float):
 69          self.running += more
 70          self.set_progress(self.running / self.total)
 71  
 72  
 73  def to_jsonable(obj: Any) -> Any:
 74      if hasattr(obj, "__dict__"):
 75          return obj.__dict__
 76      elif hasattr(obj, "__slots__"):
 77          ret = {}    # type: Any
 78          for slot in obj.__slots__:
 79              val = getattr(obj, slot, None)
 80              if slot in HASH_INTS and isinstance(val, int):
 81                  ret[slot] = ser_uint256(val).hex()
 82              elif slot in HASH_INT_VECTORS and all(isinstance(a, int) for a in val):
 83                  ret[slot] = [ser_uint256(a).hex() for a in val]
 84              else:
 85                  ret[slot] = to_jsonable(val)
 86          return ret
 87      elif isinstance(obj, list):
 88          return [to_jsonable(a) for a in obj]
 89      elif isinstance(obj, bytes):
 90          return obj.hex()
 91      else:
 92          return obj
 93  
 94  
 95  def process_file(path: str, messages: list[Any], recv: bool, progress_bar: Optional[ProgressBar]) -> None:
 96      with open(path, 'rb') as f_in:
 97          if progress_bar:
 98              bytes_read = 0
 99  
100          while True:
101              if progress_bar:
102                  # Update progress bar
103                  diff = f_in.tell() - bytes_read - 1
104                  progress_bar.update(diff)
105                  bytes_read = f_in.tell() - 1
106  
107              # Read the Header
108              tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
109              if not tmp_header_raw:
110                  break
111              tmp_header = BytesIO(tmp_header_raw)
112              time = int.from_bytes(tmp_header.read(TIME_SIZE), "little")      # type: int
113              msgtype = tmp_header.read(MSGTYPE_SIZE).split(b'\x00', 1)[0]     # type: bytes
114              length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little")  # type: int
115  
116              # Start converting the message to a dictionary
117              msg_dict = {}
118              msg_dict["direction"] = "recv" if recv else "sent"
119              msg_dict["time"] = time
120              msg_dict["size"] = length   # "size" is less readable here, but more readable in the output
121  
122              msg_ser = BytesIO(f_in.read(length))
123  
124              # Determine message type
125              if msgtype not in MESSAGEMAP:
126                  # Unrecognized message type
127                  try:
128                      msgtype_tmp = msgtype.decode()
129                      if not msgtype_tmp.isprintable():
130                          raise UnicodeDecodeError
131                      msg_dict["msgtype"] = msgtype_tmp
132                  except UnicodeDecodeError:
133                      msg_dict["msgtype"] = "UNREADABLE"
134                  msg_dict["body"] = msg_ser.read().hex()
135                  msg_dict["error"] = "Unrecognized message type."
136                  messages.append(msg_dict)
137                  print(f"WARNING - Unrecognized message type {msgtype} in {path}", file=sys.stderr)
138                  continue
139  
140              # Deserialize the message
141              msg = MESSAGEMAP[msgtype]()
142              msg_dict["msgtype"] = msgtype.decode()
143  
144              try:
145                  msg.deserialize(msg_ser)
146              except KeyboardInterrupt:
147                  raise
148              except Exception:
149                  # Unable to deserialize message body
150                  msg_ser.seek(0, os.SEEK_SET)
151                  msg_dict["body"] = msg_ser.read().hex()
152                  msg_dict["error"] = "Unable to deserialize message."
153                  messages.append(msg_dict)
154                  print(f"WARNING - Unable to deserialize message in {path}", file=sys.stderr)
155                  continue
156  
157              # Convert body of message into a jsonable object
158              if length:
159                  msg_dict["body"] = to_jsonable(msg)
160              messages.append(msg_dict)
161  
162          if progress_bar:
163              # Update the progress bar to the end of the current file
164              # in case we exited the loop early
165              f_in.seek(0, os.SEEK_END)   # Go to end of file
166              diff = f_in.tell() - bytes_read - 1
167              progress_bar.update(diff)
168  
169  
def main() -> None:
    """Parse the capture files named on the command line and emit them as JSON.

    All messages from all given files are merged, sorted by their "time"
    field, and written as a single JSON array to the --output file or, if
    none is given, to stdout.  A progress bar is shown on stdout unless it is
    disabled, stdout is not a terminal, or there is no data to process.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        epilog="EXAMPLE \n\t{0} -o out.json <data-dir>/message_capture/**/*.dat".format(sys.argv[0]),
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument(
        "capturepaths",
        nargs='+',
        help="binary message capture files to parse.")
    parser.add_argument(
        "-o", "--output",
        help="output file.  If unset print to stdout")
    parser.add_argument(
        "-n", "--no-progress-bar",
        action='store_true',
        help="disable the progress bar.  Automatically set if the output is not a terminal")
    args = parser.parse_args()
    capturepaths = [Path.cwd() / Path(capturepath) for capturepath in args.capturepaths]
    output = Path.cwd() / Path(args.output) if args.output else None
    use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty()

    messages = []   # type: list[Any]
    progress_bar = None   # type: Optional[ProgressBar]
    if use_progress_bar:
        total_size = sum(capture.stat().st_size for capture in capturepaths)
        # With no bytes to read there is no progress to show, and a zero
        # total would make the bar divide by zero.
        if total_size > 0:
            progress_bar = ProgressBar(total_size)

    for capture in capturepaths:
        process_file(str(capture), messages, "recv" in capture.stem, progress_bar)

    messages.sort(key=lambda msg: msg['time'])

    if progress_bar is not None:
        progress_bar.set_progress(1)
        # The bar is drawn without a trailing newline; finish its line so
        # that anything printed afterwards (the JSON itself when no output
        # file is given) starts on a fresh line.
        print()

    jsonrep = json.dumps(messages)
    if output is not None:
        with open(str(output), 'w', encoding='utf-8') as f_out:
            f_out.write(jsonrep)
    else:
        print(jsonrep)
210      else:
211          print(jsonrep)
212  
# Run the parser only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()