pyloton.py
  1  # SPDX-FileCopyrightText: 2020 Eva Herrada for Adafruit Industries
  2  #
  3  # SPDX-License-Identifier: MIT
  4  
  5  """
  6  A library for completing the Pyloton bike computer learn guide utilizing the Adafruit CLUE.
  7  """
  8  
  9  import time
 10  import adafruit_ble
 11  from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
 12  from adafruit_ble.advertising.standard import SolicitServicesAdvertisement
 13  import board
 14  import digitalio
 15  import displayio
 16  import adafruit_imageload
 17  from adafruit_ble_cycling_speed_and_cadence import CyclingSpeedAndCadenceService
 18  from adafruit_ble_heart_rate import HeartRateService
 19  from adafruit_bitmap_font import bitmap_font
 20  from adafruit_display_shapes.rect import Rect
 21  from adafruit_display_text import label
 22  from adafruit_ble_apple_media import AppleMediaService
 23  from adafruit_ble_apple_media import UnsupportedCommand
 24  import gamepad
 25  import touchio
 26  
 27  class Clue:
 28      """
 29      A very minimal version of the CLUE library.
 30      The library requires the use of many sensor-specific
 31      libraries this project doesn't use, and they were
 32      taking up a lot of RAM.
 33      """
 34      def __init__(self):
 35          self._i2c = board.I2C()
 36          self._touches = [board.D0, board.D1, board.D2]
 37          self._touch_threshold_adjustment = 0
 38          self._a = digitalio.DigitalInOut(board.BUTTON_A)
 39          self._a.switch_to_input(pull=digitalio.Pull.UP)
 40          self._b = digitalio.DigitalInOut(board.BUTTON_B)
 41          self._b.switch_to_input(pull=digitalio.Pull.UP)
 42          self._gamepad = gamepad.GamePad(self._a, self._b)
 43  
 44      @property
 45      def were_pressed(self):
 46          """
 47          Returns a set of buttons that have been pressed since the last time were_pressed was run.
 48          """
 49          ret = set()
 50          pressed = self._gamepad.get_pressed()
 51          for button, mask in (('A', 0x01), ('B', 0x02)):
 52              if mask & pressed:
 53                  ret.add(button)
 54          return ret
 55  
 56      @property
 57      def button_a(self):
 58          """``True`` when Button A is pressed. ``False`` if not."""
 59          return not self._a.value
 60  
 61      @property
 62      def button_b(self):
 63          """``True`` when Button B is pressed. ``False`` if not."""
 64          return not self._b.value
 65  
 66      def _touch(self, i):
 67          if not isinstance(self._touches[i], touchio.TouchIn):
 68              self._touches[i] = touchio.TouchIn(self._touches[i])
 69              self._touches[i].threshold += self._touch_threshold_adjustment
 70          return self._touches[i].value
 71  
 72      @property
 73      def touch_0(self):
 74          """
 75          Returns True when capacitive touchpad 0 is currently being pressed.
 76          """
 77          return self._touch(0)
 78  
 79      @property
 80      def touch_1(self):
 81          """
 82          Returns True when capacitive touchpad 1 is currently being pressed.
 83          """
 84          return self._touch(1)
 85  
 86      @property
 87      def touch_2(self):
 88          """
 89          Returns True when capacitive touchpad 2 is currently being pressed.
 90          """
 91          return self._touch(2)
 92  
 93  
 94  class Pyloton:
 95      """
 96      Contains the various functions necessary for doing the Pyloton learn guide.
 97      """
 98      #pylint: disable=too-many-instance-attributes
 99  
