/ FunHouse_Pet_Bowl_Sensor / code.py
code.py
1 # SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries 2 # SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries 3 # 4 # SPDX-License-Identifier: MIT 5 6 import time 7 import board 8 import digitalio 9 import analogio 10 from adafruit_display_shapes.circle import Circle 11 from adafruit_funhouse import FunHouse 12 13 BOWL_STATE_TOPIC = "funhouse/catbowl/state" 14 LOW_VALUE = 4000 15 EMPTY_VALUE = 2000 16 UPDATE_INTERVAL = 1800 # Every 30 minutes 17 18 try: 19 from secrets import secrets 20 except ImportError: 21 print("WiFi secrets are kept in secrets.py, please add them there!") 22 raise 23 24 # Text labels for the Display 25 states = { 26 "empty": "Add Water", 27 "low": "Low", 28 "full": "Full", 29 } 30 31 def publish_bowl_state(bowl_state): 32 funhouse.peripherals.led = True 33 # Publish the Bowl Level State 34 print("Publishing to {}".format(BOWL_STATE_TOPIC)) 35 funhouse.network.mqtt_publish(BOWL_STATE_TOPIC, bowl_state) 36 funhouse.peripherals.led = False 37 38 def connected(client, userdata, result, payload): 39 status.fill = 0x00FF00 40 status.outline = 0x008800 41 42 def disconnected(client): 43 status.fill = 0xFF0000 44 status.outline = 0x880000 45 46 def get_bowl_reading(): 47 water_enable.value = True 48 level = water_level_sensor.value 49 water_enable.value = False 50 return level 51 52 def get_bowl_state(level): 53 if level <= EMPTY_VALUE: 54 return "empty" 55 elif level <= LOW_VALUE: 56 return "low" 57 return "full" 58 59 def bowl_level_display(water_level): 60 if funhouse.peripherals.button_sel: 61 return water_level 62 return states[get_bowl_state(water_level)] 63 64 # Set Initial States 65 funhouse = FunHouse(default_bg=0x0F0F00) 66 funhouse.peripherals.dotstars.fill(0) 67 water_enable = digitalio.DigitalInOut(board.A0) 68 water_enable.switch_to_output() 69 water_level_sensor = analogio.AnalogIn(board.A1) 70 funhouse.display.show(None) 71 funhouse.add_text( 72 text="Bowl Level:", 73 text_position=(120, 60), 74 text_anchor_point=(0.5, 0.5), 75 text_color=0xFF0000, 76 text_font="fonts/Arial-Bold-24.pcf", 77 ) 78 level_label = funhouse.add_text( 79 text_position=(120, 100), 80 text_anchor_point=(0.5, 0.5), 81 text_color=0xFFFF00, 82 text_font="fonts/Arial-Bold-24.pcf", 83 ) 84 funhouse.display.show(funhouse.splash) 85 86 status = Circle(229, 10, 10, fill=0xFF0000, outline=0x880000) 87 funhouse.splash.append(status) 88 89 # Initialize a new MQTT Client object 90 funhouse.network.init_mqtt( 91 secrets["mqtt_broker"], 92 secrets["mqtt_port"], 93 secrets["mqtt_username"], 94 secrets["mqtt_password"], 95 ) 96 funhouse.network.on_mqtt_connect = connected 97 funhouse.network.on_mqtt_disconnect = disconnected 98 99 print("Attempting to connect to {}".format(secrets["mqtt_broker"])) 100 funhouse.network.mqtt_connect() 101 102 last_reading_timestamp = None 103 last_bowl_state = None 104 105 while True: 106 if last_reading_timestamp is None or time.monotonic() > last_reading_timestamp + UPDATE_INTERVAL: 107 # Take Reading 108 water_level = get_bowl_reading() 109 # Update Display 110 funhouse.set_text(bowl_level_display(water_level), level_label) 111 # If changed, publish new result 112 bowl_state = get_bowl_state(water_level) 113 if bowl_state != last_bowl_state: 114 publish_bowl_state(bowl_state) 115 last_bowl_state = bowl_state 116 last_reading_timestamp = time.monotonic()