/ adafruit_irremote.py
adafruit_irremote.py
1 # The MIT License (MIT) 2 # 3 # Copyright (c) 2017 Scott Shawcroft for Adafruit Industries 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # The above copyright notice and this permission notice shall be included in 13 # all copies or substantial portions of the Software. 14 # 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21 # THE SOFTWARE. 22 """ 23 `adafruit_irremote` 24 ==================================================== 25 26 Demo code for Circuit Playground Express: 27 28 .. code-block:: python 29 30 # Circuit Playground Express Demo Code 31 # Adjust the pulseio 'board.PIN' if using something else 32 import pulseio 33 import board 34 import adafruit_irremote 35 36 pulsein = pulseio.PulseIn(board.REMOTEIN, maxlen=120, idle_state=True) 37 decoder = adafruit_irremote.GenericDecode() 38 39 40 while True: 41 pulses = decoder.read_pulses(pulsein) 42 print("Heard", len(pulses), "Pulses:", pulses) 43 try: 44 code = decoder.decode_bits(pulses) 45 print("Decoded:", code) 46 except adafruit_irremote.IRNECRepeatException: # unusual short code! 47 print("NEC repeat!") 48 except adafruit_irremote.IRDecodeException as e: # failed to decode 49 print("Failed to decode: ", e.args) 50 51 print("----------------------------") 52 53 * Author(s): Scott Shawcroft 54 55 Implementation Notes 56 -------------------- 57 58 **Hardware:** 59 60 * `CircuitPlayground Express <https://www.adafruit.com/product/3333>`_ 61 62 * `IR Receiver Sensor <https://www.adafruit.com/product/157>`_ 63 64 **Software and Dependencies:** 65 66 * Adafruit CircuitPython firmware for the ESP8622 and M0-based boards: 67 https://github.com/adafruit/circuitpython/releases 68 69 """ 70 71 # Pretend self matter because we may add object level config later. 72 # pylint: disable=no-self-use 73 74 import array 75 import time 76 77 __version__ = "0.0.0-auto.0" 78 __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_IRRemote.git" 79 80 81 class IRDecodeException(Exception): 82 """Generic decode exception""" 83 84 85 class IRNECRepeatException(Exception): 86 """Exception when a NEC repeat is decoded""" 87 88 89 class GenericDecode: 90 """Generic decoding of infrared signals""" 91 92 def bin_data(self, pulses): 93 """Compute bins of pulse lengths where pulses are +-25% of the average. 94 95 :param list pulses: Input pulse lengths 96 """ 97 bins = [[pulses[0], 0]] 98 99 for _, pulse in enumerate(pulses): 100 matchedbin = False 101 # print(pulse, end=": ") 102 for b, pulse_bin in enumerate(bins): 103 if pulse_bin[0] * 0.75 <= pulse <= pulse_bin[0] * 1.25: 104 # print("matches bin") 105 bins[b][0] = (pulse_bin[0] + pulse) // 2 # avg em 106 bins[b][1] += 1 # track it 107 matchedbin = True 108 break 109 if not matchedbin: 110 bins.append([pulse, 1]) 111 # print(bins) 112 return bins 113 114 def decode_bits(self, pulses): 115 """Decode the pulses into bits.""" 116 # pylint: disable=too-many-branches,too-many-statements 117 118 # special exception for NEC repeat code! 119 if ( 120 (len(pulses) == 3) 121 and (8000 <= pulses[0] <= 10000) 122 and (2000 <= pulses[1] <= 3000) 123 and (450 <= pulses[2] <= 700) 124 ): 125 raise IRNECRepeatException() 126 127 if len(pulses) < 10: 128 raise IRDecodeException("10 pulses minimum") 129 130 # Ignore any header (evens start at 1), and any trailer. 131 if len(pulses) % 2 == 0: 132 pulses_end = -1 133 else: 134 pulses_end = None 135 136 evens = pulses[1:pulses_end:2] 137 odds = pulses[2:pulses_end:2] 138 139 # bin both halves 140 even_bins = self.bin_data(evens) 141 odd_bins = self.bin_data(odds) 142 143 outliers = [b[0] for b in (even_bins + odd_bins) if b[1] == 1] 144 even_bins = [b for b in even_bins if b[1] > 1] 145 odd_bins = [b for b in odd_bins if b[1] > 1] 146 147 if not even_bins or not odd_bins: 148 raise IRDecodeException("Not enough data") 149 150 if len(even_bins) == 1: 151 pulses = odds 152 pulse_bins = odd_bins 153 elif len(odd_bins) == 1: 154 pulses = evens 155 pulse_bins = even_bins 156 else: 157 raise IRDecodeException("Both even/odd pulses differ") 158 159 if len(pulse_bins) == 1: 160 raise IRDecodeException("Pulses do not differ") 161 if len(pulse_bins) > 2: 162 raise IRDecodeException("Only mark & space handled") 163 164 mark = min(pulse_bins[0][0], pulse_bins[1][0]) 165 space = max(pulse_bins[0][0], pulse_bins[1][0]) 166 167 if outliers: 168 # skip outliers 169 pulses = [ 170 p 171 for p in pulses 172 if not (outliers[0] * 0.75) <= p <= (outliers[0] * 1.25) 173 ] 174 # convert marks/spaces to 0 and 1 175 for i, pulse_length in enumerate(pulses): 176 if (space * 0.75) <= pulse_length <= (space * 1.25): 177 pulses[i] = False 178 elif (mark * 0.75) <= pulse_length <= (mark * 1.25): 179 pulses[i] = True 180 else: 181 raise IRDecodeException("Pulses outside mark/space") 182 183 # convert bits to bytes! 184 output = [0] * ((len(pulses) + 7) // 8) 185 for i, pulse_length in enumerate(pulses): 186 output[i // 8] = output[i // 8] << 1 187 if pulse_length: 188 output[i // 8] |= 1 189 return output 190 191 def _read_pulses_non_blocking( 192 self, input_pulses, max_pulse=10000, pulse_window=0.10 193 ): 194 """Read out a burst of pulses without blocking until pulses stop for a specified 195 period (pulse_window), pruning pulses after a pulse longer than ``max_pulse``. 196 197 :param ~pulseio.PulseIn input_pulses: Object to read pulses from 198 :param int max_pulse: Pulse duration to end a burst 199 :param float pulse_window: pulses are collected for this period of time 200 """ 201 received = None 202 recent_count = 0 203 pruning = False 204 while True: 205 while input_pulses: 206 pulse = input_pulses.popleft() 207 recent_count += 1 208 if pulse > max_pulse: 209 if received is None: 210 continue 211 pruning = True 212 if not pruning: 213 if received is None: 214 received = [] 215 received.append(pulse) 216 217 if recent_count == 0: 218 return received 219 recent_count = 0 220 time.sleep(pulse_window) 221 222 def read_pulses( 223 self, 224 input_pulses, 225 *, 226 max_pulse=10000, 227 blocking=True, 228 pulse_window=0.10, 229 blocking_delay=0.10 230 ): 231 """Read out a burst of pulses until pulses stop for a specified 232 period (pulse_window), pruning pulses after a pulse longer than ``max_pulse``. 233 234 :param ~pulseio.PulseIn input_pulses: Object to read pulses from 235 :param int max_pulse: Pulse duration to end a burst 236 :param bool blocking: If True, will block until pulses found. 237 If False, will return None if no pulses. 238 Defaults to True for backwards compatibility 239 :param float pulse_window: pulses are collected for this period of time 240 :param float blocking_delay: delay between pulse checks when blocking 241 """ 242 while True: 243 pulses = self._read_pulses_non_blocking( 244 input_pulses, max_pulse, pulse_window 245 ) 246 if blocking and pulses is None: 247 time.sleep(blocking_delay) 248 continue 249 return pulses 250 251 252 class GenericTransmit: 253 """Generic infrared transmit class that handles encoding.""" 254 255 def __init__(self, header, one, zero, trail): 256 self.header = header 257 self.one = one 258 self.zero = zero 259 self.trail = trail 260 261 def transmit(self, pulseout, data): 262 """Transmit the ``data`` using the ``pulseout``. 263 264 :param pulseio.PulseOut pulseout: PulseOut to transmit on 265 :param bytearray data: Data to transmit 266 """ 267 durations = array.array("H", [0] * (2 + len(data) * 8 * 2 + 1)) 268 durations[0] = self.header[0] 269 durations[1] = self.header[1] 270 durations[-1] = self.trail 271 out = 2 272 for byte_index, _ in enumerate(data): 273 for i in range(7, -1, -1): 274 if (data[byte_index] & 1 << i) > 0: 275 durations[out] = self.one[0] 276 durations[out + 1] = self.one[1] 277 else: 278 durations[out] = self.zero[0] 279 durations[out + 1] = self.zero[1] 280 out += 2 281 282 # print(durations) 283 pulseout.send(durations)