code.py
  1  # SPDX-FileCopyrightText: 2020 Jeff Epler for Adafruit Industries
  2  #
  3  # SPDX-License-Identifier: MIT
  4  
  5  #!/usr/bin/env python3
  6  from enum import Enum
  7  import time
  8  import tkinter
  9  
 10  import adafruit_ble
 11  from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
 12  from adafruit_ble.services.standard.device_info import DeviceInfoService
 13  from adafruit_ble_cycling_speed_and_cadence import CyclingSpeedAndCadenceService
 14  
 15  import Xlib.X as X
 16  import Xlib.XK as XK
 17  import Xlib.display as display
 18  import Xlib.ext.xtest as xtest
 19  
# Customize me!  Set the minimum RPM to start the video, target RPM, and the
# grace time (number of seconds) you can be below the minimum RPM before it
# stops.

# Cadence at or above which the video is started.  Below it the OSD
# background is red.
MINIMUM_RPM = 60
# Cadence at or above which the OSD background is green.  Between
# MINIMUM_RPM and TARGET_RPM it is yellow.
TARGET_RPM = 72
# Number of seconds the cadence may stay below MINIMUM_RPM before the video
# is paused.
GRACE_TIME = 2
 26  
 27  class Keystroke:
 28      """Use a connection to the X Server (linux display server) to send # fake
 29      keystrokes to the Chromium browser window."""
 30      def __init__(self):
 31          self.display = display.Display()
 32          self.root = self.display.screen().root
 33          self._keycodes = {}
 34  
 35      def _keycode(self, sym):
 36          if isinstance(sym, str):
 37              sym = XK.string_to_keysym(sym)
 38          result = self._keycodes.get(sym, None)
 39          if result is None:
 40              self._keycodes[sym] = result = self.display.keysym_to_keycode(sym)
 41          return result
 42  
 43      def send_keysym(self, keysym):
 44          keycode = self._keycode(keysym)
 45          print("sending", keycode, keysym)
 46          xtest.fake_input(self.root, X.KeyPress, keycode)
 47          self.display.sync()
 48          time.sleep(.01)
 49          xtest.fake_input(self.root, X.KeyRelease, keycode)
 50          self.display.sync()
 51  
 52      @property
 53      def current_window_class(self):
 54          window = self.display.get_input_focus().focus
 55          while window:
 56              class_ = window.get_wm_class()
 57              if class_:
 58                  return class_[1]
 59              window = window.query_tree().parent
 60          return ''
 61  
 62  class OSD:
 63      """Use Tkinter to display a simple OSD window on top of all regular windows"""
 64      def __init__(self, width=12, text='', geometry='-0+48', font=('Arial', 36)):
 65          self.app = tkinter.Tk()
 66          self.app.wm_geometry(geometry)
 67          self.app.wm_overrideredirect(1)
 68          self._label = tkinter.Label(self.app, width=width, text=text, font=font)
 69          self._label.pack()
 70  
 71      @property
 72      def label(self):
 73          return self._label['text']
 74  
 75      @label.setter
 76      def label(self, text):
 77          self._label['text'] = text
 78          self.update()
 79  
 80      def mainloop(self):
 81          self.app.mainloop()
 82  
 83      def destroy(self):
 84          self.app.destroy()
 85  
 86      def update(self):
 87          self.app.update()
 88  
 89      @property
 90      def background(self):
 91          return self.label['background']
 92  
 93      @background.setter
 94      def background(self, color):
 95          self._label['background'] = color
 96  
 97  def send_pause():
 98      """Send the key 'p', to send a video to the paused state"""
 99      if keystroke.current_window_class != 'Chromium-browser':
100          return
101      print('actually send play')
102      keystroke.send_keysym('p')
103  
def send_play():
    """Put the video into the playing state by sending 'p' and then 'k'.

    The keys are only sent while the focused window's class is
    'Chromium-browser'; otherwise this does nothing.
    """
    if keystroke.current_window_class == 'Chromium-browser':
        print('actually send play')
        for key in ('p', 'k'):
            keystroke.send_keysym(key)
111  
def delta16(v1, v2):
    """Return how far an increasing 16-bit counter advanced from v1 to v2.

    When v2 is smaller than v1 the counter is taken to have wrapped around
    from 65535 back to 0, and 65536 is added to compensate."""
    advance = v2 - v1
    return advance + 0x10000 if advance < 0 else advance
119  
# Module-level singletons used by the helper functions and the main loop.
# PyLint can't find BLERadio for some reason so special case it here.
ble = adafruit_ble.BLERadio()  # pylint: disable=no-member
# Opens the connection to the X display used to send fake keystrokes.
keystroke = Keystroke()
# Creates the on-screen display window with its default size and position.
osd = OSD()
class VideoState(Enum):
    """The playback state this script last put the video into.

    The state is tracked from the keystrokes the script decided to send; it
    is not read back from the browser.
    """
    # Initial state, and the state after the cadence stays too low.
    PAUSED = 0
    # State after the cadence reached MINIMUM_RPM.
    PLAYING = 1
