/ 9_Firmware / tools / uart_capture.py
uart_capture.py
  1  #!/usr/bin/env python3
  2  """
  3  AERIS-10 UART Diagnostic Capture Tool
  4  
  5  Captures STM32 DIAG output from USART3 (115200 8N1) and writes to both
  6  the terminal and a timestamped log file. Designed for board-day bring-up.
  7  
  8  DIAG output format (from diag_log.h):
  9      [  12345 ms] SUBSYS: message
 10      [  12345 ms] SUBSYS WARN: message
 11      [  12345 ms] SUBSYS **ERR**: message
 12      [  12345 ms] ======== Section Title ========
 13  
 14  Subsystem tags: CLK, LO, LO_DRV, BF, PA, FPGA, USB, PWR, IMU, MOT, SYS
 15  
 16  Requirements:
 17      pip install pyserial
 18  
 19  Usage:
 20      python3 uart_capture.py                     # auto-detect port
 21      python3 uart_capture.py -p /dev/cu.usbmodem*  # explicit port
 22      python3 uart_capture.py --filter LO,PA      # only show LO and PA lines
 23      python3 uart_capture.py --errors-only       # only show WARN and ERR lines
 24      python3 uart_capture.py --no-log            # terminal only, no log file
 25      python3 uart_capture.py --list              # list available serial ports
 26  """
 27  
 28  import argparse
 29  import datetime
 30  import glob
 31  import os
 32  import re
 33  import signal
 34  import sys
 35  import time
 36  
 37  try:
 38      import serial
 39      import serial.tools.list_ports
 40  except ImportError:
 41      print("ERROR: pyserial not installed. Run: pip install pyserial")
 42      sys.exit(1)
 43  
 44  # ---------------------------------------------------------------------------
 45  # Constants
 46  # ---------------------------------------------------------------------------
 47  
 48  DEFAULT_BAUD = 115200
 49  ENCODING = "utf-8"
 50  LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "logs")
 51  
 52  # ANSI color codes for terminal
 53  COLORS = {
 54      "RESET":   "\033[0m",
 55      "RED":     "\033[91m",
 56      "YELLOW":  "\033[93m",
 57      "GREEN":   "\033[92m",
 58      "CYAN":    "\033[96m",
 59      "DIM":     "\033[2m",
 60      "BOLD":    "\033[1m",
 61      "MAGENTA": "\033[95m",
 62  }
 63  
 64  # Subsystem tag → color mapping
 65  SUBSYS_COLORS = {
 66      "CLK":    "CYAN",
 67      "LO":     "GREEN",
 68      "LO_DRV": "GREEN",
 69      "BF":     "MAGENTA",
 70      "PA":     "YELLOW",
 71      "FPGA":   "CYAN",
 72      "USB":    "CYAN",
 73      "PWR":    "RED",
 74      "IMU":    "DIM",
 75      "MOT":    "DIM",
 76      "SYS":    "BOLD",
 77  }
 78  
 79  # Regex patterns for DIAG output parsing
 80  RE_DIAG_LINE = re.compile(
 81      r"^\[\s*(\d+)\s*ms\]\s+"     # timestamp
 82      r"(?:={8}\s+(.+?)\s+={8}|"   # section separator
 83      r"(\w+)"                      # subsystem tag
 84      r"(?:\s+(WARN|\*\*ERR\*\*))?" # optional severity
 85      r":\s+(.*))"                  # message
 86  )
 87  
 88  
 89  # ---------------------------------------------------------------------------
 90  # Helpers
 91  # ---------------------------------------------------------------------------
 92  
 93  def list_ports():
 94      """Print available serial ports."""
 95      ports = serial.tools.list_ports.comports()
 96      if not ports:
 97          print("No serial ports found.")
 98          return
 99      print(f"{'Port':<30} {'Description':<40} {'HWID'}")
