/ FunHouse_HA_Companion / code.py
code.py
1 # SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries 2 # SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries 3 # 4 # SPDX-License-Identifier: MIT 5 import time 6 import json 7 from adafruit_display_shapes.circle import Circle 8 from adafruit_funhouse import FunHouse 9 10 PUBLISH_DELAY = 60 11 ENVIRONMENT_CHECK_DELAY = 5 12 ENABLE_PIR = True 13 MQTT_TOPIC = "funhouse/state" 14 LIGHT_STATE_TOPIC = "funhouse/light/state" 15 LIGHT_COMMAND_TOPIC = "funhouse/light/set" 16 INITIAL_LIGHT_COLOR = 0x008000 17 USE_FAHRENHEIT = True 18 19 try: 20 from secrets import secrets 21 except ImportError: 22 print("WiFi secrets are kept in secrets.py, please add them there!") 23 raise 24 25 funhouse = FunHouse(default_bg=0x0F0F00) 26 funhouse.peripherals.dotstars.fill(INITIAL_LIGHT_COLOR) 27 28 # Don't display the splash yet to avoid 29 # redrawing labels after each one is added 30 funhouse.display.show(None) 31 32 # Add the labels 33 funhouse.add_text( 34 text="Temperature:", 35 text_position=(20, 30), 36 text_color=0xFF8888, 37 text_font="fonts/Arial-Bold-24.pcf", 38 ) 39 temp_label = funhouse.add_text( 40 text_position=(120, 60), 41 text_anchor_point=(0.5, 0.5), 42 text_color=0xFFFF00, 43 text_font="fonts/Arial-Bold-24.pcf", 44 ) 45 funhouse.add_text( 46 text="Humidity:", 47 text_position=(20, 100), 48 text_color=0x8888FF, 49 text_font="fonts/Arial-Bold-24.pcf", 50 ) 51 hum_label = funhouse.add_text( 52 text_position=(120, 130), 53 text_anchor_point=(0.5, 0.5), 54 text_color=0xFFFF00, 55 text_font="fonts/Arial-Bold-24.pcf", 56 ) 57 funhouse.add_text( 58 text="Pressure:", 59 text_position=(20, 170), 60 text_color=0xFF88FF, 61 text_font="fonts/Arial-Bold-24.pcf", 62 ) 63 pres_label = funhouse.add_text( 64 text_position=(120, 200), 65 text_anchor_point=(0.5, 0.5), 66 text_color=0xFFFF00, 67 text_font="fonts/Arial-Bold-24.pcf", 68 ) 69 70 # Now display the splash to draw all labels at once 71 funhouse.display.show(funhouse.splash) 72 73 status = Circle(229, 10, 10, fill=0xFF0000, outline=0x880000) 74 funhouse.splash.append(status) 75 76 def update_enviro(): 77 global environment 78 79 temp = funhouse.peripherals.temperature 80 unit = "C" 81 if USE_FAHRENHEIT: 82 temp = temp * (9 / 5) + 32 83 unit = "F" 84 85 environment["temperature"] = temp 86 environment["pressure"] = funhouse.peripherals.pressure 87 environment["humidity"] = funhouse.peripherals.relative_humidity 88 environment["light"] = funhouse.peripherals.light 89 90 funhouse.set_text("{:.1f}{}".format(environment["temperature"], unit), temp_label) 91 funhouse.set_text("{:.1f}%".format(environment["humidity"]), hum_label) 92 funhouse.set_text("{}hPa".format(environment["pressure"]), pres_label) 93 94 95 def connected(client, userdata, result, payload): 96 status.fill = 0x00FF00 97 status.outline = 0x008800 98 print("Connected to MQTT! Subscribing...") 99 client.subscribe(LIGHT_COMMAND_TOPIC) 100 101 102 def disconnected(client): 103 status.fill = 0xFF0000 104 status.outline = 0x880000 105 106 107 def message(client, topic, payload): 108 print("Topic {0} received new value: {1}".format(topic, payload)) 109 if topic == LIGHT_COMMAND_TOPIC: 110 settings = json.loads(payload) 111 if settings["state"] == "on": 112 if "brightness" in settings: 113 funhouse.peripherals.dotstars.brightness = settings["brightness"] / 255 114 else: 115 funhouse.peripherals.dotstars.brightness = 0.3 116 if "color" in settings: 117 funhouse.peripherals.dotstars.fill(settings["color"]) 118 else: 119 funhouse.peripherals.dotstars.brightness = 0 120 publish_light_state() 121 122 123 def publish_light_state(): 124 funhouse.peripherals.led = True 125 output = { 126 "brightness": round(funhouse.peripherals.dotstars.brightness * 255), 127 "state": "on" if funhouse.peripherals.dotstars.brightness > 0 else "off", 128 "color": funhouse.peripherals.dotstars[0], 129 } 130 # Publish the Dotstar State 131 print("Publishing to {}".format(LIGHT_STATE_TOPIC)) 132 funhouse.network.mqtt_publish(LIGHT_STATE_TOPIC, json.dumps(output)) 133 funhouse.peripherals.led = False 134 135 136 # Initialize a new MQTT Client object 137 funhouse.network.init_mqtt( 138 secrets["mqtt_broker"], 139 secrets["mqtt_port"], 140 secrets["mqtt_username"], 141 secrets["mqtt_password"], 142 ) 143 funhouse.network.on_mqtt_connect = connected 144 funhouse.network.on_mqtt_disconnect = disconnected 145 funhouse.network.on_mqtt_message = message 146 147 print("Attempting to connect to {}".format(secrets["mqtt_broker"])) 148 funhouse.network.mqtt_connect() 149 150 last_publish_timestamp = None 151 152 last_peripheral_state = { 153 "button_up": funhouse.peripherals.button_up, 154 "button_down": funhouse.peripherals.button_down, 155 "button_sel": funhouse.peripherals.button_sel, 156 "captouch6": funhouse.peripherals.captouch6, 157 "captouch7": funhouse.peripherals.captouch7, 158 "captouch8": funhouse.peripherals.captouch8, 159 } 160 161 if ENABLE_PIR: 162 last_peripheral_state["pir_sensor"] = funhouse.peripherals.pir_sensor 163 164 environment = {} 165 update_enviro() 166 last_environment_timestamp = time.monotonic() 167 168 # Provide Initial light state 169 publish_light_state() 170 171 while True: 172 if not environment or ( 173 time.monotonic() - last_environment_timestamp > ENVIRONMENT_CHECK_DELAY 174 ): 175 update_enviro() 176 last_environment_timestamp = time.monotonic() 177 output = environment 178 179 peripheral_state_changed = False 180 for peripheral in last_peripheral_state: 181 current_item_state = getattr(funhouse.peripherals, peripheral) 182 output[peripheral] = "on" if current_item_state else "off" 183 if last_peripheral_state[peripheral] != current_item_state: 184 peripheral_state_changed = True 185 last_peripheral_state[peripheral] = current_item_state 186 187 if funhouse.peripherals.slider is not None: 188 output["slider"] = funhouse.peripherals.slider 189 peripheral_state_changed = True 190 191 # Every PUBLISH_DELAY, write temp/hum/press/light or if a peripheral changed 192 if ( 193 last_publish_timestamp is None 194 or peripheral_state_changed 195 or (time.monotonic() - last_publish_timestamp) > PUBLISH_DELAY 196 ): 197 funhouse.peripherals.led = True 198 print("Publishing to {}".format(MQTT_TOPIC)) 199 funhouse.network.mqtt_publish(MQTT_TOPIC, json.dumps(output)) 200 funhouse.peripherals.led = False 201 last_publish_timestamp = time.monotonic() 202 203 # Check any topics we are subscribed to 204 funhouse.network.mqtt_loop(0.5)