code.py
  1  # SPDX-FileCopyrightText: 2020 John Park for Adafruit Industries
  2  #
  3  # SPDX-License-Identifier: MIT
  4  
  5  # Purple Air AQI Display
  6  # for Metro M4 Airlift with RGB Matrix Shield
  7  # or Matrix Portal
  8  # and 64 x 32 RGB LED Matrix
  9  
 10  import time
 11  import board
 12  import terminalio
 13  from adafruit_matrixportal.matrixportal import MatrixPortal
 14  
 15  def aqi_transform(val):
 16      aqi = pm_to_aqi(val)  # derive Air Quality Index from Particulate Matter 2.5 value
 17      return "AQI: %d" % aqi
 18  
 19  def message_transform(val):  # picks message based on thresholds
 20      index = aqi_to_list_index(pm_to_aqi(val))
 21      messages = (
 22          "Hazardous",
 23          "Very Unhealthy",
 24          "Unhealthy",
 25          "Unhealthy for Sensitive Groups",
 26          "Moderate",
 27          "Good",
 28      )
 29  
 30      if index is not None:
 31          return messages[index]
 32      return "Unknown"
 33  
 34  SENSOR_ID = 3085 # Poughkeepsie  # 30183  LA outdoor  / 37823  oregon  / 21441   NYC
 35  SENSOR_REFRESH_PERIOD = 30  # seconds
 36  DATA_SOURCE = "https://www.purpleair.com/json?show=" + str(SENSOR_ID)
 37  SCROLL_DELAY = 0.02
 38  DATA_LOCATION = ["results", 0, "PM2_5Value"]  # navigate the JSON response
 39  
 40  # --- Display setup ---
 41  matrixportal = MatrixPortal(
 42      status_neopixel=board.NEOPIXEL,
 43      debug=True,
 44      url=DATA_SOURCE,
 45      json_path=(DATA_LOCATION, DATA_LOCATION),
 46  )
 47  
 48  # Create a static label to show AQI
 49  matrixportal.add_text(
 50      text_font=terminalio.FONT,
 51      text_position=(8, 7),
 52      text_transform=aqi_transform,
 53  )
 54  
 55  # Create a scrolling label to show level message
 56  matrixportal.add_text(
 57      text_font=terminalio.FONT,
 58      text_position=(0, 23),
 59      scrolling=True,
 60      text_transform=message_transform,
 61  )
 62  # pylint: disable=too-many-return-statements
 63  def aqi_to_list_index(aqi):
 64      aqi_groups = (301, 201, 151, 101, 51, 0)
 65      for index, group in enumerate(aqi_groups):
 66          if aqi >= group:
 67              return index
 68      return None
 69  
 70  def calculate_aqi(Cp, Ih, Il, BPh, BPl):  # wikipedia.org/wiki/Air_quality_index#Computing_the_AQI
 71      return round(((Ih - Il)/(BPh - BPl)) * (Cp - BPl) + Il)
 72  
 73  def pm_to_aqi(pm):
 74      pm = float(pm)
 75      if pm < 0:
 76          return pm
 77      if pm > 1000:
 78          return 1000
 79      if pm > 350.5:
 80          return calculate_aqi(pm, 500, 401, 500, 350.5)
 81      elif pm > 250.5:
 82          return calculate_aqi(pm, 400, 301, 350.4, 250.5)
 83      elif pm > 150.5:
 84          return calculate_aqi(pm, 300, 201, 250.4, 150.5)
 85      elif pm > 55.5:
 86          return calculate_aqi(pm, 200, 151, 150.4, 55.5)
 87      elif pm > 35.5:
 88          return calculate_aqi(pm, 150, 101, 55.4, 35.5)
 89      elif pm > 12.1:
 90          return calculate_aqi(pm, 100, 51, 35.4, 12.1)
 91      elif pm >= 0:
 92          return calculate_aqi(pm, 50, 0, 12, 0)
 93      else:
 94          return None
 95  
 96  def get_color(aqi):
 97      index = aqi_to_list_index(aqi)
 98      colors = (
 99          (115, 20, 37),
100          (140, 26, 75),
101          (234, 51, 36),
102          (239, 133, 51),
103          (255, 255, 85),
104          (104, 225, 67),
105      )
106      if index is not None:
107          return colors[index]
108      return (150, 150, 150)
109  
110  sensor_refresh = None
111  while True:
112      # only query the weather every 10 minutes (and on first run)
113      if (not sensor_refresh) or (time.monotonic() - sensor_refresh) > SENSOR_REFRESH_PERIOD:
114          try:
115              value = matrixportal.fetch()
116              print("Response is", value)
117              matrixportal.set_text_color(get_color(pm_to_aqi(value[0])))
118              sensor_refresh = time.monotonic()
119          except RuntimeError as e:
120              print("Some error occured, retrying! -", e)
121              continue
122  
123      # Scroll it
124      matrixportal.scroll_text(SCROLL_DELAY)