100      print("-" * 100)
101      for p in sorted(ports, key=lambda x: x.device):
102          print(f"{p.device:<30} {p.description:<40} {p.hwid}")
103  
104  
def auto_detect_port():
    """Guess the STM32 serial device from macOS-style /dev/cu.* names.

    Glob patterns are tried in priority order; the first pattern with any
    match wins, and the alphabetically first device for it is returned.

    Returns:
        The device path as a string, or None when nothing matches.
    """
    candidates_by_priority = (
        "/dev/cu.usbmodem*",      # ST-Link Virtual COM Port
        "/dev/cu.usbserial*",     # FTDI/CH340/CP210x adapters
        "/dev/cu.SLAB*",          # Silicon Labs CP210x
        "/dev/cu.wchusbserial*",  # CH340/CH341
    )
    for found in map(glob.glob, candidates_by_priority):
        if found:
            # Smallest string == first element of the sorted match list.
            return min(found)
    return None
119  
120  
def colorize(line, use_color=True):
    """Return *line* decorated with ANSI colors for terminal display.

    Args:
        line: One decoded line of UART output (no trailing newline).
        use_color: When false, the line is returned untouched.

    Returns:
        The line rebuilt with color codes: text that does not parse as DIAG
        output is dimmed as a whole, section separators get a bold title,
        and regular lines get a dimmed timestamp plus a tag colored by
        severity (ERR red, WARN yellow) or, failing that, by subsystem.
    """
    if not use_color:
        return line

    reset = COLORS["RESET"]
    dim = COLORS["DIM"]

    parsed = RE_DIAG_LINE.match(line)
    if parsed is None:
        # Not DIAG-formatted: dim the whole thing.
        return f"{dim}{line}{reset}"

    timestamp, section, subsys, severity, msg = parsed.groups()
    stamp = f"{dim}[{timestamp:>7} ms]{reset} "

    if section:
        return f"{stamp}{COLORS['BOLD']}======== {section} ========{reset}"

    # Severity takes precedence over the per-subsystem color.
    severity_palette = {"**ERR**": "RED", "WARN": "YELLOW"}
    if severity in severity_palette:
        tag_color = COLORS[severity_palette[severity]]
        suffix = f" {tag_color}{severity}{reset}"
    else:
        # Unknown subsystems fall back to no color at all.
        tag_color = COLORS.get(SUBSYS_COLORS.get(subsys, ""), "")
        suffix = ""

    return f"{stamp}{tag_color}{subsys}{reset}{suffix}: {msg}"
155  
156  
def should_display(line, filter_subsys=None, errors_only=False):
    """Tell whether *line* passes the terminal display filters.

    Args:
        line: One decoded line of UART output.
        filter_subsys: Optional collection of subsystem tags to keep; a
            falsy value disables subsystem filtering.
        errors_only: When true, keep only WARN and **ERR** lines.

    Returns:
        True if the line should be printed. Lines that do not parse as DIAG
        output and section separators are never filtered out.
    """
    parsed = RE_DIAG_LINE.match(line)
    if parsed is None:
        # Could be raw HAL_UART_Transmit output; never hide it.
        return True

    section, subsys, severity = parsed.group(2, 3, 4)
    if section:
        return True

    passes_severity = not errors_only or severity in ("WARN", "**ERR**")
    if not passes_severity:
        return False

    return not filter_subsys or subsys in filter_subsys
