# adafruit_bluefruitspi.py
# The MIT License (MIT)
#
# Copyright (c) 2018 Kevin Townsend for Adafruit_Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_bluefruitspi`
====================================================

Helper class to work with the Adafruit Bluefruit LE SPI friend breakout.

* Author(s): Kevin Townsend

Implementation Notes
--------------------

**Hardware:**

* `Adafruit Bluefruit LE SPI Friend <https://www.adafruit.com/product/2633>`_

**Software and Dependencies:**

* Adafruit CircuitPython firmware for the supported boards:
  https://github.com/adafruit/circuitpython/releases

* Adafruit's Bus Device library: https://github.com/adafruit/Adafruit_CircuitPython_BusDevice
"""

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BluefruitSPI.git"

import time
import struct
from digitalio import Direction, Pull
from adafruit_bus_device.spi_device import SPIDevice
from micropython import const

# pylint: disable=bad-whitespace
# SDEP message types (first byte of every SDEP packet)
_MSG_COMMAND = const(0x10)  # Command message
_MSG_RESPONSE = const(0x20)  # Response message
_MSG_ALERT = const(0x40)  # Alert message
_MSG_ERROR = const(0x80)  # Error message

# SDEP command identifiers
_SDEP_INITIALIZE = const(0xBEEF)  # Resets the Bluefruit device
_SDEP_ATCOMMAND = const(0x0A00)  # AT command wrapper
_SDEP_BLEUART_TX = const(0x0A01)  # BLE UART transmit data
_SDEP_BLEUART_RX = const(0x0A02)  # BLE UART read data

# Argument data types
_ARG_STRING = const(0x0100)  # String data type
_ARG_BYTEARRAY = const(0x0200)  # Byte array data type
_ARG_INT32 = const(0x0300)  # Signed 32-bit integer data type
_ARG_UINT32 = const(0x0400)  # Unsigned 32-bit integer data type
_ARG_INT16 = const(0x0500)  # Signed 16-bit integer data type
_ARG_UINT16 = const(0x0600)  # Unsigned 16-bit integer data type
_ARG_INT8 = const(0x0700)  # Signed 8-bit integer data type
_ARG_UINT8 = const(0x0800)  # Unsigned 8-bit integer data type

# Error identifiers
_ERROR_INVALIDMSGTYPE = const(0x8021)  # SDEP: Unexpected SDEP MsgType
_ERROR_INVALIDCMDID = const(0x8022)  # SDEP: Unknown command ID
_ERROR_INVALIDPAYLOAD = const(0x8023)  # SDEP: Payload problem
_ERROR_INVALIDLEN = const(0x8024)  # SDEP: Indicated len too large
_ERROR_INVALIDINPUT = const(0x8060)  # AT: Invalid data
_ERROR_UNKNOWNCMD = const(0x8061)  # AT: Unknown command name
_ERROR_INVALIDPARAM = const(0x8062)  # AT: Invalid param value
_ERROR_UNSUPPORTED = const(0x8063)  # AT: Unsupported command

# For the Bluefruit Connect packets
_PACKET_BUTTON_LEN = const(5)
_PACKET_COLOR_LEN = const(6)

# pylint: enable=bad-whitespace


class BluefruitSPI:
    """Helper for the Bluefruit LE SPI Friend"""

    def __init__(
        self, spi, cs, irq, reset, debug=False
    ):  # pylint: disable=too-many-arguments
        """
        Resets the module and prepares the SPI device used to talk to it.

        :param spi: The SPI bus object handed to ``SPIDevice``.
        :param cs: Chip-select pin; configured here as an output, idle high.
        :param irq: Interrupt pin; configured here as an input with pull-down.
        :param reset: Reset pin; configured as an output and pulsed low.
        :param debug: When true, packets and errors are printed to the console.
        """
        self._irq = irq
        # Fixed 20-byte scratch buffers: 4-byte SDEP header + 16-byte payload
        self._buf_tx = bytearray(20)
        self._buf_rx = bytearray(20)
        self._debug = debug

        # a cache of data, used for packet parsing
        self._buffer = []

        # Reset: hold the line low for 10 ms, then release and wait 0.5 s
        reset.direction = Direction.OUTPUT
        reset.value = False
        time.sleep(0.01)
        reset.value = True
        time.sleep(0.5)

        # CS is an active low output
        cs.direction = Direction.OUTPUT
        cs.value = True

        # irq line is active high input, so set a pulldown as a precaution
        self._irq.direction = Direction.INPUT
        self._irq.pull = Pull.DOWN

        self._spi_device = SPIDevice(spi, cs, baudrate=4000000, phase=0, polarity=0)

    def _cmd(self, cmd):  # pylint: disable=too-many-branches
        """
        Executes the supplied AT command, which must be terminated with
        a new-line character.

        The command is split into SDEP packets carrying at most 16 payload
        bytes each; every packet except the last has the 'more' bit (0x80)
        set in its length byte.

        :param cmd: The new-line terminated AT command to execute.
        :return: ``(msgtype, rspid, rsp)`` - the 8-bit message type and the
            16-bit id taken from the last response packet read, and the
            concatenated payload of all response packets.
        :raises ValueError: if ``cmd`` is longer than 127 bytes.
        :raises RuntimeError: if the IRQ line is not asserted within ~200ms.
        """
        # Reject anything longer than 127 bytes before touching the bus
        if len(cmd) > 127:
            if self._debug:
                print("ERROR: Command too long.")
            raise ValueError("Command too long.")

        pos = 0
        while len(cmd) - pos:
            remaining = len(cmd) - pos
            plen = min(remaining, 16)
            # More bit is in pos 8, 1 = more data available
            more = 0x80 if remaining > 16 else 0

            # Construct the SDEP packet ('16s' zero-pads short payloads).
            # Note the 'more' value in bit 8 of the packet len
            struct.pack_into(
                "<BHB16s",
                self._buf_tx,
                0,
                _MSG_COMMAND,
                _SDEP_ATCOMMAND,
                plen | more,
                cmd[pos : pos + plen],
            )
            if self._debug:
                print("Writing: ", [hex(b) for b in self._buf_tx])
            else:
                time.sleep(0.05)

            # Update the position if there is data remaining
            pos += plen

            # Send out the SPI bus: 4 header bytes plus this packet's payload
            # (not the length of the whole command).
            with self._spi_device as spi:
                spi.write(self._buf_tx, end=plen + 4)  # pylint: disable=no-member

        # Wait up to 200ms for a response
        timeout = 0.2
        while timeout > 0 and not self._irq.value:
            time.sleep(0.01)
            timeout -= 0.01
        # Decide on the IRQ line itself rather than the countdown, so that a
        # response arriving during the final poll interval is not discarded.
        if not self._irq.value:
            if self._debug:
                print("ERROR: Timed out waiting for a response.")
            raise RuntimeError("Timed out waiting for a response.")

        # Retrieve the response message
        msgtype = 0
        rspid = 0
        rsplen = 0
        rsp = b""
        while self._irq.value:
            # Read the current response packet
            time.sleep(0.01)
            with self._spi_device as spi:
                spi.readinto(self._buf_rx)

            # Read the message envelope and contents. The header is decoded
            # little-endian, matching how outgoing headers are packed above.
            msgtype, rspid, rsplen = struct.unpack("<BHB", self._buf_rx[0:4])
            if rsplen >= 16:
                # Also taken when the 'more' bit (0x80) is set in the length
                rsp += self._buf_rx[4:20]
            else:
                rsp += self._buf_rx[4 : rsplen + 4]
            if self._debug:
                print("Reading: ", [hex(b) for b in self._buf_rx])
            else:
                time.sleep(0.05)

        if self._debug:
            print(rsp)

        return msgtype, rspid, rsp

    def init(self):
        """
        Sends the SDEP initialize command, which causes the board to reset.
        This command should complete in under 1s.
        """
        # Construct the SDEP packet (header only, zero-length payload)
        struct.pack_into("<BHB", self._buf_tx, 0, _MSG_COMMAND, _SDEP_INITIALIZE, 0)
        if self._debug:
            print("Writing: ", [hex(b) for b in self._buf_tx])

        # Send out the SPI bus
        with self._spi_device as spi:
            spi.write(self._buf_tx, end=4)  # pylint: disable=no-member

        # Wait 1 second for the command to complete.
        time.sleep(1)

    @property
    def connected(self):
        """Whether the Bluefruit module is connected to the central"""
        return int(self.command_check_OK(b"AT+GAPGETCONN")) == 1

    def uart_tx(self, data):
        """
        Sends the specific bytestring out over BLE UART.

        :param data: The bytestring to send.
        :return: The ``(msgtype, rspid, rsp)`` tuple produced by the command.
        """
        return self._cmd(b"AT+BLEUARTTX=" + data + b"\r\n")

    def uart_rx(self):
        """
        Reads byte data from the BLE UART FIFO.

        :return: The received bytes with the trailing two bytes removed,
            or ``None`` when the response carried no payload.
        """
        data = self.command_check_OK(b"AT+BLEUARTRX")
        if data:
            # remove \r\n from ending
            return data[:-2]
        return None

    def command(self, string):
        """
        Send a command and check response code.

        :param string: The AT command without its terminating new-line;
            either ``str`` or a bytes-like object.
        :return: The response payload.
        :raises RuntimeError: if the module answers with an error or an
            unexpected message type, or does not answer in time.
        """
        # Callers in this module pass bytes; normalise str so that appending
        # the terminator never mixes str and bytes.
        if isinstance(string, str):
            string = string.encode("utf-8")
        try:
            msgtype, msgid, rsp = self._cmd(string + b"\n")
            if msgtype == _MSG_ERROR:
                raise RuntimeError("Error (id:{0})".format(hex(msgid)))
            if msgtype == _MSG_RESPONSE:
                return rsp
            raise RuntimeError("Unknown response (id:{0})".format(hex(msgid)))
        except RuntimeError as error:
            raise RuntimeError("AT command failure: " + repr(error)) from error

    def command_check_OK(self, command, delay=0.0):  # pylint: disable=invalid-name
        """
        Send a fully formed bytestring AT command, and check
        whether we got an 'OK' back. Returns payload bytes if there is any.

        :param command: The AT command to send.
        :param delay: Seconds to sleep after the command before checking.
        :return: The response with the trailing ``OK\\r\\n`` removed, or
            ``None`` when nothing precedes it.
        :raises RuntimeError: if the response does not end in ``OK\\r\\n``.
        """
        ret = self.command(command)
        time.sleep(delay)
        if not ret or ret[-4:] != b"OK\r\n":
            raise RuntimeError("Not OK")
        payload = ret[:-4]
        return payload if payload else None

    def read_packet(self):  # pylint: disable=too-many-return-statements
        """
        Will read a Bluefruit Connect packet and return it in a parsed format.
        Currently supports Button and Color packets only.

        :return: ``("B", number, pressed)`` for a button packet,
            ``("C", red, green, blue)`` for a color packet, or ``None`` when
            no complete, valid packet is available yet.
        """
        data = self.uart_rx()
        if data:
            # convert to an array of character bytes
            self._buffer += [chr(b) for b in data]
        # Keep going even without new data: an earlier read may have left a
        # complete packet in the cache.

        # Find beginning of new packet, starts with a '!'
        while self._buffer and self._buffer[0] != "!":
            self._buffer.pop(0)
        # we need at least 2 bytes in the buffer
        if len(self._buffer) < 2:
            return None

        # Packet beginning found
        if self._buffer[1] == "B":
            plen = _PACKET_BUTTON_LEN
        elif self._buffer[1] == "C":
            plen = _PACKET_COLOR_LEN
        else:
            # unknown packet type
            self._buffer.pop(0)
            return None

        # The packet may be split across UART reads; leave the partial packet
        # cached until the rest arrives instead of discarding it.
        if len(self._buffer) < plen:
            return None

        # split packet off of buffer cache
        packet = self._buffer[0:plen]
        self._buffer = self._buffer[plen:]  # remove packet from buffer

        if sum(ord(x) for x in packet) % 256 != 255:  # check sum
            return None

        # OK packet's good!
        if packet[1] == "B":  # buttons have 2 int args to parse
            # button number & True/False press
            return ("B", int(packet[2]), packet[3] == "1")
        if packet[1] == "C":  # colorpick has 3 int args to parse
            # red, green and blue
            return ("C", ord(packet[2]), ord(packet[3]), ord(packet[4]))
        # We don't nicely parse this yet
        return packet[1:-1]