127  
# Main loop: scan for cycling cadence sensors, connect to every one found,
# then turn the measured crank cadence into play/pause keystrokes until the
# connection to all sensors is lost, at which point scanning starts again.

# Crank event timestamps are 16-bit counters in units of 1/1024 second (not
# milliseconds), as defined for the CSC Measurement characteristic.
CRANK_TIME_TICKS_PER_SECOND = 1024

while True:
    # Every (re)scan starts from the assumption that the video is paused.
    state = VideoState.PAUSED

    osd.label = "Scanning"
    osd.background = '#ffffff'
    print("Scanning...")
    # Save advertisements, indexed by address
    advs = {}
    for adv in ble.start_scan(ProvideServicesAdvertisement, timeout=5):
        if CyclingSpeedAndCadenceService in adv.services:
            print("found a CyclingSpeedAndCadenceService advertisement")
            # Save advertisement. Overwrite duplicates from same address (device).
            advs[adv.address] = adv

    ble.stop_scan()
    print("Stopped scanning")
    if not advs:
        # Nothing found. Go back and keep looking.
        continue

    osd.label = "Connecting"
    # Connect to all available CSC sensors.
    cyc_connections = []
    for adv in advs.values():
        cyc_connections.append(ble.connect(adv))
        print("Connected", len(cyc_connections))

    # Print out info about each sensor.
    for conn in cyc_connections:
        if conn.connected:
            if DeviceInfoService in conn:
                dis = conn[DeviceInfoService]
                try:
                    manufacturer = dis.manufacturer
                except AttributeError:
                    manufacturer = "(Manufacturer Not specified)"
                print("Device:", manufacturer)
            else:
                print("No device information")

    osd.label = "Polling"
    print("Waiting for data... (could be 10-20 seconds or more)")
    # Get CSC Service from each sensor.
    cyc_services = [conn[CyclingSpeedAndCadenceService] for conn in cyc_connections]

    # Poll the sensors for new measurements.
    # Stop if we lose connection to all sensors.
    last_crank_time = 0
    last_crank_revs = 0
    grace_period_end = 0
    est_rpm = 0

    while True:
        still_connected = False
        crank_revs = None
        crank_time = None
        for conn, svc in zip(cyc_connections, cyc_services):
            if not conn.connected:
                continue
            still_connected = True
            values = svc.measurement_values
            # Compare against None instead of testing truthiness: a
            # cumulative count of exactly 0 (a fresh sensor, or the 16-bit
            # counter wrapping around) is a valid reading.
            if values is not None and values.cumulative_crank_revolutions is not None:
                crank_revs = values.cumulative_crank_revolutions
                crank_time = values.last_crank_event_time
        if not still_connected:
            break

        if crank_revs is None:
            # No new measurement yet; wait a moment instead of spinning the
            # CPU at full speed until the next packet arrives.
            time.sleep(0.1)
            continue

        if crank_time == last_crank_time:
            # No crank event since the previous measurement.
            est_rpm = 0
        else:
            # If we were stopped prior to this, jump to MINIMUM_RPM
            # it gives a faster restart after paused
            if est_rpm == 0 and state == VideoState.PAUSED:
                est_rpm = MINIMUM_RPM
            else:
                delta_revs = delta16(last_crank_revs, crank_revs)
                # The timestamps differ, so delta_t is never zero here.
                delta_t = (delta16(last_crank_time, crank_time)
                           / CRANK_TIME_TICKS_PER_SECOND)
                est_rpm = 60 * delta_revs / delta_t
        if est_rpm >= MINIMUM_RPM:
            # Fast enough: restart the grace period and make sure we play.
            grace_period_end = time.monotonic() + GRACE_TIME
            if state == VideoState.PAUSED:
                send_play()
                state = VideoState.PLAYING
        elif time.monotonic() > grace_period_end:
            # Too slow for longer than the grace period: pause.
            if state == VideoState.PLAYING:
                send_pause()
            state = VideoState.PAUSED

        last_crank_revs = crank_revs
        last_crank_time = crank_time
        print(f"Crank: {crank_revs}")
        print(f"Crank RPM: {est_rpm:.1f}")

        # Colour the OSD by cadence: red below the minimum, yellow below the
        # target, green otherwise.
        if est_rpm < MINIMUM_RPM:
            osd.background = '#ff0000'
        elif est_rpm < TARGET_RPM:
            osd.background = '#ffff00'
        else:
            osd.background = '#00ff00'
        osd.label = f"RPM: {est_rpm:.1f}"
        time.sleep(0.1)