#!/usr/bin/env python3
"""
AERIS-10 UART Diagnostic Capture Tool

Captures STM32 DIAG output from USART3 (115200 8N1) and writes to both
the terminal and a timestamped log file. Designed for board-day bring-up.

DIAG output format (from diag_log.h):
    [ 12345 ms] SUBSYS: message
    [ 12345 ms] SUBSYS WARN: message
    [ 12345 ms] SUBSYS **ERR**: message
    [ 12345 ms] ======== Section Title ========

Subsystem tags: CLK, LO, LO_DRV, BF, PA, FPGA, USB, PWR, IMU, MOT, SYS

Requirements:
    pip install pyserial

Usage:
    python3 uart_capture.py                        # auto-detect port
    python3 uart_capture.py -p /dev/cu.usbmodem*   # explicit port
    python3 uart_capture.py --filter LO,PA         # only show LO and PA lines
    python3 uart_capture.py --errors-only          # only show WARN and ERR lines
    python3 uart_capture.py --no-log               # terminal only, no log file
    python3 uart_capture.py --list                 # list available serial ports
"""

import argparse
import datetime
import glob
import os
import platform
import re
import signal
import sys
import time

try:
    import serial
    import serial.tools.list_ports
except ImportError:
    # Do not exit at import time: the parsing/formatting helpers below are
    # usable without pyserial. The entry points that actually touch a serial
    # port call _require_pyserial() and exit with the install hint there.
    serial = None

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

DEFAULT_BAUD = 115200
ENCODING = "utf-8"
LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "logs")

# ANSI color codes for terminal
COLORS = {
    "RESET": "\033[0m",
    "RED": "\033[91m",
    "YELLOW": "\033[93m",
    "GREEN": "\033[92m",
    "CYAN": "\033[96m",
    "DIM": "\033[2m",
    "BOLD": "\033[1m",
    "MAGENTA": "\033[95m",
}

# Subsystem tag → color mapping
SUBSYS_COLORS = {
    "CLK": "CYAN",
    "LO": "GREEN",
    "LO_DRV": "GREEN",
    "BF": "MAGENTA",
    "PA": "YELLOW",
    "FPGA": "CYAN",
    "USB": "CYAN",
    "PWR": "RED",
    "IMU": "DIM",
    "MOT": "DIM",
    "SYS": "BOLD",
}

