/ adafruit_bitbangio.py
adafruit_bitbangio.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2020 Melissa LeBlanc-Williams for Adafruit Industries
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  """
 23  `adafruit_bitbangio`
 24  ================================================================================
 25  
 26  A library for adding bitbang I2C and SPI to CircuitPython without the built-in bitbangio module.
 27  The interface is intended to be the same as bitbangio and therefore there is no bit order or chip
 28  select functionality. If your board supports bitbangio, it is recommended to use that instead
 29  as the timing should be more reliable.
 30  
 31  * Author(s): Melissa LeBlanc-Williams
 32  
 33  Implementation Notes
 34  --------------------
 35  
 36  **Software and Dependencies:**
 37  
 38  * Adafruit CircuitPython firmware for the supported boards:
 39    https://github.com/adafruit/circuitpython/releases
 40  
 41  """
 42  
 43  # imports
 44  from time import monotonic, sleep
 45  from digitalio import DigitalInOut
 46  
 47  __version__ = "0.0.0-auto.0"
 48  __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BitbangIO.git"
 49  
 50  MSBFIRST = 0
 51  LSBFIRST = 1
 52  
 53  
 54  class _BitBangIO:
 55      """Base class for subclassing only"""
 56  
 57      def __init__(self):
 58          self._locked = False
 59  
 60      def try_lock(self):
 61          """Attempt to grab the lock. Return True on success, False if the lock is already taken."""
 62          if self._locked:
 63              return False
 64          self._locked = True
 65          return True
 66  
 67      def unlock(self):
 68          """Release the lock so others may use the resource."""
 69          if self._locked:
 70              self._locked = False
 71          else:
 72              raise ValueError("Not locked")
 73  
 74      def _check_lock(self):
 75          if not self._locked:
 76              raise RuntimeError("First call try_lock()")
 77          return True
 78  
 79      def __enter__(self):
 80          return self
 81  
 82      def __exit__(self, exc_type, exc_value, traceback):
 83          self.deinit()
 84  
 85      # pylint: disable=no-self-use
 86      def deinit(self):
 87          """Free any hardware used by the object."""
 88          return
 89  
 90      # pylint: enable=no-self-use
 91  
 92  
 93  class I2C(_BitBangIO):
 94      """Software-based implementation of the I2C protocol over GPIO pins."""
 95  
 96      def __init__(self, scl, sda, *, frequency=400000, timeout=1):
 97          """Initialize bitbang (or software) based I2C.  Must provide the I2C
 98          clock, and data pin numbers.
 99          """
