# Pi_In_Stock_Notifier / code.py
# code.py
# SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
#
# SPDX-License-Identifier: MIT

"""Raspberry Pi in-stock notifier.

Every POLL_INTERVAL seconds this script fetches the current time from
Adafruit IO and the most recent rpilocator tweet matching
"In Stock at Adafruit" from the Twitter recent-search API.  When a new tweet
carries today's date and is less than an hour old, its text is shown on the
built-in display and the alarm output pin is driven high.  Pressing the
on-board button drives the alarm output low again.
"""

import time
import ssl
import wifi
import terminalio
import socketpool
import displayio
import board
from adafruit_display_text import bitmap_label, wrap_text_to_lines
import adafruit_requests
from digitalio import DigitalInOut, Direction, Pull
from adafruit_debouncer import Debouncer

#  seconds between polls of the time service and the Twitter API
POLL_INTERVAL = 30
#  maximum characters per line when wrapping text for the display
WRAP_WIDTH = 21


def minutes_of_day(stamp):
    """Return minutes since midnight for a 'YYYY-MM-DDTHH:MM...' string."""
    return int(stamp[11:13]) * 60 + int(stamp[14:16])


alarm_out = DigitalInOut(board.A1)
alarm_out.direction = Direction.OUTPUT
alarm_out.value = False

button_in = DigitalInOut(board.BUTTON)  # on-board Boot button on Feather ESP32-S2 TFT
button_in.pull = Pull.UP
button = Debouncer(button_in)


# Get wifi details and more from a secrets.py file
try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

print("Adafruit Raspberry Pi In Stock Tweet Listener")

#  import your bearer token
bear = secrets['bearer_token']

#  query URL for tweets. looking for recent tweets from rpilocator that
#  contain "In Stock at Adafruit"
#  disabling line-too-long because queries for tweet_query & TIME_URL cannot have line breaks
#  pylint: disable=line-too-long
tweet_query = 'https://api.twitter.com/2/tweets/search/recent?query=In Stock at Adafruit from:rpilocator&tweet.fields=created_at'

headers = {'Authorization': 'Bearer ' + bear}

print("Connecting to %s"%secrets["ssid"])
wifi.radio.connect(secrets["ssid"], secrets["password"])
print("Connected to %s!"%secrets["ssid"])
print("My IP address is", wifi.radio.ipv4_address)

pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())

#  gets and formats time from adafruit.io
aio_username = secrets["aio_username"]
aio_key = secrets["aio_key"]
#  NOTE(review): `location` is read here but never added to TIME_URL, so the
#  time service is not told which timezone to use.  The tweet timestamps are
#  presumably UTC - confirm both clocks agree, since their dates and times
#  are compared directly in the main loop.
location = secrets.get("timezone", None)
TIME_URL = "https://io.adafruit.com/api/v2/%s/integrations/time/strftime?x-aio-key=%s" % (aio_username, aio_key)
TIME_URL += "&fmt=%25Y-%25m-%25dT%25H%3A%25M%3A%25S.%25L%25j%25u%25z%25Z"

display = board.DISPLAY

group = displayio.Group()
font = terminalio.FONT
text = ''
text_area = bitmap_label.Label(font, text=text, scale=2, color=0xFFFFFF)
text_area.x = 0
text_area.y = 15
clock = ''
clock_area = bitmap_label.Label(font, text=clock, color=0xFFFFFF)
clock_area.x = 125
clock_area.y = 128
group.append(text_area)
group.append(clock_area)
display.show(group)


def show_no_stock(last_stock_time):
    """Show the 'no stock in last hour' message with the last tweet's timestamp."""
    no_tweet_text = ("No stock in last hour :( Last stock: %s" % (last_stock_time))
    text_area.text = "\n".join(wrap_text_to_lines(no_tweet_text, WRAP_WIDTH))
    print("no new in stock notifications :(")


last_value = 0  # checks last tweet's ID
check = 0  # time.monotonic() holder

#  fetching current time
print("Fetching text from", TIME_URL)
current_time = requests.get(TIME_URL)
print("-" * 40)
print(current_time.text)
print("-" * 40)

#  NOTE(review): the alarm output is low here, the same state the button
#  handler sets for "Alarm off".  An earlier comment said this line starts
#  the alarm for testing; set it to True if that is wanted.
alarm_out.value = False


while True:
    button.update()
    if button.fell:
        print("Alarm off")
        alarm_out.value = False

    #  every POLL_INTERVAL seconds...
    if (check + POLL_INTERVAL) >= time.monotonic():
        continue
    #  reset time count before any network work so that a failed or empty
    #  poll waits a full interval instead of retrying on every loop pass
    check = time.monotonic()

    try:
        #  updates current time; the text is read before the next request
        #  is made on the same session
        current_time = requests.get(TIME_URL)
        now_text = current_time.text
        print(now_text)
        #  get tweets from rpilocator containing in stock at adafruit
        the_tweet = requests.get(tweet_query, headers=headers)
        #  gets data portion of json; the key is looked up with .get() so a
        #  response without any tweets does not raise KeyError
        data = the_tweet.json().get('data')
    except (OSError, RuntimeError, ValueError) as err:
        #  a dropped connection or unparsable reply should not halt the notifier
        print("Failed to get data, retrying:", err)
        continue

    if not data:
        print("no new in stock notifications :(")
        continue

    #  tweet ID number
    value = data[0]['id']
    #  tweet text
    stock_check = data[0]['text']
    #  timestamp
    timestamp = data[0]['created_at']

    #  compare last tweet ID and current tweet ID
    if last_value == value:
        continue

    #  compares date of tweet timestamp with current date
    same_day = timestamp.startswith(now_text[0:10])
    recent = False
    if same_day:
        print("match")
        #  compare at minute resolution: comparing only the hour fields
        #  would treat 10:55 and 11:05 as more than an hour apart
        elapsed = minutes_of_day(now_text) - minutes_of_day(timestamp)
        print("minutes since tweet:", elapsed)
        #  abs() tolerates a small skew between the two clocks
        recent = abs(elapsed) < 60

    if recent:
        print("in the last hour")
        #  displays tweet text and time on screen
        text_area.text = "\n".join(wrap_text_to_lines(stock_check, WRAP_WIDTH))
        print(stock_check)
        clock_area.text = timestamp
        print("Raspberry Pi in stock at Adafruit!")
        alarm_out.value = True
    else:
        #  if it's not new, then the wait continues
        show_no_stock(timestamp)

    #  updates tweet ID; a tweet that is not from today is left unrecorded
    #  and is evaluated again on the next poll
    if same_day:
        last_value = value