179  
180  
181  # ---------------------------------------------------------------------------
182  # Stats tracker
183  # ---------------------------------------------------------------------------
184  
class CaptureStats:
    """Running counters for a capture session.

    Attributes are plain public fields:
        total      -- every line passed to update()
        errors     -- lines with **ERR** severity
        warnings   -- lines with WARN severity
        by_subsys  -- dict mapping subsystem tag to line count
        start_time -- time.time() value taken at construction
    """

    def __init__(self):
        self.start_time = time.time()
        self.total = self.errors = self.warnings = 0
        self.by_subsys = {}

    def update(self, line):
        """Account for one received line.

        Every line bumps the total; only regular DIAG lines (not section
        separators, not unparseable text) feed the per-subsystem and
        severity counters.
        """
        self.total += 1

        parsed = RE_DIAG_LINE.match(line)
        if parsed is None or parsed.group(2):
            return

        subsys, severity = parsed.group(3, 4)
        if subsys:
            self.by_subsys.setdefault(subsys, 0)
            self.by_subsys[subsys] += 1

        if severity == "**ERR**":
            self.errors += 1
        elif severity == "WARN":
            self.warnings += 1

    def summary(self):
        """Return a multi-line text report of the counters.

        The report starts with an empty line, and lists subsystems by
        descending line count when any were seen.
        """
        elapsed = time.time() - self.start_time
        report = [
            "",
            "--- Capture Summary ---",
            f"Duration:  {elapsed:.1f}s",
            f"Lines:     {self.total}",
            f"Errors:    {self.errors}",
            f"Warnings:  {self.warnings}",
        ]
        if self.by_subsys:
            report.append("By subsystem:")
            ranked = sorted(
                self.by_subsys.items(), key=lambda item: item[1], reverse=True
            )
            report.extend(f"  {tag:<8} {count}" for tag, count in ranked)
        return "\n".join(report)
225  
226  
227  # ---------------------------------------------------------------------------
228  # Main capture loop
229  # ---------------------------------------------------------------------------
230  
def _open_capture_log(log_file, port, baud):
    """Create the capture log, write its header and return the open file.

    Args:
        log_file: Path of the log file to create (truncated if it exists).
        port: Serial port name, recorded in the header.
        baud: Baud rate, recorded in the header.

    Returns:
        The text file object, opened for writing with ENCODING.

    Raises:
        OSError: If the directory or file cannot be created or written.
    """
    import platform  # function-scope import: only needed for the header

    log_dir = os.path.dirname(log_file)
    # A bare filename (e.g. "-o capture.log") has an empty dirname, and
    # os.makedirs("") raises FileNotFoundError, so only create real paths.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    flog = open(log_file, "w", encoding=ENCODING)
    try:
        flog.write(f"# AERIS-10 UART capture — {datetime.datetime.now().isoformat()}\n")
        flog.write(f"# Port: {port}  Baud: {baud}\n")
        # platform.node() is portable; os.uname() does not exist on Windows.
        flog.write(f"# Host: {platform.node()}\n\n")
        flog.flush()
    except OSError:
        flog.close()
        raise
    return flog


def capture(port, baud, log_file, filter_subsys, errors_only, use_color):
    """Open the serial port and stream DIAG output until interrupted.

    Every non-empty received line is counted in the statistics and, when a
    log file is configured, written to it unfiltered with a wall-clock
    prefix. The terminal only shows lines that pass the display filters.
    SIGINT/SIGTERM stop the loop; a summary is then printed and appended
    to the log.

    Args:
        port: Serial device name to open.
        baud: Baud rate (the link is opened as 8N1).
        log_file: Log file path, or None/empty to disable logging.
        filter_subsys: Collection of subsystem tags to display, or None.
        errors_only: Display only WARN / **ERR** lines when true.
        use_color: Apply ANSI colors to terminal output when true.

    Raises:
        SystemExit: If the serial port cannot be opened.
        OSError: If the log file cannot be created.
    """
    stats = CaptureStats()
    running = True

    def handle_signal(sig, frame):
        # Only flag the loop to stop; cleanup happens in the finally below.
        nonlocal running
        running = False

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    try:
        ser = serial.Serial(
            port=port,
            baudrate=baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=0.1,  # 100ms read timeout for responsive Ctrl-C
        )
    except serial.SerialException as e:
        print(f"ERROR: Could not open {port}: {e}")
        sys.exit(1)

    print(f"Connected to {port} at {baud} baud")
    if log_file:
        print(f"Logging to {log_file}")
    if filter_subsys:
        print(f"Filter: {', '.join(sorted(filter_subsys))}")
    if errors_only:
        print("Mode: errors/warnings only")
    print("Press Ctrl-C to stop.\n")

    flog = None
    if log_file:
        try:
            flog = _open_capture_log(log_file, port, baud)
        except OSError:
            # Do not leave the port open if the log cannot be created.
            ser.close()
            raise

    def handle_line(raw_line):
        """Decode one raw line, then count, log and (maybe) display it."""
        line = raw_line.decode(ENCODING, errors="replace").rstrip("\r")
        if not line:
            return

        stats.update(line)

        # Log file always gets everything (unfiltered, no color)
        if flog:
            wall_ts = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
            flog.write(f"{wall_ts}  {line}\n")
            flog.flush()  # keep the log current even if the host dies

        # Terminal display respects filters
        if should_display(line, filter_subsys, errors_only):
            print(colorize(line, use_color))

    line_buf = b""

    try:
        while running:
            try:
                chunk = ser.read(256)
            except serial.SerialException as e:
                print(f"\nSerial error: {e}")
                break

            if not chunk:
                continue

            line_buf += chunk

            # Everything before the last "\n" is complete; the final piece
            # (possibly empty) is the start of the next line.
            *complete_lines, line_buf = line_buf.split(b"\n")
            for raw_line in complete_lines:
                handle_line(raw_line)

        # Do not drop a final line that arrived without its newline.
        if line_buf:
            handle_line(line_buf)

    finally:
        ser.close()
        if flog:
            flog.write(f"\n{stats.summary()}\n")
            flog.close()
        print(stats.summary())