100          super().__init__()
101  
102          # Set pins as outputs/inputs.
103          self._scl = DigitalInOut(scl)
104          self._scl.switch_to_output()
105          self._scl.value = 1
106  
107          # SDA flips between being input and output
108          self._sda = DigitalInOut(sda)
109          self._sda.switch_to_output()
110          self._sda.value = 1
111  
112          self._delay = 1 / frequency / 2
113          self._timeout = timeout
114  
115      def scan(self):
116          """Perform an I2C Device Scan"""
117          found = []
118          if self._check_lock():
119              for address in range(0, 0x80):
120                  if self._probe(address):
121                      found.append(address)
122          return found
123  
124      def writeto(self, address, buffer, *, start=0, end=None, stop=True):
125          """Write data from the buffer to an address"""
126          if end is None:
127              end = len(buffer)
128          if self._check_lock():
129              self._write(address, buffer[start:end], stop)
130  
131      def readfrom_into(self, address, buffer, *, start=0, end=None):
132          """Read data from an address and into the buffer"""
133          if end is None:
134              end = len(buffer)
135  
136          if self._check_lock():
137              readin = self._read(address, end - start)
138              for i in range(end - start):
139                  buffer[i + start] = readin[i]
140  
141      def writeto_then_readfrom(
142          self,
143          address,
144          buffer_out,
145          buffer_in,
146          *,
147          out_start=0,
148          out_end=None,
149          in_start=0,
150          in_end=None,
151          stop=True
152      ):
153          """Write data from buffer_out to an address and then
154          read data from an address and into buffer_in
155          """
156          if out_end is None:
157              out_end = len(buffer_out)
158          if in_end is None:
159              in_end = len(buffer_in)
160          if self._check_lock():
161              self.writeto(address, buffer_out, start=out_start, end=out_end, stop=stop)
162              self.readfrom_into(address, buffer_in, start=in_start, end=in_end)
163  
164      def _scl_low(self):
165          self._scl.value = 0
166  
167      def _sda_low(self):
168          self._sda.value = 0
169  
170      def _scl_release(self):
171          """Release and let the pullups lift"""
172          # Use self._timeout to add clock stretching
173          self._scl.value = 1
174  
175      def _sda_release(self):
176          """Release and let the pullups lift"""
177          # Use self._timeout to add clock stretching
178          self._sda.value = 1
179  
180      def _set_values(self, *, scl, sda, delay=None):
181          if delay is None:
182              delay = self._delay
183          self._scl.value = scl
184          self._scl.value = sda
185          sleep(delay)
186  
187      def _start(self):
188          self._sda_release()
189          self._scl_release()
190          sleep(self._delay)
191          self._sda_low()
192          sleep(self._delay)
193  
194      def _stop(self):
195          self._scl_low()
196          sleep(self._delay)
197          self._sda_low()
198          sleep(self._delay)
199          self._scl_release()
200          sleep(self._delay)
201          self._sda_release()
202          sleep(self._delay)
203  
204      def _repeated_start(self):
205          self._scl_low()
206          sleep(self._delay)
207          self._sda_release()
208          sleep(self._delay)
209          self._scl_release()
210          sleep(self._delay)
211          self._sda_low()
212          sleep(self._delay)
213  
214      def _write_byte(self, byte):
215          for bit_position in range(8):
216              self._scl_low()
217              sleep(self._delay)
218              if byte & (0x80 >> bit_position):
219                  self._sda_release()
220              else:
221                  self._sda_low()
222              sleep(self._delay)
223              self._scl_release()
224              sleep(self._delay)
225          self._scl_low()
226          sleep(self._delay * 2)
227  
228          self._scl_release()
229          sleep(self._delay)
230  
231          self._sda.switch_to_input()
232          ack = self._sda.value
233          self._sda.switch_to_output()
234          sleep(self._delay)
235  
236          self._scl_low()
237  
238          return not ack
239  
240      def _read_byte(self, ack=False):
241          self._scl_low()
242          sleep(self._delay)
243  
244          data = 0
245          self._sda.switch_to_input()
246          for _ in range(8):
247              self._scl_release()
248              sleep(self._delay)
249              data = (data << 1) | int(self._sda.value)
250              sleep(self._delay)
251              self._scl_low()
252              sleep(self._delay)
253          self._sda.switch_to_output()
254  
255          if ack:
256              self._sda_low()
257          else:
258              self._sda_release()
259          sleep(self._delay)
260          self._scl_release()
261          sleep(self._delay)
262          return data & 0xFF
263  
264      def _probe(self, address):
265          self._start()
266          ok = self._write_byte(address << 1)
267          self._stop()
268          return ok > 0
269  
270      def _write(self, address, buffer, transmit_stop):
271          self._start()
272          if not self._write_byte(address << 1):
273              raise RuntimeError("Device not responding at 0x{:02X}".format(address))
274          for byte in buffer:
275              self._write_byte(byte)
276          if transmit_stop:
277              self._stop()
278  
279      def _read(self, address, length):
280          self._start()
281          if not self._write_byte(address << 1 | 1):
282              raise RuntimeError("Device not responding at 0x{:02X}".format(address))
283          buffer = bytearray(length)
284          for byte_position in range(length):
285              buffer[byte_position] = self._read_byte(ack=(byte_position != length - 1))
286          self._stop()
287          return buffer
288  
289  
290  class SPI(_BitBangIO):
291      """Software-based implementation of the SPI protocol over GPIO pins."""
292  
293      def __init__(self, clock, MOSI=None, MISO=None):
294          """Initialize bit bang (or software) based SPI.  Must provide the SPI
295          clock, and optionally MOSI and MISO pin numbers. If MOSI is set to None
296          then writes will be disabled and fail with an error, likewise for MISO
297          reads will be disabled.
298          """
299          super().__init__()
300  
301          while self.try_lock():
302              pass
303  
304          self.configure()
305          self.unlock()
306  
307          # Set pins as outputs/inputs.
308          self._sclk = DigitalInOut(clock)
309          self._sclk.switch_to_output()
310  
311          if MOSI is not None:
312              self._mosi = DigitalInOut(MOSI)
313              self._mosi.switch_to_output()
314  
315          if MISO is not None:
316              self._miso = DigitalInOut(MISO)
317              self._miso.switch_to_input()
318  
319      def configure(self, *, baudrate=100000, polarity=0, phase=0, bits=8):
320          """Configures the SPI bus. Only valid when locked."""
321          if self._check_lock():
322              if not isinstance(baudrate, int):
323                  raise ValueError("baudrate must be an integer")
324              if not isinstance(bits, int):
325                  raise ValueError("bits must be an integer")
326              if bits < 1 or bits > 8:
327                  raise ValueError("bits must be in the range of 1-8")
328              if polarity not in (0, 1):
329                  raise ValueError("polarity must be either 0 or 1")
330              if phase not in (0, 1):
331                  raise ValueError("phase must be either 0 or 1")
332              self._baudrate = baudrate
333              self._polarity = polarity
334              self._phase = phase
335              self._bits = bits
336              self._half_period = (1 / self._baudrate) / 2  # 50% Duty Cyle delay
337  
338      def _wait(self, start=None):
339          """Wait for up to one half cycle"""
340          while (start + self._half_period) > monotonic():
341              pass
342          return monotonic()  # Return current time
343  
344      def write(self, buffer, start=0, end=None):
345          """Write the data contained in buf. Requires the SPI being locked.
346          If the buffer is empty, nothing happens.
347          """
348          # Fail MOSI is not specified.
349          if self._mosi is None:
350              raise RuntimeError("Write attempted with no MOSI pin specified.")
351          if end is None:
352              end = len(buffer)
353  
354          if self._check_lock():
355              start_time = monotonic()
356              for byte in buffer[start:end]:
357                  for bit_position in range(self._bits):
358                      bit_value = byte & 0x80 >> bit_position
359                      # Set clock to base
360                      if not self._phase:  # Mode 0, 2
361                          self._mosi.value = bit_value
362                      self._sclk.value = self._polarity
363                      start_time = self._wait(start_time)
364  
365                      # Flip clock off base
366                      if self._phase:  # Mode 1, 3
367                          self._mosi.value = bit_value
368                      self._sclk.value = not self._polarity
369                      start_time = self._wait(start_time)
370  
371              # Return pins to base positions
372              self._mosi.value = 0
373              self._sclk.value = self._polarity
374  
375      # pylint: disable=too-many-branches
376      def readinto(self, buffer, start=0, end=None, write_value=0):
377          """Read into the buffer specified by buf while writing zeroes. Requires the SPI being
378          locked. If the number of bytes to read is 0, nothing happens.
379          """
380          if self._miso is None:
381              raise RuntimeError("Read attempted with no MISO pin specified.")
382          if end is None:
383              end = len(buffer)
384  
385          if self._check_lock():
386              start_time = monotonic()
387              for byte_position, _ in enumerate(buffer[start:end]):
388                  for bit_position in range(self._bits):
389                      bit_mask = 0x80 >> bit_position
390                      bit_value = write_value & 0x80 >> bit_position
391                      # Return clock to base
392                      self._sclk.value = self._polarity
393                      start_time = self._wait(start_time)
394                      # Handle read on leading edge of clock.
395                      if not self._phase:  # Mode 0, 2
396                          if self._mosi is not None:
397                              self._mosi.value = bit_value
398                          if self._miso.value:
399                              # Set bit to 1 at appropriate location.
400                              buffer[byte_position] |= bit_mask
401                          else:
402                              # Set bit to 0 at appropriate location.
403                              buffer[byte_position] &= ~bit_mask
404                      # Flip clock off base
405                      self._sclk.value = not self._polarity
406                      start_time = self._wait(start_time)
407                      # Handle read on trailing edge of clock.
408                      if self._phase:  # Mode 1, 3
409                          if self._mosi is not None:
410                              self._mosi.value = bit_value
411                          if self._miso.value:
412                              # Set bit to 1 at appropriate location.
413                              buffer[byte_position] |= bit_mask
414                          else:
415                              # Set bit to 0 at appropriate location.
416                              buffer[byte_position] &= ~bit_mask
417  
418              # Return pins to base positions
419              self._mosi.value = 0
420              self._sclk.value = self._polarity
421  
422      def write_readinto(
423          self,
424          buffer_out,
425          buffer_in,
426          *,
427          out_start=0,
428          out_end=None,
429          in_start=0,
430          in_end=None
431      ):
432          """Write out the data in buffer_out while simultaneously reading data into buffer_in.
433          The lengths of the slices defined by buffer_out[out_start:out_end] and
434          buffer_in[in_start:in_end] must be equal. If buffer slice lengths are
435          both 0, nothing happens.
436          """
437          if self._mosi is None:
438              raise RuntimeError("Write attempted with no MOSI pin specified.")
439          if self._miso is None:
440              raise RuntimeError("Read attempted with no MISO pin specified.")
441          if out_end is None:
442              out_end = len(buffer_out)
443          if in_end is None:
444              in_end = len(buffer_in)
445          if len(buffer_out[out_start:out_end]) != len(buffer_in[in_start:in_end]):
446              raise RuntimeError("Buffer slices must be equal length")
447  
448          if self._check_lock():
449              start_time = monotonic()
450              for byte_position, _ in enumerate(buffer_out[out_start:out_end]):
451                  for bit_position in range(self._bits):
452                      bit_mask = 0x80 >> bit_position
453                      bit_value = (
454                          buffer_out[byte_position + out_start] & 0x80 >> bit_position
455                      )
456                      in_byte_position = byte_position + in_start
457                      # Return clock to 0
458                      self._sclk.value = self._polarity
459                      start_time = self._wait(start_time)
460                      # Handle read on leading edge of clock.
461                      if not self._phase:  # Mode 0, 2
462                          self._mosi.value = bit_value
463                          if self._miso.value:
464                              # Set bit to 1 at appropriate location.
465                              buffer_in[in_byte_position] |= bit_mask
466                          else:
467                              # Set bit to 0 at appropriate location.
468                              buffer_in[in_byte_position] &= ~bit_mask
469                      # Flip clock off base
470                      self._sclk.value = not self._polarity
471                      start_time = self._wait(start_time)
472                      # Handle read on trailing edge of clock.
473                      if self._phase:  # Mode 1, 3
474                          self._mosi.value = bit_value
475                          if self._miso.value:
476                              # Set bit to 1 at appropriate location.
477                              buffer_in[in_byte_position] |= bit_mask
478                          else:
479                              # Set bit to 0 at appropriate location.
480                              buffer_in[in_byte_position] &= ~bit_mask
481  
482              # Return pins to base positions
483              self._mosi.value = 0
484              self._sclk.value = self._polarity
485  
486      # pylint: enable=too-many-branches
487  
488      @property
489      def frequency(self):
490          """Return the currently configured baud rate"""
491          return self._baudrate