code.py
  1  # SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
  2  # SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
  3  #
  4  # SPDX-License-Identifier: MIT
  5  
  6  import time
  7  import board
  8  import digitalio
  9  import analogio
 10  from adafruit_display_shapes.circle import Circle
 11  from adafruit_funhouse import FunHouse
 12  
 13  BOWL_STATE_TOPIC = "funhouse/catbowl/state"
 14  LOW_VALUE = 4000
 15  EMPTY_VALUE = 2000
 16  UPDATE_INTERVAL = 1800	# Every 30 minutes
 17  
 18  try:
 19      from secrets import secrets
 20  except ImportError:
 21      print("WiFi secrets are kept in secrets.py, please add them there!")
 22      raise
 23  
# Text labels for the Display, keyed by the state names that
# get_bowl_state() returns; bowl_level_display() looks the label up here.
states = {
    "empty": "Add Water",
    "low": "Low",
    "full": "Full",
}
 30  
 31  def publish_bowl_state(bowl_state):
 32      funhouse.peripherals.led = True
 33      # Publish the Bowl Level State
 34      print("Publishing to {}".format(BOWL_STATE_TOPIC))
 35      funhouse.network.mqtt_publish(BOWL_STATE_TOPIC, bowl_state)
 36      funhouse.peripherals.led = False
 37  
 38  def connected(client, userdata, result, payload):
 39      status.fill = 0x00FF00
 40      status.outline = 0x008800
 41  
 42  def disconnected(client):
 43      status.fill = 0xFF0000
 44      status.outline = 0x880000
 45  
 46  def get_bowl_reading():
 47      water_enable.value = True
 48      level = water_level_sensor.value
 49      water_enable.value = False
 50      return level
 51  
 52  def get_bowl_state(level):
 53      if level <= EMPTY_VALUE:
 54          return "empty"
 55      elif level <= LOW_VALUE:
 56          return "low"
 57      return "full"
 58  
 59  def bowl_level_display(water_level):
 60      if funhouse.peripherals.button_sel:
 61          return water_level
 62      return states[get_bowl_state(water_level)]
 63  
# Set Initial States
# Create the FunHouse helper with a background color of 0x0F0F00 and set
# the DotStar LEDs to 0.
funhouse = FunHouse(default_bg=0x0F0F00)
funhouse.peripherals.dotstars.fill(0)
# A0 is a digital output that get_bowl_reading() raises only while sampling;
# A1 is the analog input the water level is read from.
water_enable = digitalio.DigitalInOut(board.A0)
water_enable.switch_to_output()
water_level_sensor = analogio.AnalogIn(board.A1)
# NOTE(review): presumably the display is blanked while the labels are built
# and the splash group is shown once they are in place. display.show() was
# removed in CircuitPython 9 in favor of display.root_group -- confirm the
# target firmware version.
funhouse.display.show(None)
# Static heading
funhouse.add_text(
    text="Bowl Level:",
    text_position=(120, 60),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFF0000,
    text_font="fonts/Arial-Bold-24.pcf",
)
# Value label; the main loop updates it through funhouse.set_text()
level_label = funhouse.add_text(
    text_position=(120, 100),
    text_anchor_point=(0.5, 0.5),
    text_color=0xFFFF00,
    text_font="fonts/Arial-Bold-24.pcf",
)
funhouse.display.show(funhouse.splash)

# MQTT status indicator: created with the same colors disconnected() sets;
# connected() recolors it once the MQTT connection is up.
status = Circle(229, 10, 10, fill=0xFF0000, outline=0x880000)
funhouse.splash.append(status)
 88  
# Initialize a new MQTT Client object with the broker settings from secrets
funhouse.network.init_mqtt(
    secrets["mqtt_broker"],
    secrets["mqtt_port"],
    secrets["mqtt_username"],
    secrets["mqtt_password"],
)
# Register the callbacks that recolor the status indicator
funhouse.network.on_mqtt_connect = connected
funhouse.network.on_mqtt_disconnect = disconnected

print("Attempting to connect to {}".format(secrets["mqtt_broker"]))
funhouse.network.mqtt_connect()
101  
# State carried between loop iterations; None means "nothing recorded yet",
# which forces a reading and a publish on the first pass.
last_reading_timestamp = None
last_bowl_state = None

# NOTE(review): nothing in this loop services the MQTT connection between
# publishes (no mqtt_loop() call) -- confirm the broker keeps the session
# alive across the gaps.
while True:
    reading_due = (
        last_reading_timestamp is None
        or time.monotonic() > last_reading_timestamp + UPDATE_INTERVAL
    )
    if not reading_due:
        continue

    # Take Reading and Update Display
    water_level = get_bowl_reading()
    funhouse.set_text(bowl_level_display(water_level), level_label)

    # If changed, publish new result
    bowl_state = get_bowl_state(water_level)
    if bowl_state != last_bowl_state:
        publish_bowl_state(bowl_state)
        last_bowl_state = bowl_state

    last_reading_timestamp = time.monotonic()
116          last_reading_timestamp = time.monotonic()