/ CircuitPython_Sip_and_Puff / puff_detector.py
puff_detector.py
# SPDX-FileCopyrightText: 2020 Bryan Siepert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""Sip & puff detector: reads a pressure sensor, classifies sips and puffs,
notifies registered callbacks and mirrors the detector state on an OLED."""

import time
import os
import json
import board
import terminalio
from adafruit_display_text import label
from displayio import Group
import displayio
import adafruit_displayio_ssd1306
import adafruit_lps35hw

CONSOLE = False
DEBUG = True

# Default thresholds compared against abs(sensor pressure reading).
MIN_PRESSURE = 8
HIGH_PRESSURE = 40

# Detector states.
WAITING = 0
STARTED = 1
DETECTED = 2

SOFT_SIP = 0
HARD_SIP = 1
SOFT_PUFF = 2
HARD_PUFF = 3

# Peak levels passed to callbacks.
SOFT = 1
STRONG = 2

COLOR = 0xFFFFFF
FONT = terminalio.FONT

DISPLAY_WIDTH = 128
DISPLAY_HEIGHT = 64
Y_OFFSET = 3
TEXT_HEIGHT = 8
BOTTOM_ROW = DISPLAY_HEIGHT - TEXT_HEIGHT
BANNER_STRING = "ST LPS33HW Sip & Puff"
pressure_string = " "
input_type_string = " "
# pylint:disable=too-many-locals,exec-used,eval-used


class PuffDetector:
    """Detect sips (negative pressure) and puffs (positive pressure).

    Each call to `check_for_events` reads the pressure sensor, advances a
    small WAITING -> STARTED -> DETECTED state machine, invokes the
    registered callbacks when an input has just been detected, and redraws
    the status display.

    Callbacks are called as ``callback(peak_level, duration)`` where
    ``peak_level`` is `SOFT` or `STRONG` and ``duration`` is the difference
    of two ``time.monotonic()`` readings (seconds).
    """

    # (start polarity, peak level) -> text shown for a detected input.
    _RESULT_LABELS = {
        (-1, STRONG): "STRONG SIP",
        (-1, SOFT): "SOFT SIP",
        (1, STRONG): "STRONG PUFF",
        (1, SOFT): "SOFT PUFF",
    }

    def __init__(
        self,
        min_pressure=MIN_PRESSURE,
        high_pressure=HIGH_PRESSURE,
        config_filename="settings.json",
        display_timeout=1,
    ):
        """Set up detector state, load optional settings, and init hardware.

        :param min_pressure: absolute pressure above which an input starts.
        :param high_pressure: absolute pressure above which an input is STRONG.
        :param config_filename: JSON file (looked up in "/") whose
            ``MIN_PRESSURE`` / ``HIGH_PRESSURE`` / ``DISPLAY_TIMEOUT`` keys
            override the corresponding arguments.
        :param display_timeout: how long (``time.monotonic()`` units) the last
            detection result stays on screen once the detector is waiting.
        """
        # misc detection state
        self.current_pressure = 0
        self.current_polarity = 0
        self.current_time = time.monotonic()
        self.start_polarity = 0
        self.peak_level = 0
        self.puff_start = 0
        self.duration = 0
        self.state = WAITING
        self.prev_state = self.state

        # settings
        # Every default must be assigned BEFORE _load_config() runs so that a
        # value from the settings file is not overwritten afterwards
        # (display_timeout used to be assigned after loading, which silently
        # discarded DISPLAY_TIMEOUT from the file).
        self.settings_dict = {}
        self.high_pressure = high_pressure
        self.min_pressure = min_pressure
        self.display_timeout = display_timeout
        self._config_filename = config_filename
        self._load_config()

        # callbacks
        self._on_sip_callbacks = []
        self._on_puff_callbacks = []

        # display and display state
        self.display = None
        self.state_display_start = self.current_time
        self.detection_result_str = " "
        self.duration_str = " "
        self.min_press_str = " "
        self.high_press_str = " "
        self.state_str = " "
        self.press_str = " "
        self._init_stuff()

    def _init_stuff(self):
        """Create the I2C display and pressure sensor objects and configure them."""
        # decouple display (hardware setup is currently done inline here)
        self.state_display_timeout = 1.0  # not read anywhere in this file
        self.state_display_start = 0
        displayio.release_displays()
        i2c = board.I2C()

        display_bus = displayio.I2CDisplay(i2c, device_address=0x3D)
        self.display = adafruit_displayio_ssd1306.SSD1306(
            display_bus, width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT
        )

        # The thresholds are fixed after construction, so format them once.
        self.min_press_str = "min: %d" % self.min_pressure
        self.high_press_str = "hi: %d" % self.high_pressure

        self.pressure_sensor = adafruit_lps35hw.LPS35HW(i2c)
        # Presumably makes later readings relative to the current ambient
        # pressure, so sips read negative and puffs positive - confirm against
        # the sensor library documentation.
        self.pressure_sensor.zero_pressure()
        self.pressure_sensor.data_rate = adafruit_lps35hw.DataRate.RATE_75_HZ

        self.pressure_sensor.filter_enabled = True
        self.pressure_sensor.filter_config = True

    def _load_config(self):
        """Override thresholds and display timeout from the JSON settings file.

        Does nothing when the file is not present in "/". A file that cannot
        be read or parsed is reported on the console and otherwise ignored.
        """
        if self._config_filename not in os.listdir("/"):
            return
        try:
            with open(self._config_filename, "r") as file:
                self.settings_dict = json.load(file)
        except (ValueError, OSError) as error:
            print("Error loading config file")
            print(type(error))

        # Valid JSON need not be an object; anything else cannot hold settings
        # and would fail (or mis-match) on the key lookups below.
        if not isinstance(self.settings_dict, dict):
            print("Error loading config file")
            print(type(self.settings_dict))
            self.settings_dict = {}
            return

        if "MIN_PRESSURE" in self.settings_dict:
            self.min_pressure = self.settings_dict["MIN_PRESSURE"]
        if "HIGH_PRESSURE" in self.settings_dict:
            self.high_pressure = self.settings_dict["HIGH_PRESSURE"]
        if "DISPLAY_TIMEOUT" in self.settings_dict:
            self.display_timeout = self.settings_dict["DISPLAY_TIMEOUT"]

    def check_for_events(self):
        """Run one detection cycle: sample, update state, notify, redraw."""
        self.current_time = time.monotonic()
        self.current_pressure = self.pressure_sensor.pressure
        self._update_state()
        self._notify_callbacks()
        self._update_display()

    def run(self):
        """Call `check_for_events` forever; never returns."""
        while True:
            self.check_for_events()

    def _categorize_pressure(self, pressure):
        """Determine the strength and polarity of the pressure reading.

        :param pressure: signed pressure reading.
        :return: ``(polarity, level)``; ``level`` is 0 (below ``min_pressure``),
            1 or 2 (above ``high_pressure``); ``polarity`` is 1 for positive
            readings, -1 otherwise, and 0 whenever ``level`` is 0.
        """
        level = 0
        polarity = 0
        abs_pressure = abs(pressure)

        # Two independent checks (not elif): the high threshold wins whenever
        # it is exceeded, regardless of how the thresholds are ordered.
        if abs_pressure > self.min_pressure:
            level = 1
        if abs_pressure > self.high_pressure:
            level = 2

        if level != 0:
            polarity = 1 if pressure > 0 else -1

        return (polarity, level)

    def on_sip(self, func):
        """Decorator form of `add_on_sip`; returns ``func`` unchanged."""
        self.add_on_sip(func)
        return func

    def on_puff(self, func):
        """Decorator form of `add_on_puff`; returns ``func`` unchanged."""
        self.add_on_puff(func)
        return func

    def add_on_sip(self, new_callback):
        """Register ``new_callback(peak_level, duration)`` for detected sips."""
        self._on_sip_callbacks.append(new_callback)

    def add_on_puff(self, new_callback):
        """Register ``new_callback(peak_level, duration)`` for detected puffs."""
        self._on_puff_callbacks.append(new_callback)

    def _update_state(self):
        """Updates the internal state to detect if a sip/puff has been started or stopped"""

        self.current_polarity, level = self._categorize_pressure(self.current_pressure)

        # DETECTED lasts exactly one cycle; clear the finished input so a new
        # one can start (possibly within this same cycle).
        if self.state == DETECTED:
            self.state = WAITING

            self.start_polarity = 0
            self.peak_level = 0
            self.duration = 0

        if (self.state == WAITING) and level != 0 and (self.start_polarity == 0):
            self.state = STARTED
            self.start_polarity = self.current_polarity
            self.puff_start = time.monotonic()

        if self.state == STARTED:
            # Remember the strongest level seen while the input is in progress.
            if level > self.peak_level:
                self.peak_level = level

            # Pressure dropped back below the minimum: the input is complete.
            if level == 0:
                self.state = DETECTED
                self.duration = time.monotonic() - self.puff_start

    def _notify_callbacks(self):
        """Invoke sip/puff callbacks once when the state has just become DETECTED."""
        state_changed = self.prev_state != self.state
        self.prev_state = self.state
        if not state_changed or self.state != DETECTED:
            return

        if self.start_polarity == -1:
            # negative start polarity: this is a sip
            callbacks = self._on_sip_callbacks
        elif self.start_polarity == 1:
            # positive start polarity: this is a puff
            callbacks = self._on_puff_callbacks
        else:
            return

        for callback in callbacks:
            callback(self.peak_level, self.duration)

    def _update_display_strings(self):
        """Refresh the text attributes shown by `_update_display` from the current state."""

        self.press_str = "Press: %0.3f" % self.current_pressure

        if self.state == DETECTED:
            self.duration_str = "Duration: %0.2f" % self.duration

            self.state_str = "DETECTED:"
            # Leave the previous text in place for any unexpected combination.
            result = self._RESULT_LABELS.get((self.start_polarity, self.peak_level))
            if result is not None:
                self.detection_result_str = result

            # Start the timer that keeps this result on screen.
            self.state_display_start = self.current_time

        elif self.state == WAITING:
            display_elapsed = self.current_time - self.state_display_start
            # Keep the last result visible until the display timeout expires.
            if display_elapsed > self.display_timeout:
                self.detection_result_str = " "
                self.duration_str = " "
                self.state_str = "WAITING FOR INPUT"

        elif self.state == STARTED:
            if self.start_polarity == -1:
                self.state_str = "SIP STARTED..."
            elif self.start_polarity == 1:
                self.state_str = "PUFF STARTED..."

    @staticmethod
    def _make_label(text, x, y):
        """Build a text label in the shared font/color positioned at (x, y)."""
        new_label = label.Label(FONT, text=text, color=COLOR)
        new_label.x = x
        new_label.y = y
        return new_label

    def _update_display(self):
        """Rebuild all labels from the current strings and show them on the display."""
        self._update_display_strings()

        banner = self._make_label(BANNER_STRING, 0, 0 + Y_OFFSET)
        state = self._make_label(self.state_str, 10, 10 + Y_OFFSET)
        detector_result = self._make_label(self.detection_result_str, 10, 20 + Y_OFFSET)
        duration = self._make_label(self.duration_str, 10, 30 + Y_OFFSET)
        min_pressure_label = self._make_label(self.min_press_str, 0, BOTTOM_ROW - 10)
        high_pressure_label = self._make_label(self.high_press_str, 0, BOTTOM_ROW)

        # Right-align the live pressure reading: its x depends on the rendered
        # width (bounding_box[2]), which is only known once the label exists.
        pressure_label = self._make_label(self.press_str, 0, BOTTOM_ROW)
        pressure_label.x = DISPLAY_WIDTH - pressure_label.bounding_box[2]

        splash = Group()
        for item in (
            banner,
            state,
            detector_result,
            duration,
            min_pressure_label,
            high_pressure_label,
            pressure_label,
        ):
            splash.append(item)

        # NOTE(review): newer CircuitPython releases replace display.show()
        # with assigning display.root_group - confirm the target version
        # before upgrading.
        self.display.show(splash)