/ circuitpython / bm_serial.py
bm_serial.py
1 import board 2 import busio 3 4 5 class BristlemouthSerial: 6 7 def __init__(self, uart=None, node_id: int = 0xC0FFEEEEF0CACC1A) -> None: 8 self.node_id = node_id 9 if uart is None: 10 self.uart = busio.UART(board.TX, board.RX, baudrate=115200) 11 else: 12 self.uart = uart 13 14 def spotter_tx(self, data: bytes) -> int | None: 15 topic = b"spotter/transmit-data" 16 packet = ( 17 self.get_pub_header() 18 + len(topic).to_bytes(2, "little") 19 + topic 20 + b"\x01" 21 + data 22 ) 23 cobs = self.finalize_packet(packet) 24 return self.uart.write(cobs) 25 26 def spotter_log(self, filename: str, data: str) -> int | None: 27 topic = b"spotter/fprintf" 28 packet = ( 29 self.get_pub_header() 30 + len(topic).to_bytes(2, "little") 31 + topic 32 + ("\x00" * 8) 33 + len(filename).to_bytes(2, "little") 34 + (len(data) + 1).to_bytes(2, "little") 35 + filename 36 + data 37 + "\n" 38 ) 39 cobs = self.finalize_packet(packet) 40 return self.uart.write(cobs) 41 42 def finalize_packet(self, packet: bytearray) -> bytes: 43 checksum = self.crc(0, packet) 44 packet[2] = checksum & 0xFF 45 packet[3] = (checksum >> 8) & 0xFF 46 cobs = self.cobs_encode(packet) + b"\x00" 47 return cobs 48 49 def get_pub_header(self) -> bytearray: 50 return ( 51 bytearray.fromhex("02000000") 52 + self.node_id.to_bytes(8, "little") 53 + bytearray.fromhex("0101") 54 ) 55 56 # Adapted from https://github.com/cmcqueen/cobs-python 57 def cobs_encode(self, in_bytes: bytes) -> bytes: 58 final_zero = True 59 out_bytes = bytearray() 60 idx = 0 61 search_start_idx = 0 62 for in_char in in_bytes: 63 if in_char == 0: 64 final_zero = True 65 out_bytes.append(idx - search_start_idx + 1) 66 out_bytes += in_bytes[search_start_idx:idx] 67 search_start_idx = idx + 1 68 else: 69 if idx - search_start_idx == 0xFD: 70 final_zero = False 71 out_bytes.append(0xFF) 72 out_bytes += in_bytes[search_start_idx : idx + 1] 73 search_start_idx = idx + 1 74 idx += 1 75 if idx != search_start_idx or final_zero: 76 out_bytes.append(idx - search_start_idx + 1) 77 out_bytes += in_bytes[search_start_idx:idx] 78 return bytes(out_bytes) 79 80 def crc(self, seed: int, src: bytes) -> int: 81 e, f = 0, 0 82 for i in src: 83 e = (seed ^ i) & 0xFF 84 f = e ^ ((e << 4) & 0xFF) 85 seed = (seed >> 8) ^ (((f << 8) & 0xFFFF) ^ ((f << 3) & 0xFFFF)) ^ (f >> 4) 86 return seed