315  
316  
317  # ---------------------------------------------------------------------------
318  # CLI
319  # ---------------------------------------------------------------------------
320  
def _build_arg_parser():
    """Create the command-line parser for the capture tool."""
    parser = argparse.ArgumentParser(
        description="AERIS-10 UART Diagnostic Capture Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # Epilog is the module docstring text that precedes "Usage:".
        epilog=__doc__.split("Usage:")[0],
    )
    parser.add_argument("-p", "--port", help="Serial port (default: auto-detect)")
    parser.add_argument(
        "-b", "--baud", type=int, default=DEFAULT_BAUD,
        help=f"Baud rate (default: {DEFAULT_BAUD})",
    )
    parser.add_argument(
        "--filter",
        help="Comma-separated subsystem tags to display (e.g. LO,PA,CLK)",
    )
    parser.add_argument(
        "--errors-only", action="store_true",
        help="Only display WARN and ERR lines",
    )
    parser.add_argument("--no-log", action="store_true", help="Disable log file output")
    parser.add_argument("--no-color", action="store_true", help="Disable ANSI color output")
    parser.add_argument(
        "--list", action="store_true",
        help="List available serial ports and exit",
    )
    parser.add_argument(
        "-o", "--output",
        help="Log file path (default: logs/uart_YYYYMMDD_HHMMSS.log)",
    )
    return parser


def main():
    """Parse the command line, resolve port/log/filter options and capture."""
    args = _build_arg_parser().parse_args()

    if args.list:
        list_ports()
        sys.exit(0)

    # Serial port: explicit option wins, otherwise try to find one.
    if args.port:
        port = args.port
    else:
        port = auto_detect_port()
        if not port:
            print("ERROR: No serial port detected. Use -p to specify, or --list to see ports.")
            sys.exit(1)
        print(f"Auto-detected port: {port}")

    # Log file: disabled, explicit path, or a timestamped file in LOG_DIR.
    if args.no_log:
        log_file = None
    elif args.output:
        log_file = args.output
    else:
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = os.path.join(LOG_DIR, f"uart_{stamp}.log")

    # Subsystem filter: tags are trimmed and upper-cased.
    filter_subsys = (
        {tag.strip().upper() for tag in args.filter.split(",")}
        if args.filter
        else None
    )

    # Colors only when not disabled and stdout is a terminal.
    use_color = False if args.no_color else sys.stdout.isatty()

    capture(port, args.baud, log_file, filter_subsys, args.errors_only, use_color)


if __name__ == "__main__":
    main()