/ tools / scripts / misc / pcapstream.py
pcapstream.py
  1  """
  2  Pcap capture/stream helper
  3  
  4  Connect to the second bristlemouth USB port to start capture.
  5  To use wireshark, make sure it is installed. You can override the default macOS
  6  path with the --wireshark_path argument.
  7  
  8  Use --wireshark to open a wireshark window and stream the packets
  9  
 10  Use --filename to save to a .pcap file for later analysis
 11  """
 12  from collections import namedtuple
 13  import argparse
 14  import os
 15  import serial
 16  import signal
 17  import struct
 18  import subprocess
 19  import sys
 20  import tempfile
 21  import time
 22  
 23  # Pcap header definition
 24  PCAP_HDR = namedtuple(
 25      "PCAP_HDR",
 26      "magic_number version_major version_minor thiszone sigfigs snaplen network",
 27  )
 28  PCAP_STRUCT = "<LHHlLLL"
 29  
 30  # Pcap record header
 31  RECORD_HDR = namedtuple("RECORD_HDR", "ts_sec ts_usec incl_len orig_len")
 32  RECORD_STRUCT = "<LLLL"
 33  
 34  # Default packet header as suggested in
 35  # https://wiki.wireshark.org/Development/LibpcapFileFormat
 36  header = PCAP_HDR(
 37      magic_number=0xA1B2C3D4,
 38      version_major=2,
 39      version_minor=4,
 40      thiszone=0,
 41      sigfigs=0,
 42      snaplen=65535,
 43      network=1,
 44  )
 45  
 46  
 47  #
 48  # Write bytes to pcap file and/or pipe
 49  #
 50  def write(data, pcap, pcap_pipe):
 51      if pcap_pipe:
 52          pcap_pipe.write(data)
 53          pcap_pipe.flush()
 54  
 55      if pcap:
 56          pcap.write(data)
 57          pcap.flush()
 58  
 59  #
 60  # Capture pcap data from USB serial interface
 61  #
 62  def capture_packets(pcap, pcap_pipe):
 63      header = ser.read(24)
 64      print(PCAP_HDR._make(struct.unpack(PCAP_STRUCT, header)))
 65      # TODO - check header
 66  
 67      write(header, pcap, pcap_pipe)
 68  
 69      while True:
 70          header = ser.read(16)
 71          record = RECORD_HDR._make(struct.unpack(RECORD_STRUCT, header))
 72          print(record)
 73          data = ser.read(record.incl_len)
 74          write(header, pcap, pcap_pipe)
 75          write(data, pcap, pcap_pipe)
 76  
 77  
 78  def handler(signum, frame):
 79      print("Exiting")
 80      if pcap:
 81          pcap.close()
 82      exit(0)
 83  
 84  
 85  signal.signal(signal.SIGINT, handler)
 86  
 87  
 88  parser = argparse.ArgumentParser()
 89  parser.add_argument("device", help="Serial port")
 90  parser.add_argument(
 91      "--wireshark", action="store_true", help="Stream capture to wireshark"
 92  )
 93  parser.add_argument(
 94      "--wireshark_path",
 95      default="/Applications/Wireshark.app/Contents/MacOS/Wireshark",
 96      help="Stream capture to wireshark",
 97  )
 98  parser.add_argument("--filename", help="Save stream to pcap file")
 99  args, unknownargs = parser.parse_known_args()
100  
# Optional live view: Wireshark reads the capture from a named pipe that is
# created inside its own fresh temporary directory. pipe_path stays None
# when --wireshark was not requested.
pipe_path = None
if args.wireshark:
    tmpdir = tempfile.mkdtemp()
    pipe_path = os.path.join(tmpdir, "bm_packet_pipe")
    os.mkfifo(pipe_path)
    print("Created pipe in", pipe_path)

    # Launch Wireshark in the background with the pipe as its capture
    # interface (-i); -k is expected to start capturing immediately.
    print("Running Wireshark")
    wireshark_cmd = [args.wireshark_path, "-i", pipe_path, "-k"]
    subprocess.Popen(wireshark_cmd)
113  
# Serial port the capture is read from; capture_packets uses this global.
ser = serial.Serial(args.device)

# Output sinks, opened in binary mode. Each one stays None unless it was
# requested on the command line (--filename / --wireshark).
pcap = open(args.filename, "wb") if args.filename else None
pcap_pipe = open(pipe_path, "wb") if pipe_path else None
123  
# Run the capture until it is interrupted. A BrokenPipeError means the
# reader of the pipe went away, which is treated as a normal end of the
# session rather than an error.
try:
    capture_packets(pcap, pcap_pipe)
except BrokenPipeError:
    pass
finally:
    # Clean up on every exit path (broken pipe, Ctrl-C via SystemExit, or a
    # serial error), not only on BrokenPipeError, so that neither the FIFO
    # nor its temporary directory is left behind.
    if pipe_path:
        print("Removing", pipe_path)
        os.unlink(pipe_path)
        # The pipe was the only entry created in the mkdtemp() directory.
        os.rmdir(os.path.dirname(pipe_path))