/ CircuitPython_Sip_and_Puff / puff_detector.py
puff_detector.py
  1  # SPDX-FileCopyrightText: 2020 Bryan Siepert for Adafruit Industries
  2  #
  3  # SPDX-License-Identifier: MIT
  4  
  5  import time
  6  import os
  7  import json
  8  import board
  9  import terminalio
 10  from adafruit_display_text import label
 11  from displayio import Group
 12  import displayio
 13  import adafruit_displayio_ssd1306
 14  import adafruit_lps35hw
 15  
 16  CONSOLE = False
 17  DEBUG = True
 18  
 19  MIN_PRESSURE = 8
 20  HIGH_PRESSURE = 40
 21  WAITING = 0
 22  STARTED = 1
 23  DETECTED = 2
 24  
 25  SOFT_SIP = 0
 26  HARD_SIP = 1
 27  SOFT_PUFF = 2
 28  HARD_PUFF = 3
 29  
 30  SOFT = 1
 31  STRONG = 2
 32  
 33  COLOR = 0xFFFFFF
 34  FONT = terminalio.FONT
 35  
 36  DISPLAY_WIDTH = 128
 37  DISPLAY_HEIGHT = 64
 38  Y_OFFSET = 3
 39  TEXT_HEIGHT = 8
 40  BOTTOM_ROW = DISPLAY_HEIGHT - TEXT_HEIGHT
 41  BANNER_STRING = "ST LPS33HW Sip & Puff"
 42  pressure_string = " "
 43  input_type_string = " "
 44  # pylint:disable=too-many-locals,exec-used,eval-used
 45  
 46  
 47  class PuffDetector:
 48      def __init__(
 49          self,
 50          min_pressure=MIN_PRESSURE,
 51          high_pressure=HIGH_PRESSURE,
 52          config_filename="settings.json",
 53          display_timeout=1,
 54      ):
 55          # misc detection state
 56          self.current_pressure = 0
 57          self.current_polarity = 0
 58          self.current_time = time.monotonic()
 59          self.start_polarity = 0
 60          self.peak_level = 0
 61          self.puff_start = 0
 62          self.duration = 0
 63          self.state = WAITING
 64          self.prev_state = self.state
 65  
 66          # settings
 67          self.settings_dict = {}
 68          self.high_pressure = high_pressure
 69          self.min_pressure = min_pressure
 70          self._config_filename = config_filename
 71          self._load_config()
 72  
 73          # callbacks
 74          self._on_sip_callbacks = []
 75          self._on_puff_callbacks = []
 76  
 77          # display and display state
 78          self.display = None
 79          self.state_display_start = self.current_time
 80          self.detection_result_str = " "
 81          self.duration_str = " "
 82          self.min_press_str = " "
 83          self.high_press_str = " "
 84          self.state_str = " "
 85          self.press_str = " "
 86          self.display_timeout = display_timeout
 87          self._init_stuff()
 88  
 89      def _init_stuff(self):
 90  
 91          # decouple display
 92          self.state_display_timeout = 1.0
 93          self.state_display_start = 0
 94          displayio.release_displays()
 95          i2c = board.I2C()
 96  
 97          display_bus = displayio.I2CDisplay(i2c, device_address=0x3D)
 98          self.display = adafruit_displayio_ssd1306.SSD1306(
 99              display_bus, width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT
100          )
101  
102          self.min_press_str = "min: %d" % self.min_pressure
103          self.high_press_str = "hi: %d" % self.high_pressure
104  
105          self.pressure_sensor = adafruit_lps35hw.LPS35HW(i2c)
106          self.pressure_sensor.zero_pressure()
107          self.pressure_sensor.data_rate = adafruit_lps35hw.DataRate.RATE_75_HZ
108  
109          self.pressure_sensor.filter_enabled = True
110          self.pressure_sensor.filter_config = True
111  
112      def _load_config(self):
113          if not self._config_filename in os.listdir("/"):
114              return
115          try:
116              with open(self._config_filename, "r") as file:
117                  self.settings_dict = json.load(file)
118          except (ValueError, OSError) as error:
119              print("Error loading config file")
120              print(type(error))
121  
122          if self.settings_dict:
123              if "MIN_PRESSURE" in self.settings_dict.keys():
124                  self.min_pressure = self.settings_dict["MIN_PRESSURE"]
125              if "HIGH_PRESSURE" in self.settings_dict.keys():
126                  self.high_pressure = self.settings_dict["HIGH_PRESSURE"]
127              if "DISPLAY_TIMEOUT" in self.settings_dict.keys():
128                  self.display_timeout = self.settings_dict["DISPLAY_TIMEOUT"]
129  
130      def check_for_events(self):
131          self.current_time = time.monotonic()
132          self.current_pressure = self.pressure_sensor.pressure
133          self._update_state()
134          self._notify_callbacks()
135          self._update_display()
136  
137      def run(self):
138          while True:
139              self.check_for_events()
140  
141      def _categorize_pressure(self, pressure):
142          """determine the strength and polarity of the pressure reading"""
143          level = 0
144          polarity = 0
145          abs_pressure = abs(pressure)
146  
147          if abs_pressure > self.min_pressure:
148              level = 1
149          if abs_pressure > self.high_pressure:
150              level = 2
151  
152          if level != 0:
153              if pressure > 0:
154                  polarity = 1
155              else:
156                  polarity = -1
157  
158          return (polarity, level)
159  
160      def on_sip(self, func):
161          self.add_on_sip(func)
162          return func
163  
164      def on_puff(self, func):
165          self.add_on_puff(func)
166          return func
167  
168      def add_on_sip(self, new_callback):
169          self._on_sip_callbacks.append(new_callback)
170  
171      def add_on_puff(self, new_callback):
172          self._on_puff_callbacks.append(new_callback)
173  
174      def _update_state(self):
175          """Updates the internal state to detect if a sip/puff has been started or stopped"""
176  
177          self.current_polarity, level = self._categorize_pressure(self.current_pressure)
178  
179          if self.state == DETECTED:
180              self.state = WAITING
181  
182              self.start_polarity = 0
183              self.peak_level = 0
184              self.duration = 0
185  
186          if (self.state == WAITING) and level != 0 and (self.start_polarity == 0):
187              self.state = STARTED
188              self.start_polarity = self.current_polarity
189              self.puff_start = time.monotonic()
190  
191          if self.state == STARTED:
192              if level > self.peak_level:
193                  self.peak_level = level
194  
195              if level == 0:
196                  self.state = DETECTED
197                  self.duration = time.monotonic() - self.puff_start
198  
199      def _notify_callbacks(self):
200          state_changed = self.prev_state != self.state
201          self.prev_state = self.state
202          if not state_changed:
203              return
204  
205          if self.state == DETECTED:
206  
207              # if this is a sip
208              if self.start_polarity == -1:
209                  for on_sip_callback in self._on_sip_callbacks:
210                      on_sip_callback(self.peak_level, self.duration)
211  
212              # if this is a sip
213              if self.start_polarity == 1:
214                  for on_puff_callback in self._on_puff_callbacks:
215                      on_puff_callback(self.peak_level, self.duration)
216  
217      def _update_display_strings(self):
218  
219          self.press_str = "Press: %0.3f" % self.current_pressure
220  
221          if self.state == DETECTED:
222              self.duration_str = "Duration: %0.2f" % self.duration
223  
224              self.state_str = "DETECTED:"
225              if self.start_polarity == -1:
226                  if self.peak_level == STRONG:
227                      self.detection_result_str = "STRONG SIP"
228                  if self.peak_level == SOFT:
229                      self.detection_result_str = "SOFT SIP"
230  
231              if self.start_polarity == 1:
232                  if self.peak_level == STRONG:
233                      self.detection_result_str = "STRONG PUFF"
234                  if self.peak_level == SOFT:
235                      self.detection_result_str = "SOFT PUFF"
236  
237              self.state_display_start = self.current_time
238  
239          elif self.state == WAITING:
240              display_elapsed = self.current_time - self.state_display_start
241              if display_elapsed > self.display_timeout:
242                  self.detection_result_str = " "
243                  self.duration_str = " "
244                  self.detection_result_str = " "
245                  self.state_str = "WAITING FOR INPUT"
246          elif self.state == STARTED:
247              if self.start_polarity == -1:
248                  self.state_str = "SIP STARTED..."
249  
250              if self.start_polarity == 1:
251                  self.state_str = "PUFF STARTED..."
252  
253      def _update_display(self):
254          self._update_display_strings()
255          banner = label.Label(FONT, text=BANNER_STRING, color=COLOR)
256          state = label.Label(FONT, text=self.state_str, color=COLOR)
257          detector_result = label.Label(FONT, text=self.detection_result_str, color=COLOR)
258          duration = label.Label(FONT, text=self.duration_str, color=COLOR)
259          min_pressure_label = label.Label(FONT, text=self.min_press_str, color=COLOR)
260          high_pressure_label = label.Label(FONT, text=self.high_press_str, color=COLOR)
261          pressure_label = label.Label(FONT, text=self.press_str, color=COLOR)
262  
263          banner.x = 0
264          banner.y = 0 + Y_OFFSET
265  
266          state.x = 10
267          state.y = 10 + Y_OFFSET
268  
269          detector_result.x = 10
270          detector_result.y = 20 + Y_OFFSET
271  
272          duration.x = 10
273          duration.y = 30 + Y_OFFSET
274  
275          min_pressure_label.x = 0
276          min_pressure_label.y = BOTTOM_ROW - 10
277  
278          pressure_label.x = DISPLAY_WIDTH - pressure_label.bounding_box[2]
279          pressure_label.y = BOTTOM_ROW
280  
281          high_pressure_label.x = 0
282          high_pressure_label.y = BOTTOM_ROW
283  
284          splash = Group()
285          splash.append(banner)
286          splash.append(state)
287          splash.append(detector_result)
288          splash.append(duration)
289          splash.append(min_pressure_label)
290          splash.append(high_pressure_label)
291          splash.append(pressure_label)
292  
293          self.display.show(splash)