/ circuitpython / bm_serial.py
bm_serial.py
 1  import board
 2  import busio
 3  
 4  
 5  class BristlemouthSerial:
 6  
 7      def __init__(self, uart=None, node_id: int = 0xC0FFEEEEF0CACC1A) -> None:
 8          self.node_id = node_id
 9          if uart is None:
10              self.uart = busio.UART(board.TX, board.RX, baudrate=115200)
11          else:
12              self.uart = uart
13  
14      def spotter_tx(self, data: bytes) -> int | None:
15          topic = b"spotter/transmit-data"
16          packet = (
17              self.get_pub_header()
18              + len(topic).to_bytes(2, "little")
19              + topic
20              + b"\x01"
21              + data
22          )
23          cobs = self.finalize_packet(packet)
24          return self.uart.write(cobs)
25  
26      def spotter_log(self, filename: str, data: str) -> int | None:
27          topic = b"spotter/fprintf"
28          packet = (
29              self.get_pub_header()
30              + len(topic).to_bytes(2, "little")
31              + topic
32              + ("\x00" * 8)
33              + len(filename).to_bytes(2, "little")
34              + (len(data) + 1).to_bytes(2, "little")
35              + filename
36              + data
37              + "\n"
38          )
39          cobs = self.finalize_packet(packet)
40          return self.uart.write(cobs)
41  
42      def finalize_packet(self, packet: bytearray) -> bytes:
43          checksum = self.crc(0, packet)
44          packet[2] = checksum & 0xFF
45          packet[3] = (checksum >> 8) & 0xFF
46          cobs = self.cobs_encode(packet) + b"\x00"
47          return cobs
48  
49      def get_pub_header(self) -> bytearray:
50          return (
51              bytearray.fromhex("02000000")
52              + self.node_id.to_bytes(8, "little")
53              + bytearray.fromhex("0101")
54          )
55  
56      # Adapted from https://github.com/cmcqueen/cobs-python
57      def cobs_encode(self, in_bytes: bytes) -> bytes:
58          final_zero = True
59          out_bytes = bytearray()
60          idx = 0
61          search_start_idx = 0
62          for in_char in in_bytes:
63              if in_char == 0:
64                  final_zero = True
65                  out_bytes.append(idx - search_start_idx + 1)
66                  out_bytes += in_bytes[search_start_idx:idx]
67                  search_start_idx = idx + 1
68              else:
69                  if idx - search_start_idx == 0xFD:
70                      final_zero = False
71                      out_bytes.append(0xFF)
72                      out_bytes += in_bytes[search_start_idx : idx + 1]
73                      search_start_idx = idx + 1
74              idx += 1
75          if idx != search_start_idx or final_zero:
76              out_bytes.append(idx - search_start_idx + 1)
77              out_bytes += in_bytes[search_start_idx:idx]
78          return bytes(out_bytes)
79  
80      def crc(self, seed: int, src: bytes) -> int:
81          e, f = 0, 0
82          for i in src:
83              e = (seed ^ i) & 0xFF
84              f = e ^ ((e << 4) & 0xFF)
85              seed = (seed >> 8) ^ (((f << 8) & 0xFFFF) ^ ((f << 3) & 0xFFFF)) ^ (f >> 4)
86          return seed