/ adafruit_irremote.py
adafruit_irremote.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2017 Scott Shawcroft for Adafruit Industries
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  """
 23  `adafruit_irremote`
 24  ====================================================
 25  
 26  Demo code for Circuit Playground Express:
 27  
 28  .. code-block:: python
 29  
 30      # Circuit Playground Express Demo Code
 31      # Adjust the pulseio 'board.PIN' if using something else
 32      import pulseio
 33      import board
 34      import adafruit_irremote
 35  
 36      pulsein = pulseio.PulseIn(board.REMOTEIN, maxlen=120, idle_state=True)
 37      decoder = adafruit_irremote.GenericDecode()
 38  
 39  
 40      while True:
 41          pulses = decoder.read_pulses(pulsein)
 42          print("Heard", len(pulses), "Pulses:", pulses)
 43          try:
 44              code = decoder.decode_bits(pulses)
 45              print("Decoded:", code)
 46          except adafruit_irremote.IRNECRepeatException:  # unusual short code!
 47              print("NEC repeat!")
 48          except adafruit_irremote.IRDecodeException as e:     # failed to decode
 49              print("Failed to decode: ", e.args)
 50  
 51          print("----------------------------")
 52  
 53  * Author(s): Scott Shawcroft
 54  
 55  Implementation Notes
 56  --------------------
 57  
 58  **Hardware:**
 59  
 60  * `CircuitPlayground Express <https://www.adafruit.com/product/3333>`_
 61  
 62  * `IR Receiver Sensor <https://www.adafruit.com/product/157>`_
 63  
 64  **Software and Dependencies:**
 65  
 66  * Adafruit CircuitPython firmware for the ESP8266 and M0-based boards:
 67    https://github.com/adafruit/circuitpython/releases
 68  
 69  """
 70  
 71  # Pretend self matters because we may add object-level config later.
 72  # pylint: disable=no-self-use
 73  
 74  import array
 75  import time
 76  
 77  __version__ = "0.0.0-auto.0"
 78  __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_IRRemote.git"
 79  
 80  
 81  class IRDecodeException(Exception):
 82      """Generic decode exception"""
 83  
 84  
 85  class IRNECRepeatException(Exception):
 86      """Exception when a NEC repeat is decoded"""
 87  
 88  
 89  class GenericDecode:
 90      """Generic decoding of infrared signals"""
 91  
 92      def bin_data(self, pulses):
 93          """Compute bins of pulse lengths where pulses are +-25% of the average.
 94  
 95             :param list pulses: Input pulse lengths
 96             """
 97          bins = [[pulses[0], 0]]
 98  
 99          for _, pulse in enumerate(pulses):
