/ adafruit_bluefruitspi.py
adafruit_bluefruitspi.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2018 Kevin Townsend for Adafruit_Industries
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  """
 23  `adafruit_bluefruitspi`
 24  ====================================================
 25  
 26  Helper class to work with the Adafruit Bluefruit LE SPI friend breakout.
 27  
 28  * Author(s): Kevin Townsend
 29  
 30  Implementation Notes
 31  --------------------
 32  
 33  **Hardware:**
 34  
* `Adafruit Bluefruit LE SPI Friend <https://www.adafruit.com/product/2633>`_
 36  
 37  **Software and Dependencies:**
 38  
 39  * Adafruit CircuitPython firmware for the supported boards:
 40    https://github.com/adafruit/circuitpython/releases
 41  
 42  * Adafruit's Bus Device library: https://github.com/adafruit/Adafruit_CircuitPython_BusDevice
 43  """
 44  
# Package metadata: version string and URL of the source repository.
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BluefruitSPI.git"
 47  
 48  import time
 49  import struct
 50  from digitalio import Direction, Pull
 51  from adafruit_bus_device.spi_device import SPIDevice
 52  from micropython import const
 53  
# pylint: disable=bad-whitespace
# Message types: the first byte of every packet header built or parsed below.
_MSG_COMMAND = const(0x10)  # Command message
_MSG_RESPONSE = const(0x20)  # Response message
_MSG_ALERT = const(0x40)  # Alert message
_MSG_ERROR = const(0x80)  # Error message

# 16-bit command IDs placed in the packet header after the message type.
_SDEP_INITIALIZE = const(0xBEEF)  # Resets the Bluefruit device
_SDEP_ATCOMMAND = const(0x0A00)  # AT command wrapper
_SDEP_BLEUART_TX = const(0x0A01)  # BLE UART transmit data
_SDEP_BLEUART_RX = const(0x0A02)  # BLE UART read data

# Argument data-type tags (not referenced by the code in this module).
_ARG_STRING = const(0x0100)  # String data type
_ARG_BYTEARRAY = const(0x0200)  # Byte array data type
_ARG_INT32 = const(0x0300)  # Signed 32-bit integer data type
_ARG_UINT32 = const(0x0400)  # Unsigned 32-bit integer data type
_ARG_INT16 = const(0x0500)  # Signed 16-bit integer data type
_ARG_UINT16 = const(0x0600)  # Unsigned 16-bit integer data type
_ARG_INT8 = const(0x0700)  # Signed 8-bit integer data type
_ARG_UINT8 = const(0x0800)  # Unsigned 8-bit integer data type

# Error IDs (not referenced by the code in this module).
# NOTE(review): presumably the id carried by _MSG_ERROR responses - confirm.
_ERROR_INVALIDMSGTYPE = const(0x8021)  # SDEP: Unexpected SDEP MsgType
_ERROR_INVALIDCMDID = const(0x8022)  # SDEP: Unknown command ID
_ERROR_INVALIDPAYLOAD = const(0x8023)  # SDEP: Payload problem
_ERROR_INVALIDLEN = const(0x8024)  # SDEP: Indicated len too large
_ERROR_INVALIDINPUT = const(0x8060)  # AT: Invalid data
_ERROR_UNKNOWNCMD = const(0x8061)  # AT: Unknown command name
_ERROR_INVALIDPARAM = const(0x8062)  # AT: Invalid param value
_ERROR_UNSUPPORTED = const(0x8063)  # AT: Unsupported command

# For the Bluefruit Connect packets: total length in bytes, including the
# leading '!' marker, the type letter and the trailing checksum byte.
_PACKET_BUTTON_LEN = const(5)
_PACKET_COLOR_LEN = const(6)