# Regex patterns for DIAG output parsing.
# Groups: (timestamp, section title, subsystem tag, severity, message).
# Exactly one of "section title" or "subsystem tag" is set on a match.
RE_DIAG_LINE = re.compile(
    r"^\[\s*(\d+)\s*ms\]\s+"            # timestamp
    r"(?:={8}\s+(.+?)\s+={8}|"          # section separator
    r"(\w+)"                            # subsystem tag
    r"(?:\s+(WARN|\*\*ERR\*\*))?"       # optional severity
    r":\s+(.*))"                        # message
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _require_pyserial():
    """Exit with an install hint if pyserial could not be imported."""
    if serial is None:
        print("ERROR: pyserial not installed. Run: pip install pyserial")
        sys.exit(1)


def list_ports():
    """Print a table of available serial ports (device, description, HWID)."""
    _require_pyserial()
    ports = serial.tools.list_ports.comports()
    if not ports:
        print("No serial ports found.")
        return
    print(f"{'Port':<30} {'Description':<40} {'HWID'}")
    print("-" * 100)
    for p in sorted(ports, key=lambda x: x.device):
        print(f"{p.device:<30} {p.description:<40} {p.hwid}")


def auto_detect_port():
    """Try to auto-detect the STM32 serial port on macOS.

    Returns:
        The first (alphabetically) device path matching the highest-priority
        glob pattern, or None when nothing matches.
    """
    # Priority order: ST-Link VCP, generic USB serial
    patterns = [
        "/dev/cu.usbmodem*",        # ST-Link Virtual COM Port
        "/dev/cu.usbserial*",       # FTDI/CH340/CP210x adapters
        "/dev/cu.SLAB*",            # Silicon Labs CP210x
        "/dev/cu.wchusbserial*",    # CH340/CH341
    ]
    for pattern in patterns:
        matches = sorted(glob.glob(pattern))
        if matches:
            return matches[0]
    return None


def colorize(line, use_color=True):
    """Apply ANSI colors to a DIAG line for terminal display.

    Args:
        line: One decoded line of UART output (no trailing newline).
        use_color: When False the line is returned unchanged.

    Returns:
        The line with ANSI escapes applied. Lines that do not match the DIAG
        format are dimmed as a whole; matching lines are re-rendered with a
        right-aligned timestamp.
    """
    if not use_color:
        return line

    m = RE_DIAG_LINE.match(line)
    if not m:
        # Non-DIAG line — show dimmed
        return f"{COLORS['DIM']}{line}{COLORS['RESET']}"

    timestamp, section, subsys, severity, msg = m.groups()

    if section:
        # Section separator
        return (
            f"{COLORS['DIM']}[{timestamp:>7} ms]{COLORS['RESET']} "
            f"{COLORS['BOLD']}======== {section} ========{COLORS['RESET']}"
        )

    # Pick color based on severity first, then subsystem
    if severity == "**ERR**":
        color = COLORS["RED"]
        sev_str = f" {COLORS['RED']}**ERR**{COLORS['RESET']}"
    elif severity == "WARN":
        color = COLORS["YELLOW"]
        sev_str = f" {COLORS['YELLOW']}WARN{COLORS['RESET']}"
    else:
        # Unknown subsystem tags fall back to no color at all.
        color = COLORS.get(SUBSYS_COLORS.get(subsys, ""), "")
        sev_str = ""

    return (
        f"{COLORS['DIM']}[{timestamp:>7} ms]{COLORS['RESET']} "
        f"{color}{subsys}{COLORS['RESET']}{sev_str}: {msg}"
    )


def should_display(line, filter_subsys=None, errors_only=False):
    """Decide whether to display a line based on filters.

    Args:
        line: One decoded line of UART output.
        filter_subsys: Optional collection of subsystem tags to show; a falsy
            value disables the subsystem filter.
        errors_only: When True, only WARN and **ERR** DIAG lines pass.

    Returns:
        True if the line should be printed to the terminal.
    """
    m = RE_DIAG_LINE.match(line)
    if not m:
        # Non-DIAG lines: always show (could be raw HAL_UART_Transmit output)
        return True

    _, section, subsys, severity, _ = m.groups()

    # Section separators always shown
    if section:
        return True

    # Error filter
    if errors_only and severity not in ("WARN", "**ERR**"):
        return False

    # Subsystem filter
    if filter_subsys and subsys not in filter_subsys:
        return False

    return True


# ---------------------------------------------------------------------------
# Stats tracker
# ---------------------------------------------------------------------------

class CaptureStats:
    """Track line counts per subsystem and severity."""

    def __init__(self):
        self.total = 0          # every non-empty line seen, DIAG or not
        self.errors = 0         # DIAG lines with **ERR** severity
        self.warnings = 0       # DIAG lines with WARN severity
        self.by_subsys = {}     # subsystem tag -> DIAG line count
        self.start_time = time.time()

    def update(self, line):
        """Account for one captured line."""
        self.total += 1
        m = RE_DIAG_LINE.match(line)
        if not m:
            return
        _, section, subsys, severity, _ = m.groups()
        if section:
            # Section separators count toward the total only.
            return
        if subsys:
            self.by_subsys[subsys] = self.by_subsys.get(subsys, 0) + 1
        if severity == "**ERR**":
            self.errors += 1
        elif severity == "WARN":
            self.warnings += 1

    def summary(self):
        """Return a multi-line, human-readable capture summary."""
        elapsed = time.time() - self.start_time
        lines = [
            "",
            "--- Capture Summary ---",
            f"Duration: {elapsed:.1f}s",
            f"Lines: {self.total}",
            f"Errors: {self.errors}",
            f"Warnings: {self.warnings}",
        ]
        if self.by_subsys:
            lines.append("By subsystem:")
            # Busiest subsystem first.
            for tag in sorted(self.by_subsys, key=self.by_subsys.get, reverse=True):
                lines.append(f" {tag:<8} {self.by_subsys[tag]}")
        return "\n".join(lines)


# ---------------------------------------------------------------------------
# Main capture loop
# ---------------------------------------------------------------------------

def _open_log(log_file, port, baud):
    """Create the log file (and its parent directory) and write the header."""
    log_dir = os.path.dirname(log_file)
    # A bare file name has an empty dirname, and os.makedirs("") raises.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    flog = open(log_file, "w", encoding=ENCODING)
    flog.write(f"# AERIS-10 UART capture — {datetime.datetime.now().isoformat()}\n")
    flog.write(f"# Port: {port} Baud: {baud}\n")
    # platform.node() works on every OS; os.uname() does not exist on Windows.
    flog.write(f"# Host: {platform.node()}\n\n")
    flog.flush()
    return flog


def capture(port, baud, log_file, filter_subsys, errors_only, use_color):
    """Open serial port and capture DIAG output until SIGINT/SIGTERM.

    Args:
        port: Serial device path.
        baud: Baud rate.
        log_file: Path of the log file, or None to disable logging. The log
            receives every line, unfiltered and uncolored, prefixed with a
            wall-clock timestamp.
        filter_subsys: Optional set of subsystem tags for terminal display.
        errors_only: Show only WARN/**ERR** DIAG lines on the terminal.
        use_color: Apply ANSI colors to terminal output.

    Exits the process with status 1 if pyserial is missing, or if the port
    or the log file cannot be opened.
    """
    _require_pyserial()
    stats = CaptureStats()
    running = True

    def handle_signal(sig, frame):
        nonlocal running
        running = False

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    try:
        ser = serial.Serial(
            port=port,
            baudrate=baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=0.1,  # 100ms read timeout for responsive Ctrl-C
        )
    except serial.SerialException as e:
        print(f"ERROR: Could not open {port}: {e}")
        sys.exit(1)

    print(f"Connected to {port} at {baud} baud")
    if log_file:
        print(f"Logging to {log_file}")
    if filter_subsys:
        print(f"Filter: {', '.join(sorted(filter_subsys))}")
    if errors_only:
        print("Mode: errors/warnings only")
    print("Press Ctrl-C to stop.\n")

    flog = None
    if log_file:
        try:
            flog = _open_log(log_file, port, baud)
        except OSError as e:
            # Do not leave the port open when the log cannot be created.
            ser.close()
            print(f"ERROR: Could not open log file {log_file}: {e}")
            sys.exit(1)

    def emit(raw_line):
        """Decode one raw line, then record, log and display it."""
        line = raw_line.decode(ENCODING, errors="replace").rstrip("\r")
        if not line:
            return

        stats.update(line)

        # Log file always gets everything (unfiltered, no color)
        if flog:
            wall_ts = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
            flog.write(f"{wall_ts} {line}\n")
            flog.flush()

        # Terminal display respects filters
        if should_display(line, filter_subsys, errors_only):
            print(colorize(line, use_color))

    line_buf = b""

    try:
        while running:
            try:
                chunk = ser.read(256)
            except serial.SerialException as e:
                print(f"\nSerial error: {e}")
                break

            if not chunk:
                continue

            # Everything before the last "\n" is complete; the tail (possibly
            # empty) stays buffered until its newline arrives.
            *complete, line_buf = (line_buf + chunk).split(b"\n")
            for raw_line in complete:
                emit(raw_line)

        # Keep a trailing partial line (e.g. the board died mid-message).
        if line_buf:
            emit(line_buf)

    finally:
        ser.close()
        if flog:
            flog.write(f"\n{stats.summary()}\n")
            flog.close()
        print(stats.summary())


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main():
    """Parse command-line arguments and run the requested action."""
    parser = argparse.ArgumentParser(
        description="AERIS-10 UART Diagnostic Capture Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # __doc__ is None under "python -OO"; fall back to an empty epilog.
        epilog=(__doc__ or "").split("Usage:")[0],
    )
    parser.add_argument(
        "-p", "--port",
        help="Serial port (default: auto-detect)",
    )
    parser.add_argument(
        "-b", "--baud",
        type=int,
        default=DEFAULT_BAUD,
        help=f"Baud rate (default: {DEFAULT_BAUD})",
    )
    parser.add_argument(
        "--filter",
        help="Comma-separated subsystem tags to display (e.g. LO,PA,CLK)",
    )
    parser.add_argument(
        "--errors-only",
        action="store_true",
        help="Only display WARN and ERR lines",
    )
    parser.add_argument(
        "--no-log",
        action="store_true",
        help="Disable log file output",
    )
    parser.add_argument(
        "--no-color",
        action="store_true",
        help="Disable ANSI color output",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List available serial ports and exit",
    )
    parser.add_argument(
        "-o", "--output",
        help="Log file path (default: logs/uart_YYYYMMDD_HHMMSS.log)",
    )

    args = parser.parse_args()

    # Everything past this point needs a serial port.
    _require_pyserial()

    if args.list:
        list_ports()
        sys.exit(0)

    # Resolve port
    port = args.port
    if not port:
        port = auto_detect_port()
        if not port:
            print("ERROR: No serial port detected. Use -p to specify, or --list to see ports.")
            sys.exit(1)
        print(f"Auto-detected port: {port}")

    # Resolve log file
    log_file = None
    if not args.no_log:
        if args.output:
            log_file = args.output
        else:
            ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            log_file = os.path.join(LOG_DIR, f"uart_{ts}.log")

    # Parse filter (empty tags such as in "LO,,PA" are dropped)
    filter_subsys = None
    if args.filter:
        filter_subsys = {
            tag.strip().upper() for tag in args.filter.split(",") if tag.strip()
        }

    # Color detection
    use_color = not args.no_color and sys.stdout.isatty()

    capture(port, args.baud, log_file, filter_subsys, args.errors_only, use_color)


if __name__ == "__main__":
    main()