# Purple_Air_Display / code.py
# code.py
# SPDX-FileCopyrightText: 2020 John Park for Adafruit Industries
#
# SPDX-License-Identifier: MIT

# Purple Air AQI Display
# for Metro M4 Airlift with RGB Matrix Shield
# or Matrix Portal
# and 64 x 32 RGB LED Matrix

import time
import board
import terminalio
from adafruit_matrixportal.matrixportal import MatrixPortal


def aqi_transform(val):
    """Return the static label text ("AQI: <n>") for a raw PM2.5 reading."""
    aqi = pm_to_aqi(val)  # derive Air Quality Index from Particulate Matter 2.5 value
    return "AQI: %d" % aqi


def message_transform(val):  # picks message based on thresholds
    """Return the air-quality category message for a raw PM2.5 reading.

    Returns "Unknown" when the derived AQI falls in no category
    (aqi_to_list_index returned None, e.g. for a negative reading).
    """
    index = aqi_to_list_index(pm_to_aqi(val))
    # Ordered worst-to-best, matching the order of aqi_groups in aqi_to_list_index.
    messages = (
        "Hazardous",
        "Very Unhealthy",
        "Unhealthy",
        "Unhealthy for Sensitive Groups",
        "Moderate",
        "Good",
    )

    if index is not None:
        return messages[index]
    return "Unknown"


SENSOR_ID = 3085  # Poughkeepsie # 30183 LA outdoor / 37823 oregon / 21441 NYC
SENSOR_REFRESH_PERIOD = 30  # seconds
DATA_SOURCE = "https://www.purpleair.com/json?show=" + str(SENSOR_ID)
SCROLL_DELAY = 0.02
DATA_LOCATION = ["results", 0, "PM2_5Value"]  # navigate the JSON response

# --- Display setup ---
matrixportal = MatrixPortal(
    status_neopixel=board.NEOPIXEL,
    debug=True,
    url=DATA_SOURCE,
    # The same JSON path is listed twice: one entry per text label added below.
    json_path=(DATA_LOCATION, DATA_LOCATION),
)

# Create a static label to show AQI
matrixportal.add_text(
    text_font=terminalio.FONT,
    text_position=(8, 7),
    text_transform=aqi_transform,
)

# Create a scrolling label to show level message
matrixportal.add_text(
    text_font=terminalio.FONT,
    text_position=(0, 23),
    scrolling=True,
    text_transform=message_transform,
)


def aqi_to_list_index(aqi):
    """Map an AQI value to a category index (0 = worst ... 5 = best).

    Returns None when the value is below every threshold (i.e. negative).
    """
    # Lower bound of each category, worst first; the first one reached wins.
    aqi_groups = (301, 201, 151, 101, 51, 0)
    for index, group in enumerate(aqi_groups):
        if aqi >= group:
            return index
    return None


def calculate_aqi(Cp, Ih, Il, BPh, BPl):  # wikipedia.org/wiki/Air_quality_index#Computing_the_AQI
    """Linearly interpolate a concentration into an AQI value, rounded.

    Cp is the concentration, (BPl, BPh) the concentration breakpoints of the
    band and (Il, Ih) the AQI values corresponding to those breakpoints.
    """
    return round(((Ih - Il) / (BPh - BPl)) * (Cp - BPl) + Il)


# PM2.5 bands, highest first: (BPl, BPh, Il, Ih) as used by calculate_aqi.
_AQI_BREAKPOINTS = (
    (350.5, 500, 401, 500),
    (250.5, 350.4, 301, 400),
    (150.5, 250.4, 201, 300),
    (55.5, 150.4, 151, 200),
    (35.5, 55.4, 101, 150),
    (12.1, 35.4, 51, 100),
    (0, 12, 0, 50),
)


def pm_to_aqi(pm):
    """Convert a PM2.5 reading (number or numeric string) to an AQI value.

    Negative readings are returned unchanged (as a float) and readings above
    1000 are capped at 1000. Otherwise the reading is interpolated within the
    highest band whose lower breakpoint it reaches. The lower breakpoint is
    inclusive, so a reading of exactly 12.1, 35.5, 55.5, ... lands in the
    band that starts there rather than one point low in the band beneath it.
    Readings that fall in the small gap between two bands (e.g. 12.05) use the
    lower band's line. Returns None only if no band matches (e.g. NaN).
    """
    pm = float(pm)
    if pm < 0:
        return pm
    if pm > 1000:
        return 1000
    for bp_low, bp_high, aqi_low, aqi_high in _AQI_BREAKPOINTS:
        if pm >= bp_low:
            return calculate_aqi(pm, aqi_high, aqi_low, bp_high, bp_low)
    return None


def get_color(aqi):
    """Return the (r, g, b) text color for an AQI value; gray if uncategorized."""
    index = aqi_to_list_index(aqi)
    # Ordered worst-to-best, matching the order of aqi_groups in aqi_to_list_index.
    colors = (
        (115, 20, 37),
        (140, 26, 75),
        (234, 51, 36),
        (239, 133, 51),
        (255, 255, 85),
        (104, 225, 67),
    )
    if index is not None:
        return colors[index]
    return (150, 150, 150)


sensor_refresh = None  # time.monotonic() of the last successful fetch
while True:
    # only query the sensor every SENSOR_REFRESH_PERIOD seconds (and on first run)
    if sensor_refresh is None or (time.monotonic() - sensor_refresh) > SENSOR_REFRESH_PERIOD:
        try:
            value = matrixportal.fetch()
            print("Response is", value)
            matrixportal.set_text_color(get_color(pm_to_aqi(value[0])))
            sensor_refresh = time.monotonic()
        except RuntimeError as e:
            print("Some error occured, retrying! -", e)
            # sensor_refresh is left untouched, so the next pass retries the fetch.
            continue

    # Scroll it
    matrixportal.scroll_text(SCROLL_DELAY)