# pylint: enable=bad-whitespace
 88  
 89  
 90  class BluefruitSPI:
 91      """Helper for the Bluefruit LE SPI Friend"""
 92  
 93      def __init__(
 94          self, spi, cs, irq, reset, debug=False
 95      ):  # pylint: disable=too-many-arguments
 96          self._irq = irq
 97          self._buf_tx = bytearray(20)
 98          self._buf_rx = bytearray(20)
 99          self._debug = debug
100  
101          # a cache of data, used for packet parsing
102          self._buffer = []
103  
104          # Reset
105          reset.direction = Direction.OUTPUT
106          reset.value = False
107          time.sleep(0.01)
108          reset.value = True
109          time.sleep(0.5)
110  
111          # CS is an active low output
112          cs.direction = Direction.OUTPUT
113          cs.value = True
114  
115          # irq line is active high input, so set a pulldown as a precaution
116          self._irq.direction = Direction.INPUT
117          self._irq.pull = Pull.DOWN
118  
119          self._spi_device = SPIDevice(spi, cs, baudrate=4000000, phase=0, polarity=0)
120  
121      def _cmd(self, cmd):  # pylint: disable=too-many-branches
122          """
123          Executes the supplied AT command, which must be terminated with
124          a new-line character.
125          Returns msgtype, rspid, rsp, which are 8-bit int, 16-bit int and a
126          bytearray.
127          :param cmd: The new-line terminated AT command to execute.
128          """
129          # Make sure we stay within the 255 byte limit
130          if len(cmd) > 127:
131              if self._debug:
132                  print("ERROR: Command too long.")
133              raise ValueError("Command too long.")
134  
135          more = 0x80  # More bit is in pos 8, 1 = more data available
136          pos = 0
137          while len(cmd) - pos:
138              # Construct the SDEP packet
139              if len(cmd) - pos <= 16:
140                  # Last or sole packet
141                  more = 0
142              plen = len(cmd) - pos
143              if plen > 16:
144                  plen = 16
145              # Note the 'more' value in bit 8 of the packet len
146              struct.pack_into(
147                  "<BHB16s",
148                  self._buf_tx,
149                  0,
150                  _MSG_COMMAND,
151                  _SDEP_ATCOMMAND,
152                  plen | more,
153                  cmd[pos : pos + plen],
154              )
155              if self._debug:
156                  print("Writing: ", [hex(b) for b in self._buf_tx])
157              else:
158                  time.sleep(0.05)
159  
160              # Update the position if there is data remaining
161              pos += plen
162  
163              # Send out the SPI bus
164              with self._spi_device as spi:
165                  spi.write(self._buf_tx, end=len(cmd) + 4)  # pylint: disable=no-member
166  
167          # Wait up to 200ms for a response
168          timeout = 0.2
169          while timeout > 0 and not self._irq.value:
170              time.sleep(0.01)
171              timeout -= 0.01
172          if timeout <= 0:
173              if self._debug:
174                  print("ERROR: Timed out waiting for a response.")
175              raise RuntimeError("Timed out waiting for a response.")
176  
177          # Retrieve the response message
178          msgtype = 0
179          rspid = 0
180          rsplen = 0
181          rsp = b""
182          while self._irq.value is True:
183              # Read the current response packet
184              time.sleep(0.01)
185              with self._spi_device as spi:
186                  spi.readinto(self._buf_rx)
187  
188              # Read the message envelope and contents
189              msgtype, rspid, rsplen = struct.unpack(">BHB", self._buf_rx[0:4])
190              if rsplen >= 16:
191                  rsp += self._buf_rx[4:20]
192              else:
193                  rsp += self._buf_rx[4 : rsplen + 4]
194              if self._debug:
195                  print("Reading: ", [hex(b) for b in self._buf_rx])
196              else:
197                  time.sleep(0.05)
198          # Clean up the response buffer
199          if self._debug:
200              print(rsp)
201  
202          return msgtype, rspid, rsp
203  
204      def init(self):
205          """
206          Sends the SDEP initialize command, which causes the board to reset.
207          This command should complete in under 1s.
208          """
209          # Construct the SDEP packet
210          struct.pack_into("<BHB", self._buf_tx, 0, _MSG_COMMAND, _SDEP_INITIALIZE, 0)
211          if self._debug:
212              print("Writing: ", [hex(b) for b in self._buf_tx])
213  
214          # Send out the SPI bus
215          with self._spi_device as spi:
216              spi.write(self._buf_tx, end=4)  # pylint: disable=no-member
217  
218          # Wait 1 second for the command to complete.
219          time.sleep(1)
220  
221      @property
222      def connected(self):
223          """Whether the Bluefruit module is connected to the central"""
224          return int(self.command_check_OK(b"AT+GAPGETCONN")) == 1
225  
226      def uart_tx(self, data):
227          """
228          Sends the specific bytestring out over BLE UART.
229          :param data: The bytestring to send.
230          """
231          return self._cmd(b"AT+BLEUARTTX=" + data + b"\r\n")
232  
233      def uart_rx(self):
234          """
235          Reads byte data from the BLE UART FIFO.
236          """
237          data = self.command_check_OK(b"AT+BLEUARTRX")
238          if data:
239              # remove \r\n from ending
240              return data[:-2]
241          return None
242  
243      def command(self, string):
244          """Send a command and check response code"""
245          try:
246              msgtype, msgid, rsp = self._cmd(string + "\n")
247              if msgtype == _MSG_ERROR:
248                  raise RuntimeError("Error (id:{0})".format(hex(msgid)))
249              if msgtype == _MSG_RESPONSE:
250                  return rsp
251              raise RuntimeError("Unknown response (id:{0})".format(hex(msgid)))
252          except RuntimeError as error:
253              raise RuntimeError("AT command failure: " + repr(error))
254  
255      def command_check_OK(self, command, delay=0.0):  # pylint: disable=invalid-name
256          """Send a fully formed bytestring AT command, and check
257          whether we got an 'OK' back. Returns payload bytes if there is any"""
258          ret = self.command(command)
259          time.sleep(delay)
260          if not ret or not ret[-4:]:
261              raise RuntimeError("Not OK")
262          if ret[-4:] != b"OK\r\n":
263              raise RuntimeError("Not OK")
264          if ret[:-4]:
265              return ret[:-4]
266          return None
267  
268      def read_packet(self):  # pylint: disable=too-many-return-statements
269          """
270          Will read a Bluefruit Connect packet and return it in a parsed format.
271          Currently supports Button and Color packets only
272          """
273          data = self.uart_rx()
274          if not data:
275              return None
276          # convert to an array of character bytes
277          self._buffer += [chr(b) for b in data]
278          # Find beginning of new packet, starts with a '!'
279          while self._buffer and self._buffer[0] != "!":
280              self._buffer.pop(0)
281          # we need at least 2 bytes in the buffer
282          if len(self._buffer) < 2:
283              return None
284  
285          # Packet beginning found
286          if self._buffer[1] == "B":
287              plen = _PACKET_BUTTON_LEN
288          elif self._buffer[1] == "C":
289              plen = _PACKET_COLOR_LEN
290          else:
291              # unknown packet type
292              self._buffer.pop(0)
293              return None
294  
295          # split packet off of buffer cache
296          packet = self._buffer[0:plen]
297  
298          self._buffer = self._buffer[plen:]  # remove packet from buffer
299          if sum([ord(x) for x in packet]) % 256 != 255:  # check sum
300              return None
301  
302          # OK packet's good!
303          if packet[1] == "B":  # buttons have 2 int args to parse
304              # button number & True/False press
305              return ("B", int(packet[2]), packet[3] == "1")
306          if packet[1] == "C":  # colorpick has 3 int args to parse
307              # red, green and blue
308              return ("C", ord(packet[2]), ord(packet[3]), ord(packet[4]))
309          # We don't nicely parse this yet
310          return packet[1:-1]