100      YELLOW = 0xFCFF00
101      PURPLE = 0x64337E
102      WHITE = 0xFFFFFF
103  
104      clue = Clue()
105  
106      def __init__(self, ble, display, circ, heart=True, speed=True, cad=True, ams=True, debug=False): #pylint: disable=too-many-arguments
107          self.debug = debug
108          self.ble = ble
109          self.display = display
110          self.circumference = circ
111  
112          self.heart_enabled = heart
113          self.speed_enabled = speed
114          self.cadence_enabled = cad
115          self.ams_enabled = ams
116          self.hr_connection = None
117  
118          self.num_enabled = heart + speed + cad + ams
119  
120          self._previous_wheel = 0
121          self._previous_crank = 0
122          self._previous_revolutions = 0
123          self._previous_rev = 0
124          self._previous_speed = 0
125          self._previous_cadence = 0
126          self._previous_heart = 0
127          self._speed_failed = 0
128          self._cadence_failed = 0
129          self._setup = 0
130          self._hr_label = None
131          self._sp_label = None
132          self._cadence_label = None
133          self._ams_label = None
134          self._hr_service = None
135          self._heart_y = None
136          self._speed_y = None
137          self._cadence_y = None
138          self._ams_y = None
139          self.ams = None
140          self.cyc_connections = None
141          self.cyc_services = None
142          self.track_artist = True
143  
144          self.start = time.time()
145  
146          self.splash = displayio.Group()
147          self.loading_group = displayio.Group()
148  
149          self._load_fonts()
150  
151          self.sprite_sheet, self.palette = adafruit_imageload.load("/sprite_sheet.bmp",
152                                                                    bitmap=displayio.Bitmap,
153                                                                    palette=displayio.Palette)
154  
155          self.text_group = displayio.Group()
156          self.status = label.Label(font=self.arial12, x=10, y=200,
157                                    text='', color=self.YELLOW)
158          self.status1 = label.Label(font=self.arial12, x=10, y=220,
159                                     text='', color=self.YELLOW)
160  
161          self.text_group.append(self.status)
162          self.text_group.append(self.status1)
163  
164  
165      def show_splash(self):
166          """
167          Shows the loading screen
168          """
169          if self.debug:
170              return
171  
172          blinka_bitmap = "blinka-pyloton.bmp"
173  
174          # Compatible with CircuitPython 6 & 7
175          with open(blinka_bitmap, 'rb') as bitmap_file:
176              bitmap1 = displayio.OnDiskBitmap(bitmap_file)
177              tile_grid = displayio.TileGrid(bitmap1, pixel_shader=getattr(bitmap1, 'pixel_shader', displayio.ColorConverter()))
178              self.loading_group.append(tile_grid)
179              self.display.show(self.loading_group)
180              status_heading = label.Label(font=self.arial16, x=80, y=175,
181                                           text="Status", color=self.YELLOW)
182              rect = Rect(0, 165, 240, 75, fill=self.PURPLE)
183              self.loading_group.append(rect)
184              self.loading_group.append(status_heading)
185  
186          # # Compatible with CircuitPython 7+
187          # bitmap1 = displayio.OnDiskBitmap(blinka_bitmap)
188          # tile_grid = displayio.TileGrid(bitmap1, pixel_shader=bitmap1.pixel_shader)
189          # self.loading_group.append(tile_grid)
190          # self.display.show(self.loading_group)
191          # status_heading = label.Label(font=self.arial16, x=80, y=175,
192          #                              text="Status", color=self.YELLOW)
193          # rect = Rect(0, 165, 240, 75, fill=self.PURPLE)
194          # self.loading_group.append(rect)
195          # self.loading_group.append(status_heading)
196  
197      def _load_fonts(self):
198          """
199          Loads fonts
200          """
201          self.arial12 = bitmap_font.load_font("/fonts/Arial-12.bdf")
202          self.arial16 = bitmap_font.load_font("/fonts/Arial-16.bdf")
203          self.arial24 = bitmap_font.load_font("/fonts/Arial-Bold-24.bdf")
204  
205          glyphs = b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-!,. "\'?!'
206          self.arial12.load_glyphs(glyphs)
207          self.arial16.load_glyphs(glyphs)
208          self.arial24.load_glyphs(glyphs)
209  
210  
211      def _status_update(self, message):
212          """
213          Displays status updates
214          """
215          if self.debug:
216              print(message)
217              return
218          if self.text_group not in self.loading_group:
219              self.loading_group.append(self.text_group)
220          self.status.text = message[:25]
221          self.status1.text = message[25:50]
222  
223  
224      def timeout(self):
225          """
226          Displays Timeout on screen when pyloton has been searching for a sensor for too long
227          """
228          self._status_update("Pyloton: Timeout")
229          time.sleep(3)
230  
231  
232      def heart_connect(self):
233          """
234          Connects to heart rate sensor
235          """
236          self._status_update("Heart Rate: Scanning...")
237          for adv in self.ble.start_scan(ProvideServicesAdvertisement, timeout=5):
238              if HeartRateService in adv.services:
239                  self._status_update("Heart Rate: Found an advertisement")
240                  self.hr_connection = self.ble.connect(adv)
241                  self._status_update("Heart Rate: Connected")
242                  break
243          self.ble.stop_scan()
244          if self.hr_connection:
245              self._hr_service = self.hr_connection[HeartRateService]
246          return self.hr_connection
247  
248  
249      @staticmethod
250      def _has_timed_out(start, timeout):
251          if time.time() - start >= timeout:
252              return True
253          return False
254  
255      def ams_connect(self, start=time.time(), timeout=30):
256          """
257          Connect to an Apple device using the ble_apple_media library
258          """
259          self._status_update("AppleMediaService: Connect your phone now")
260          radio = adafruit_ble.BLERadio()
261          a = SolicitServicesAdvertisement()
262          a.solicited_services.append(AppleMediaService)
263          radio.start_advertising(a)
264  
265          while not radio.connected and not self._has_timed_out(start, timeout):
266              pass
267  
268          self._status_update("AppleMediaService: Connected")
269          for connection in radio.connections:
270              if not connection.paired:
271                  connection.pair()
272                  self._status_update("AppleMediaService: Paired")
273              self.ams = connection[AppleMediaService]
274  
275          return radio
276  
277  
278      def speed_cadence_connect(self):
279          """
280          Connects to speed and cadence sensor
281          """
282          self._status_update("Speed and Cadence: Scanning...")
283          # Save advertisements, indexed by address
284          advs = {}
285          for adv in self.ble.start_scan(ProvideServicesAdvertisement, timeout=5):
286              if CyclingSpeedAndCadenceService in adv.services:
287                  self._status_update("Speed and Cadence: Found an advertisement")
288                  # Save advertisement. Overwrite duplicates from same address (device).
289                  advs[adv.address] = adv
290  
291          self.ble.stop_scan()
292          self._status_update("Speed and Cadence: Stopped scanning")
293          if not advs:
294              # Nothing found. Go back and keep looking.
295              return []
296  
297          # Connect to all available CSC sensors.
298          self.cyc_connections = []
299          for adv in advs.values():
300              self.cyc_connections.append(self.ble.connect(adv))
301              self._status_update("Speed and Cadence: Connected {}".format(len(self.cyc_connections)))
302  
303          self.cyc_services = []
304          for conn in self.cyc_connections:
305              self.cyc_services.append(conn[CyclingSpeedAndCadenceService])
306          self._status_update("Pyloton: Finishing up...")
307  
308          return self.cyc_connections
309  
310  
311      def _compute_speed(self, values, speed):
312          wheel_diff = values.last_wheel_event_time - self._previous_wheel
313          rev_diff = values.cumulative_wheel_revolutions - self._previous_revolutions
314          if wheel_diff:
315              # Rotations per minute is 60 times the amount of revolutions since
316              # the last update over the time since the last update
317              rpm = 60*(rev_diff/(wheel_diff/1024))
318              # We then mutiply it by the wheel's circumference and convert it to mph
319              speed = round((rpm * self.circumference) * (60/63360), 1)
320              if speed < 0:
321                  speed = self._previous_speed
322              self._previous_speed = speed
323              self._previous_revolutions = values.cumulative_wheel_revolutions
324              self._speed_failed = 0
325          else:
326              self._speed_failed += 1
327              if self._speed_failed >= 3:
328                  speed = 0
329          self._previous_wheel = values.last_wheel_event_time
330  
331          return speed
332  
333  
334      def _compute_cadence(self, values, cadence):
335          crank_diff = values.last_crank_event_time - self._previous_crank
336          crank_rev_diff = values.cumulative_crank_revolutions-self._previous_rev
337  
338          if crank_rev_diff:
339              # Rotations per minute is 60 times the amount of revolutions since the
340              # last update over the time since the last update
341              cadence = round(60*(crank_rev_diff/(crank_diff/1024)), 1)
342              if cadence < 0:
343                  cadence = self._previous_cadence
344              self._previous_cadence = cadence
345              self._previous_rev = values.cumulative_crank_revolutions
346              self._cadence_failed = 0
347          else:
348              self._cadence_failed += 1
349              if self._cadence_failed >= 3:
350                  cadence = 0
351          self._previous_crank = values.last_crank_event_time
352  
353          return cadence
354  
355  
356      def read_s_and_c(self):
357          """
358          Reads data from the speed and cadence sensor
359          """
360          speed = self._previous_speed
361          cadence = self._previous_cadence
362          for conn, svc in zip(self.cyc_connections, self.cyc_services):
363              if not conn.connected:
364                  speed = cadence = 0
365                  continue
366              values = svc.measurement_values
367              if not values:
368                  if self._cadence_failed >= 3 or self._speed_failed >= 3:
369                      if self._cadence_failed > 3:
370                          cadence = 0
371                      if self._speed_failed > 3:
372                          speed = 0
373                  continue
374              if not values.last_wheel_event_time:
375                  continue
376              speed = self._compute_speed(values, speed)
377              if not values.last_crank_event_time:
378                  continue
379              cadence = self._compute_cadence(values, cadence)
380  
381          if speed:
382              speed = str(speed)[:8]
383          if cadence:
384              cadence = str(cadence)[:8]
385  
386          return speed, cadence
387  
388  
389      def read_heart(self):
390          """
391          Reads date from the heart rate sensor
392          """
393          measurement = self._hr_service.measurement_values
394          if measurement is None:
395              heart = self._previous_heart
396          else:
397              heart = measurement.heart_rate
398              self._previous_heart = measurement.heart_rate
399  
400          if heart:
401              heart = str(heart)[:4]
402  
403          return heart
404  
405  
406      def read_ams(self):
407          """
408          Reads data from AppleMediaServices
409          """
410          current = time.time()
411          try:
412              if current - self.start > 3:
413                  self.track_artist = not self.track_artist
414                  self.start = time.time()
415              if self.track_artist:
416                  data = self.ams.artist
417              if not self.track_artist:
418                  data = self.ams.title
419          except (RuntimeError, UnicodeError):
420              data = None
421  
422          if data:
423              data = data[:16] + (data[16:] and '..')
424  
425          return data
426  
427  
428      def icon_maker(self, n, icon_x, icon_y):
429          """
430          Generates icons as sprites
431          """
432          sprite = displayio.TileGrid(self.sprite_sheet, pixel_shader=self.palette, width=1,
433                                      height=1, tile_width=40, tile_height=40, default_tile=n,
434                                      x=icon_x, y=icon_y)
435          return sprite
436  
437  
438      def _label_maker(self, text, x, y, font=None):
439          """
440          Generates labels
441          """
442          if not font:
443              font = self.arial24
444          return label.Label(font=font, x=x, y=y, text=text, color=self.WHITE)
445  
446  
447      def _get_y(self):
448          """
449          Helper function for setup_display. Gets the y values used for sprites and labels.
450          """
451          enabled = self.num_enabled
452  
453          if self.heart_enabled:
454              self._heart_y = 45*(self.num_enabled - enabled) + 75
455              enabled -= 1
456          if self.speed_enabled:
457              self._speed_y = 45*(self.num_enabled - enabled) + 75
458              enabled -= 1
459          if self.cadence_enabled:
460              self._cadence_y = 45*(self.num_enabled - enabled) + 75
461              enabled -= 1
462          if self.ams_enabled:
463              self._ams_y = 45*(self.num_enabled - enabled) + 75
464              enabled -= 1
465  
466  
467      def setup_display(self):
468          """
469          Prepares the display to show sensor values: Adds a header, a heading, and various sprites.
470          """
471          self._get_y()
472          sprites = displayio.Group()
473  
474          rect = Rect(0, 0, 240, 50, fill=self.PURPLE)
475          self.splash.append(rect)
476  
477          heading = label.Label(font=self.arial24, x=55, y=25, text="Pyloton", color=self.YELLOW)
478          self.splash.append(heading)
479  
480          if self.heart_enabled:
481              heart_sprite = self.icon_maker(0, 2, self._heart_y - 20)
482              sprites.append(heart_sprite)
483  
484          if self.speed_enabled:
485              speed_sprite = self.icon_maker(1, 2, self._speed_y - 20)
486              sprites.append(speed_sprite)
487  
488          if self.cadence_enabled:
489              cadence_sprite = self.icon_maker(2, 2, self._cadence_y - 20)
490              sprites.append(cadence_sprite)
491  
492          if self.ams_enabled:
493              ams_sprite = self.icon_maker(3, 2, self._ams_y - 20)
494              sprites.append(ams_sprite)
495  
496          self.splash.append(sprites)
497  
498          self.display.show(self.splash)
499          while self.loading_group:
500              self.loading_group.pop()
501  
502  
503      def update_display(self): #pylint: disable=too-many-branches
504          """
505          Updates the display to display the most recent values
506          """
507          if self.speed_enabled or self.cadence_enabled:
508              speed, cadence = self.read_s_and_c()
509  
510          if self.heart_enabled:
511              heart = self.read_heart()
512              if not self._setup:
513                  self._hr_label = self._label_maker('{} bpm'.format(heart), 50, self._heart_y) # 75
514                  self.splash.append(self._hr_label)
515              else:
516                  self._hr_label.text = '{} bpm'.format(heart)
517  
518          if self.speed_enabled:
519              if not self._setup:
520                  self._sp_label = self._label_maker('{} mph'.format(speed), 50, self._speed_y) # 120
521                  self.splash.append(self._sp_label)
522              else:
523                  self._sp_label.text = '{} mph'.format(speed)
524  
525          if self.cadence_enabled:
526              if not self._setup:
527                  self._cadence_label = self._label_maker('{} rpm'.format(cadence), 50,
528                                                          self._cadence_y)
529                  self.splash.append(self._cadence_label)
530              else:
531                  self._cadence_label.text = '{} rpm'.format(cadence)
532  
533          if self.ams_enabled:
534              ams = self.read_ams()
535              if not self._setup:
536                  self._ams_label = self._label_maker('{}'.format(ams), 50, self._ams_y,
537                                                      font=self.arial16)
538                  self.splash.append(self._ams_label)
539              else:
540                  self._ams_label.text = '{}'.format(ams)
541  
542          self._setup = True
543  
544  
545      def ams_remote(self):
546          """
547          Allows the 2 buttons and 3 capacitive touch pads in the CLUE to function as a media remote.
548          """
549          try:
550              # Capacitive touch pad marked 0 goes to the previous track
551              if self.clue.touch_0:
552                  self.ams.previous_track()
553                  time.sleep(0.25)
554  
555              # Capacitive touch pad marked 1 toggles pause/play
556              if self.clue.touch_1:
557                  self.ams.toggle_play_pause()
558                  time.sleep(0.25)
559  
560              # Capacitive touch pad marked 2 advances to the next track
561              if self.clue.touch_2:
562                  self.ams.next_track()
563                  time.sleep(0.25)
564  
565              # If button B (on the right) is pressed, it increases the volume
566              if self.clue.button_b:
567                  self.ams.volume_up()
568                  time.sleep(0.1)
569  
570              # If button A (on the left) is pressed, the volume decreases
571              if self.clue.button_a:
572                  self.ams.volume_down()
573                  time.sleep(0.1)
574          except (RuntimeError, UnsupportedCommand, AttributeError):
575              return