bus.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2017 Carter Nelson for Adafruit Industries
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  """
 23  `adafruit_onewire.bus`
 24  ====================================================
 25  
 26  Provide access to a 1-Wire bus.
 27  
 28  * Author(s): Carter Nelson
 29  """
 30  
# Package metadata: version string and URL of the source repository.
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_OneWire.git"
 33  
 34  import busio
 35  from micropython import const
 36  
# 1-Wire ROM command bytes. _SEARCH_ROM is written to the bus at the start of
# every pass of OneWireBus._search_rom; _MATCH_ROM and _SKIP_ROM are not
# referenced by the classes below.
_SEARCH_ROM = const(0xF0)
_MATCH_ROM = const(0x55)
_SKIP_ROM = const(0xCC)
# Initial value of OneWireBus.maximum_devices (the device limit enforced by scan()).
_MAX_DEV = const(10)
 41  
 42  
 43  class OneWireError(Exception):
 44      """A class to represent a 1-Wire exception."""
 45  
 46  
 47  class OneWireAddress:
 48      """A class to represent a 1-Wire address."""
 49  
 50      def __init__(self, rom):
 51          self._rom = rom
 52  
 53      @property
 54      def rom(self):
 55          """The unique 64 bit ROM code."""
 56          return self._rom
 57  
 58      @property
 59      def crc(self):
 60          """The 8 bit CRC."""
 61          return self._rom[7]
 62  
 63      @property
 64      def serial_number(self):
 65          """The 48 bit serial number."""
 66          return self._rom[1:7]
 67  
 68      @property
 69      def family_code(self):
 70          """The 8 bit family code."""
 71          return self._rom[0]
 72  
 73  
 74  class OneWireBus:
 75      """A class to represent a 1-Wire bus."""
 76  
 77      def __init__(self, pin):
 78          # pylint: disable=no-member
 79          self._ow = busio.OneWire(pin)
 80          self._readbit = self._ow.read_bit
 81          self._writebit = self._ow.write_bit
 82          self._maximum_devices = _MAX_DEV
 83  
 84      @property
 85      def maximum_devices(self):
 86          """The maximum number of devices the bus will scan for. Valid range is 1 to 255.
 87          It is an error to have more devices on the bus than this number. Having less is OK.
 88          """
 89          return self._maximum_devices
 90  
 91      @maximum_devices.setter
 92      def maximum_devices(self, count):
 93          if not isinstance(count, int):
 94              raise ValueError("Maximum must be an integer value 1 - 255.")
 95          if count < 1 or count > 0xFF:
 96              raise ValueError("Maximum must be an integer value 1 - 255.")
 97          self._maximum_devices = count
 98  
 99      def reset(self, required=False):
100          """
101          Perform a reset and check for presence pulse.
102  
103          :param bool required: require presence pulse
104          """
105          reset = self._ow.reset()
106          if required and reset:
107              raise OneWireError("No presence pulse found. Check devices and wiring.")
108          return not reset
109  
110      def readinto(self, buf, *, start=0, end=None):
111          """
112          Read into ``buf`` from the device. The number of bytes read will be the
113          length of ``buf``.
114  
115          If ``start`` or ``end`` is provided, then the buffer will be sliced
116          as if ``buf[start:end]``. This will not cause an allocation like
117          ``buf[start:end]`` will so it saves memory.
118  
119          :param bytearray buf: buffer to write into
120          :param int start: Index to start writing at
121          :param int end: Index to write up to but not include
122          """
123          if end is None:
124              end = len(buf)
125          for i in range(start, end):
126              buf[i] = self._readbyte()
127  
128      def write(self, buf, *, start=0, end=None):
129          """
130          Write the bytes from ``buf`` to the device.
131  
132          If ``start`` or ``end`` is provided, then the buffer will be sliced
133          as if ``buffer[start:end]``. This will not cause an allocation like
134          ``buffer[start:end]`` will so it saves memory.
135  
136          :param bytearray buf: buffer containing the bytes to write
137          :param int start: Index to start writing from
138          :param int end: Index to read up to but not include
139          """
140          if end is None:
141              end = len(buf)
142          for i in range(start, end):
143              self._writebyte(buf[i])
144  
145      def scan(self):
146          """Scan for devices on the bus and return a list of addresses."""
147          devices = []
148          diff = 65
149          rom = False
150          count = 0
151          for _ in range(0xFF):
152              rom, diff = self._search_rom(rom, diff)
153              if rom:
154                  count += 1
155                  if count > self.maximum_devices:
156                      raise RuntimeError(
157                          "Maximum device count of {} exceeded.".format(
158                              self.maximum_devices
159                          )
160                      )
161                  devices.append(OneWireAddress(rom))
162              if diff == 0:
163                  break
164          return devices
165  
166      def _readbyte(self):
167          val = 0
168          for i in range(8):
169              val |= self._ow.read_bit() << i
170          return val
171  
172      def _writebyte(self, value):
173          for i in range(8):
174              bit = (value >> i) & 0x1
175              self._ow.write_bit(bit)
176  
177      def _search_rom(self, l_rom, diff):
178          if not self.reset():
179              return None, 0
180          self._writebyte(_SEARCH_ROM)
181          if not l_rom:
182              l_rom = bytearray(8)
183          rom = bytearray(8)
184          next_diff = 0
185          i = 64
186          for byte in range(8):
187              r_b = 0
188              for bit in range(8):
189                  b = self._readbit()
190                  if self._readbit():
191                      if b:  # there are no devices or there is an error on the bus
192                          return None, 0
193                  else:
194                      if not b:  # collision, two devices with different bit meaning
195                          if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
196                              b = 1
197                              next_diff = i
198                  self._writebit(b)
199                  r_b |= b << bit
200                  i -= 1
201              rom[byte] = r_b
202          return rom, next_diff
203  
204      @staticmethod
205      def crc8(data):
206          """
207          Perform the 1-Wire CRC check on the provided data.
208  
209          :param bytearray data: 8 byte array representing 64 bit ROM code
210          """
211          crc = 0
212  
213          for byte in data:
214              crc ^= byte
215              for _ in range(8):
216                  if crc & 0x01:
217                      crc = (crc >> 1) ^ 0x8C
218                  else:
219                      crc >>= 1
220                  crc &= 0xFF
221          return crc