100              matchedbin = False
101              # print(pulse, end=": ")
102              for b, pulse_bin in enumerate(bins):
103                  if pulse_bin[0] * 0.75 <= pulse <= pulse_bin[0] * 1.25:
104                      # print("matches bin")
105                      bins[b][0] = (pulse_bin[0] + pulse) // 2  # avg em
106                      bins[b][1] += 1  # track it
107                      matchedbin = True
108                      break
109              if not matchedbin:
110                  bins.append([pulse, 1])
111              # print(bins)
112          return bins
113  
114      def decode_bits(self, pulses):
115          """Decode the pulses into bits."""
116          # pylint: disable=too-many-branches,too-many-statements
117  
118          # special exception for NEC repeat code!
119          if (
120              (len(pulses) == 3)
121              and (8000 <= pulses[0] <= 10000)
122              and (2000 <= pulses[1] <= 3000)
123              and (450 <= pulses[2] <= 700)
124          ):
125              raise IRNECRepeatException()
126  
127          if len(pulses) < 10:
128              raise IRDecodeException("10 pulses minimum")
129  
130          # Ignore any header (evens start at 1), and any trailer.
131          if len(pulses) % 2 == 0:
132              pulses_end = -1
133          else:
134              pulses_end = None
135  
136          evens = pulses[1:pulses_end:2]
137          odds = pulses[2:pulses_end:2]
138  
139          # bin both halves
140          even_bins = self.bin_data(evens)
141          odd_bins = self.bin_data(odds)
142  
143          outliers = [b[0] for b in (even_bins + odd_bins) if b[1] == 1]
144          even_bins = [b for b in even_bins if b[1] > 1]
145          odd_bins = [b for b in odd_bins if b[1] > 1]
146  
147          if not even_bins or not odd_bins:
148              raise IRDecodeException("Not enough data")
149  
150          if len(even_bins) == 1:
151              pulses = odds
152              pulse_bins = odd_bins
153          elif len(odd_bins) == 1:
154              pulses = evens
155              pulse_bins = even_bins
156          else:
157              raise IRDecodeException("Both even/odd pulses differ")
158  
159          if len(pulse_bins) == 1:
160              raise IRDecodeException("Pulses do not differ")
161          if len(pulse_bins) > 2:
162              raise IRDecodeException("Only mark & space handled")
163  
164          mark = min(pulse_bins[0][0], pulse_bins[1][0])
165          space = max(pulse_bins[0][0], pulse_bins[1][0])
166  
167          if outliers:
168              # skip outliers
169              pulses = [
170                  p
171                  for p in pulses
172                  if not (outliers[0] * 0.75) <= p <= (outliers[0] * 1.25)
173              ]
174          # convert marks/spaces to 0 and 1
175          for i, pulse_length in enumerate(pulses):
176              if (space * 0.75) <= pulse_length <= (space * 1.25):
177                  pulses[i] = False
178              elif (mark * 0.75) <= pulse_length <= (mark * 1.25):
179                  pulses[i] = True
180              else:
181                  raise IRDecodeException("Pulses outside mark/space")
182  
183          # convert bits to bytes!
184          output = [0] * ((len(pulses) + 7) // 8)
185          for i, pulse_length in enumerate(pulses):
186              output[i // 8] = output[i // 8] << 1
187              if pulse_length:
188                  output[i // 8] |= 1
189          return output
190  
191      def _read_pulses_non_blocking(
192          self, input_pulses, max_pulse=10000, pulse_window=0.10
193      ):
194          """Read out a burst of pulses without blocking until pulses stop for a specified
195              period (pulse_window), pruning pulses after a pulse longer than ``max_pulse``.
196  
197              :param ~pulseio.PulseIn input_pulses: Object to read pulses from
198              :param int max_pulse: Pulse duration to end a burst
199              :param float pulse_window: pulses are collected for this period of time
200             """
201          received = None
202          recent_count = 0
203          pruning = False
204          while True:
205              while input_pulses:
206                  pulse = input_pulses.popleft()
207                  recent_count += 1
208                  if pulse > max_pulse:
209                      if received is None:
210                          continue
211                      pruning = True
212                  if not pruning:
213                      if received is None:
214                          received = []
215                      received.append(pulse)
216  
217              if recent_count == 0:
218                  return received
219              recent_count = 0
220              time.sleep(pulse_window)
221  
222      def read_pulses(
223          self,
224          input_pulses,
225          *,
226          max_pulse=10000,
227          blocking=True,
228          pulse_window=0.10,
229          blocking_delay=0.10
230      ):
231          """Read out a burst of pulses until pulses stop for a specified
232              period (pulse_window), pruning pulses after a pulse longer than ``max_pulse``.
233  
234              :param ~pulseio.PulseIn input_pulses: Object to read pulses from
235              :param int max_pulse: Pulse duration to end a burst
236              :param bool blocking: If True, will block until pulses found.
237                  If False, will return None if no pulses.
238                  Defaults to True for backwards compatibility
239              :param float pulse_window: pulses are collected for this period of time
240              :param float blocking_delay: delay between pulse checks when blocking
241             """
242          while True:
243              pulses = self._read_pulses_non_blocking(
244                  input_pulses, max_pulse, pulse_window
245              )
246              if blocking and pulses is None:
247                  time.sleep(blocking_delay)
248                  continue
249              return pulses
250  
251  
class GenericTransmit:
    """Generic infrared transmit class that handles encoding.

    :param header: indexable pair of durations sent before the data
    :param one: indexable pair of durations sent for each 1 bit
    :param zero: indexable pair of durations sent for each 0 bit
    :param int trail: single duration sent after the data
    """

    def __init__(self, header, one, zero, trail):
        self.header = header
        self.one = one
        self.zero = zero
        self.trail = trail

    def transmit(self, pulseout, data):
        """Encode ``data`` as pulse durations and send them via ``pulseout``.

        The output is the two header durations, then one duration pair per
        bit (most-significant bit of each byte first), then the trail
        duration. Everything is handed to ``pulseout.send`` in a single call
        as an ``array.array`` of typecode ``"H"``.

        :param pulseio.PulseOut pulseout: PulseOut to transmit on
        :param bytearray data: Data to transmit
        """
        total_bits = len(data) * 8
        # header pair + one pair per bit + trail
        durations = array.array("H", [0] * (2 + total_bits * 2 + 1))
        durations[0] = self.header[0]
        durations[1] = self.header[1]
        durations[-1] = self.trail

        slot = 2  # next free index; slots 0 and 1 hold the header
        for byte in data:
            for shift in range(7, -1, -1):
                timing = self.one if byte & (1 << shift) else self.zero
                durations[slot] = timing[0]
                durations[slot + 1] = timing[1]
                slot += 2

        pulseout.send(durations)