code.py
  1  # SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
  2  #
  3  # SPDX-License-Identifier: MIT
  4  
  5  import time
  6  import ssl
  7  import wifi
  8  import terminalio
  9  import socketpool
 10  import displayio
 11  import board
 12  from adafruit_display_text import bitmap_label,  wrap_text_to_lines
 13  import adafruit_requests
 14  from digitalio import DigitalInOut, Direction, Pull
 15  from adafruit_debouncer import Debouncer
 16  
 17  alarm_out = DigitalInOut(board.A1)
 18  alarm_out.direction = Direction.OUTPUT
 19  alarm_out.value = False
 20  
 21  button_in = DigitalInOut(board.BUTTON)  # on-board Boot button on Feather ESP32-S2 TFT
 22  button_in.pull = Pull.UP
 23  button = Debouncer(button_in)
 24  
 25  
 26  # Get wifi details and more from a secrets.py file
 27  try:
 28      from secrets import secrets
 29  except ImportError:
 30      print("WiFi secrets are kept in secrets.py, please add them there!")
 31      raise
 32  
 33  print("Adafruit Raspberry Pi In Stock Tweet Listener")
 34  
 35  #  import your bearer token
 36  bear = secrets['bearer_token']
 37  
 38  #  query URL for tweets. looking for hashtag partyparrot sent to a specific username
 39  #  disabling line-too-long because queries for tweet_query & TIME_URL cannot have line breaks
 40  #  pylint: disable=line-too-long
 41  tweet_query = 'https://api.twitter.com/2/tweets/search/recent?query=In Stock at Adafruit from:rpilocator&tweet.fields=created_at'
 42  
 43  headers = {'Authorization': 'Bearer ' + bear}
 44  
 45  print("Connecting to %s"%secrets["ssid"])
 46  wifi.radio.connect(secrets["ssid"], secrets["password"])
 47  print("Connected to %s!"%secrets["ssid"])
 48  print("My IP address is", wifi.radio.ipv4_address)
 49  
 50  pool = socketpool.SocketPool(wifi.radio)
 51  requests = adafruit_requests.Session(pool, ssl.create_default_context())
 52  
 53  #  gets and formats time from adafruit.io
 54  aio_username = secrets["aio_username"]
 55  aio_key = secrets["aio_key"]
 56  location = secrets.get("timezone", None)
 57  TIME_URL = "https://io.adafruit.com/api/v2/%s/integrations/time/strftime?x-aio-key=%s" % (aio_username, aio_key)
 58  TIME_URL += "&fmt=%25Y-%25m-%25dT%25H%3A%25M%3A%25S.%25L%25j%25u%25z%25Z"
 59  
 60  display = board.DISPLAY
 61  
 62  group = displayio.Group()
 63  font = terminalio.FONT
 64  text = ''
 65  text_area = bitmap_label.Label(font, text=text, scale=2, color=0xFFFFFF)
 66  text_area.x = 0
 67  text_area.y = 15
 68  clock = ''
 69  clock_area = bitmap_label.Label(font, text=clock, color=0xFFFFFF)
 70  clock_area.x = 125
 71  clock_area.y = 128
 72  group.append(text_area)
 73  group.append(clock_area)
 74  display.show(group)
 75  
 76  last_value = 0 #  checks last tweet's ID
 77  check = 0 #  time.monotonic() holder
 78  
 79  #  fetching current time
 80  print("Fetching text from", TIME_URL)
 81  current_time = requests.get(TIME_URL)
 82  print("-" * 40)
 83  print(current_time.text)
 84  print("-" * 40)
 85  
 86  alarm_out.value = False  # this starts the alarm so you can test turning it off w the boot button
 87  
 88  
 89  while True:
 90      button.update()
 91      if button.fell:
 92          print("Alarm off")
 93          alarm_out.value = False
 94  
 95      #  every 30 seconds...
 96      if (check + 30) < time.monotonic():
 97          #  updates current time
 98          current_time = requests.get(TIME_URL)
 99          print(current_time.text)
100          #  get tweets from rpilocator containing in stock at adafruit
101          the_tweet = requests.request("GET", url=tweet_query, headers=headers)
102          #  gets data portion of json
103          data = the_tweet.json()['data']
104          #  tweet ID number
105          value = data[0]['id']
106          #  tweet text
107          stock_check = data[0]['text']
108          #  timestamp
109          timestamp = data[0]['created_at']
110          #  reset time count
111          check = time.monotonic()
112          #  compare last tweet ID and current tweet ID
113          if last_value != value:
114              #  compares date of tweet timestamp with current date
115              if timestamp.startswith(current_time.text[0 : 10]):
116                  print("match")
117                  #  grabs the hour of the tweet timestamp
118                  tweet_hour = int(timestamp[11:13])
119                  print(tweet_hour)
120                  #  grabs the current hour
121                  current_hour = int(current_time.text[11:13])
122                  print(current_hour)
123                  #  if it's been less than an hour since the tweet...
124                  if abs(current_hour - tweet_hour) < 1:
125                      print("in the last hour")
126                      #  displays tweet text and time on screen
127                      text_area.text = "\n".join(wrap_text_to_lines(stock_check, 21))
128                      print(stock_check)
129                      clock_area.text = timestamp
130                      print("Raspberry Pi in stock at Adafruit!")
131                      alarm_out.value = True
132  
133                  else:
134                      #  if it's not new, then the wait continues
135                      no_tweet_text = ("No stock in last hour :( Last stock: %s" % (timestamp))
136                      text_area.text = "\n".join(wrap_text_to_lines(no_tweet_text, 21))
137                      print("no new in stock notifications :(")
138              #  updates tweet ID
139              last_value = value
140          #  if the tweet wasn't today
141          else:
142              #  if it's not new, then the wait continues
143              no_tweet_text = ("No stock in last hour :( Last stock: %s" % (timestamp))
144              text_area.text = "\n".join(wrap_text_to_lines(no_tweet_text, 21))
145              print("no new in stock notifications :(")