/ Pedal_For_Youtube / code.py
code.py
1 # SPDX-FileCopyrightText: 2020 Jeff Epler for Adafruit Industries 2 # 3 # SPDX-License-Identifier: MIT 4 5 #!/usr/bin/env python3 6 from enum import Enum 7 import time 8 import tkinter 9 10 import adafruit_ble 11 from adafruit_ble.advertising.standard import ProvideServicesAdvertisement 12 from adafruit_ble.services.standard.device_info import DeviceInfoService 13 from adafruit_ble_cycling_speed_and_cadence import CyclingSpeedAndCadenceService 14 15 import Xlib.X as X 16 import Xlib.XK as XK 17 import Xlib.display as display 18 import Xlib.ext.xtest as xtest 19 20 # Customize me! Set the minimum RPM to start the video, target RPM, and the 21 # grace time (number of seconds) you can be below the minimum RPM before it 22 # stops. 23 MINIMUM_RPM = 60 24 TARGET_RPM = 72 25 GRACE_TIME = 2 26 27 class Keystroke: 28 """Use a connection to the X Server (linux display server) to send # fake 29 keystrokes to the Chromium browser window.""" 30 def __init__(self): 31 self.display = display.Display() 32 self.root = self.display.screen().root 33 self._keycodes = {} 34 35 def _keycode(self, sym): 36 if isinstance(sym, str): 37 sym = XK.string_to_keysym(sym) 38 result = self._keycodes.get(sym, None) 39 if result is None: 40 self._keycodes[sym] = result = self.display.keysym_to_keycode(sym) 41 return result 42 43 def send_keysym(self, keysym): 44 keycode = self._keycode(keysym) 45 print("sending", keycode, keysym) 46 xtest.fake_input(self.root, X.KeyPress, keycode) 47 self.display.sync() 48 time.sleep(.01) 49 xtest.fake_input(self.root, X.KeyRelease, keycode) 50 self.display.sync() 51 52 @property 53 def current_window_class(self): 54 window = self.display.get_input_focus().focus 55 while window: 56 class_ = window.get_wm_class() 57 if class_: 58 return class_[1] 59 window = window.query_tree().parent 60 return '' 61 62 class OSD: 63 """Use Tkinter to display a simple OSD window on top of all regular windows""" 64 def __init__(self, width=12, text='', geometry='-0+48', font=('Arial', 36)): 65 self.app = tkinter.Tk() 66 self.app.wm_geometry(geometry) 67 self.app.wm_overrideredirect(1) 68 self._label = tkinter.Label(self.app, width=width, text=text, font=font) 69 self._label.pack() 70 71 @property 72 def label(self): 73 return self._label['text'] 74 75 @label.setter 76 def label(self, text): 77 self._label['text'] = text 78 self.update() 79 80 def mainloop(self): 81 self.app.mainloop() 82 83 def destroy(self): 84 self.app.destroy() 85 86 def update(self): 87 self.app.update() 88 89 @property 90 def background(self): 91 return self.label['background'] 92 93 @background.setter 94 def background(self, color): 95 self._label['background'] = color 96 97 def send_pause(): 98 """Send the key 'p', to send a video to the paused state""" 99 if keystroke.current_window_class != 'Chromium-browser': 100 return 101 print('actually send play') 102 keystroke.send_keysym('p') 103 104 def send_play(): 105 """Send the keys 'pk', to send a video into the playing state""" 106 if keystroke.current_window_class != 'Chromium-browser': 107 return 108 print('actually send play') 109 keystroke.send_keysym('p') 110 keystroke.send_keysym('k') 111 112 def delta16(v1, v2): 113 """Return the delta (difference) between two increasing 16-bit counters, 114 accounting for the wraparound from 65535 back to 0""" 115 diff = v2 - v1 116 if diff < 0: 117 diff += (1<<16) 118 return diff 119 120 # PyLint can't find BLERadio for some reason so special case it here. 121 ble = adafruit_ble.BLERadio() # pylint: disable=no-member 122 keystroke = Keystroke() 123 osd = OSD() 124 class VideoState(Enum): 125 PAUSED = 0 126 PLAYING = 1 127 128 while True: 129 state = VideoState.PAUSED 130 131 osd.label = "Scanning" 132 osd.background = '#ffffff' 133 print("Scanning...") 134 # Save advertisements, indexed by address 135 advs = {} 136 for adv in ble.start_scan(ProvideServicesAdvertisement, timeout=5): 137 if CyclingSpeedAndCadenceService in adv.services: 138 print("found a CyclingSpeedAndCadenceService advertisement") 139 # Save advertisement. Overwrite duplicates from same address (device). 140 advs[adv.address] = adv 141 142 ble.stop_scan() 143 print("Stopped scanning") 144 if not advs: 145 # Nothing found. Go back and keep looking. 146 continue 147 148 osd.label = "Connecting" 149 # Connect to all available CSC sensors. 150 cyc_connections = [] 151 for adv in advs.values(): 152 cyc_connections.append(ble.connect(adv)) 153 print("Connected", len(cyc_connections)) 154 155 # Print out info about each sensors. 156 for conn in cyc_connections: 157 if conn.connected: 158 if DeviceInfoService in conn: 159 dis = conn[DeviceInfoService] 160 try: 161 manufacturer = dis.manufacturer 162 except AttributeError: 163 manufacturer = "(Manufacturer Not specified)" 164 print("Device:", manufacturer) 165 else: 166 print("No device information") 167 168 osd.label = "Polling" 169 print("Waiting for data... (could be 10-20 seconds or more)") 170 # Get CSC Service from each sensor. 171 cyc_services = [] 172 for conn in cyc_connections: 173 cyc_services.append(conn[CyclingSpeedAndCadenceService]) 174 # Read data from each sensor once a second. 175 # Stop if we lose connection to all sensors. 176 177 last_crank_time_ms = 0 178 last_crank_revs = 0 179 grace_period_end = 0 180 est_rpm = 0 181 182 while True: 183 still_connected = False 184 crank_revs = None 185 crank_time_ms = None 186 for conn, svc in zip(cyc_connections, cyc_services): 187 if conn.connected: 188 still_connected = True 189 values = svc.measurement_values 190 if values is not None: 191 if values.cumulative_crank_revolutions: 192 crank_revs = values.cumulative_crank_revolutions 193 crank_time_ms = values.last_crank_event_time 194 if not still_connected: 195 break 196 197 if crank_revs is None: 198 continue 199 200 if crank_time_ms == last_crank_time_ms: 201 est_rpm = 0 202 else: 203 # If we were stopped prior to this, jump to MINIMUM_RPM 204 # it gives a faster restart after paused 205 if est_rpm == 0 and state == VideoState.PAUSED: 206 est_rpm = MINIMUM_RPM 207 else: 208 delta_revs = delta16(last_crank_revs, crank_revs) 209 delta_t = delta16(last_crank_time_ms, crank_time_ms) / 1000 210 est_rpm = 60 * delta_revs / delta_t 211 if est_rpm >= MINIMUM_RPM: 212 grace_period_end = time.monotonic() + GRACE_TIME 213 if state == VideoState.PAUSED: 214 send_play() 215 state = VideoState.PLAYING 216 elif time.monotonic() > grace_period_end: 217 if state == VideoState.PLAYING: 218 send_pause() 219 state = VideoState.PAUSED 220 221 last_crank_revs = crank_revs 222 last_crank_time_ms = crank_time_ms 223 print(f"Crank: {crank_revs}") 224 print(f"Crank RPM: {est_rpm:.1f}") 225 226 if est_rpm < MINIMUM_RPM: 227 osd.background = '#ff0000' 228 elif est_rpm < TARGET_RPM: 229 osd.background = '#ffff00' 230 else: 231 osd.background = '#00ff00' 232 osd.label = f"RPM: {est_rpm:.1f}" 233 time.sleep(0.1)