# CircuitPython_Pyloton / pyloton.py
# pyloton.py
# SPDX-FileCopyrightText: 2020 Eva Herrada for Adafruit Industries
#
# SPDX-License-Identifier: MIT

"""
A library for completing the Pyloton bike computer learn guide utilizing the Adafruit CLUE.
"""

import time
import adafruit_ble
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
from adafruit_ble.advertising.standard import SolicitServicesAdvertisement
import board
import digitalio
import displayio
import adafruit_imageload
from adafruit_ble_cycling_speed_and_cadence import CyclingSpeedAndCadenceService
from adafruit_ble_heart_rate import HeartRateService
from adafruit_bitmap_font import bitmap_font
from adafruit_display_shapes.rect import Rect
from adafruit_display_text import label
from adafruit_ble_apple_media import AppleMediaService
from adafruit_ble_apple_media import UnsupportedCommand
import gamepad
import touchio


class Clue:
    """
    A very minimal version of the CLUE library.
    The library requires the use of many sensor-specific
    libraries this project doesn't use, and they were
    taking up a lot of RAM.
    """
    def __init__(self):
        self._i2c = board.I2C()
        # Slots start out holding pins; _touch() swaps each one for a
        # touchio.TouchIn the first time that pad is read.
        self._touches = [board.D0, board.D1, board.D2]
        self._touch_threshold_adjustment = 0
        self._a = digitalio.DigitalInOut(board.BUTTON_A)
        self._a.switch_to_input(pull=digitalio.Pull.UP)
        self._b = digitalio.DigitalInOut(board.BUTTON_B)
        self._b.switch_to_input(pull=digitalio.Pull.UP)
        self._gamepad = gamepad.GamePad(self._a, self._b)

    @property
    def were_pressed(self):
        """
        Returns a set of buttons that have been pressed since the last time were_pressed was run.
        """
        ret = set()
        pressed = self._gamepad.get_pressed()
        for button, mask in (('A', 0x01), ('B', 0x02)):
            if mask & pressed:
                ret.add(button)
        return ret

    @property
    def button_a(self):
        """``True`` when Button A is pressed. ``False`` if not."""
        # The input is pulled up, so the raw value is inverted.
        return not self._a.value

    @property
    def button_b(self):
        """``True`` when Button B is pressed. ``False`` if not."""
        # The input is pulled up, so the raw value is inverted.
        return not self._b.value

    def _touch(self, i):
        """
        Returns the current value of touchpad ``i``, creating its TouchIn on first use.
        """
        if not isinstance(self._touches[i], touchio.TouchIn):
            # First time this pad is referenced: replace the pin stored in
            # the slot with a TouchIn object for that pin.
            self._touches[i] = touchio.TouchIn(self._touches[i])
            self._touches[i].threshold += self._touch_threshold_adjustment
        return self._touches[i].value

    @property
    def touch_0(self):
        """
        Returns True when capacitive touchpad 0 is currently being pressed.
        """
        return self._touch(0)

    @property
    def touch_1(self):
        """
        Returns True when capacitive touchpad 1 is currently being pressed.
        """
        return self._touch(1)

    @property
    def touch_2(self):
        """
        Returns True when capacitive touchpad 2 is currently being pressed.
        """
        return self._touch(2)


class Pyloton:
    """
    Contains the various functions necessary for doing the Pyloton learn guide.
    """
    #pylint: disable=too-many-instance-attributes

    YELLOW = 0xFCFF00
    PURPLE = 0x64337E
    WHITE = 0xFFFFFF

    clue = Clue()

    def __init__(self, ble, display, circ, heart=True, speed=True, cad=True, ams=True, debug=False): #pylint: disable=too-many-arguments
        """
        Stores the configuration, loads fonts and the sprite sheet, and builds the status labels.

        :param ble: BLE radio object used for scanning and connecting to sensors
        :param display: Display object the groups are shown on
        :param circ: Wheel circumference used by the speed calculation
        :param heart: Whether the heart rate reading is enabled
        :param speed: Whether the speed reading is enabled
        :param cad: Whether the cadence reading is enabled
        :param ams: Whether Apple Media Service support is enabled
        :param debug: When true, status messages are printed instead of drawn
        """
        self.debug = debug
        self.ble = ble
        self.display = display
        self.circumference = circ

        self.heart_enabled = heart
        self.speed_enabled = speed
        self.cadence_enabled = cad
        self.ams_enabled = ams
        self.hr_connection = None

        self.num_enabled = heart + speed + cad + ams

        self._previous_wheel = 0
        self._previous_crank = 0
        self._previous_revolutions = 0
        self._previous_rev = 0
        self._previous_speed = 0
        self._previous_cadence = 0
        self._previous_heart = 0
        self._speed_failed = 0
        self._cadence_failed = 0
        self._setup = 0
        self._hr_label = None
        self._sp_label = None
        self._cadence_label = None
        self._ams_label = None
        self._hr_service = None
        self._heart_y = None
        self._speed_y = None
        self._cadence_y = None
        self._ams_y = None
        self.ams = None
        self.cyc_connections = None
        self.cyc_services = None
        self.track_artist = True

        self.start = time.time()

        self.splash = displayio.Group()
        self.loading_group = displayio.Group()

        self._load_fonts()

        self.sprite_sheet, self.palette = adafruit_imageload.load("/sprite_sheet.bmp",
                                                                  bitmap=displayio.Bitmap,
                                                                  palette=displayio.Palette)

        self.text_group = displayio.Group()
        self.status = label.Label(font=self.arial12, x=10, y=200,
                                  text='', color=self.YELLOW)
        self.status1 = label.Label(font=self.arial12, x=10, y=220,
                                   text='', color=self.YELLOW)

        self.text_group.append(self.status)
        self.text_group.append(self.status1)

    def show_splash(self):
        """
        Shows the loading screen
        """
        if self.debug:
            return

        blinka_bitmap = "blinka-pyloton.bmp"

        # Compatible with CircuitPython 6 & 7
        with open(blinka_bitmap, 'rb') as bitmap_file:
            bitmap1 = displayio.OnDiskBitmap(bitmap_file)
            # Fall back to a ColorConverter when the bitmap does not provide
            # its own pixel_shader attribute.
            shader = getattr(bitmap1, 'pixel_shader', displayio.ColorConverter())
            tile_grid = displayio.TileGrid(bitmap1, pixel_shader=shader)
            self.loading_group.append(tile_grid)
            self.display.show(self.loading_group)
            status_heading = label.Label(font=self.arial16, x=80, y=175,
                                         text="Status", color=self.YELLOW)
            rect = Rect(0, 165, 240, 75, fill=self.PURPLE)
            self.loading_group.append(rect)
            self.loading_group.append(status_heading)

        # # Compatible with CircuitPython 7+
        # bitmap1 = displayio.OnDiskBitmap(blinka_bitmap)
        # tile_grid = displayio.TileGrid(bitmap1, pixel_shader=bitmap1.pixel_shader)
        # self.loading_group.append(tile_grid)
        # self.display.show(self.loading_group)
        # status_heading = label.Label(font=self.arial16, x=80, y=175,
        #                              text="Status", color=self.YELLOW)
        # rect = Rect(0, 165, 240, 75, fill=self.PURPLE)
        # self.loading_group.append(rect)
        # self.loading_group.append(status_heading)

    def _load_fonts(self):
        """
        Loads fonts
        """
        self.arial12 = bitmap_font.load_font("/fonts/Arial-12.bdf")
        self.arial16 = bitmap_font.load_font("/fonts/Arial-16.bdf")
        self.arial24 = bitmap_font.load_font("/fonts/Arial-Bold-24.bdf")

        glyphs = b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-!,. "\'?!'
        for font in (self.arial12, self.arial16, self.arial24):
            font.load_glyphs(glyphs)

    def _status_update(self, message):
        """
        Displays status updates

        In debug mode the message is printed instead. Otherwise it is split
        across the two status labels, 25 characters per label.
        """
        if self.debug:
            print(message)
            return
        if self.text_group not in self.loading_group:
            self.loading_group.append(self.text_group)
        self.status.text = message[:25]
        self.status1.text = message[25:50]

    def timeout(self):
        """
        Displays Timeout on screen when pyloton has been searching for a sensor for too long
        """
        self._status_update("Pyloton: Timeout")
        time.sleep(3)

    def heart_connect(self):
        """
        Connects to heart rate sensor

        :return: The heart rate connection, or the previous value of
                 ``hr_connection`` (``None`` initially) when no sensor was found.
        """
        self._status_update("Heart Rate: Scanning...")
        for adv in self.ble.start_scan(ProvideServicesAdvertisement, timeout=5):
            if HeartRateService in adv.services:
                self._status_update("Heart Rate: Found an advertisement")
                self.hr_connection = self.ble.connect(adv)
                self._status_update("Heart Rate: Connected")
                break
        self.ble.stop_scan()
        if self.hr_connection:
            self._hr_service = self.hr_connection[HeartRateService]
        return self.hr_connection

    @staticmethod
    def _has_timed_out(start, timeout):
        """
        Returns True once at least ``timeout`` seconds have passed since ``start``.
        """
        return time.time() - start >= timeout

    def ams_connect(self, start=None, timeout=30):
        """
        Connect to an Apple device using the ble_apple_media library

        :param start: ``time.time()`` value the timeout is measured from.
                      Defaults to the moment this method is called.
        :param timeout: Seconds to wait for a connection before giving up.
        :return: The radio that was advertising. ``self.ams`` is only set
                 when a connection was made.
        """
        # A ``start=time.time()`` default would be evaluated once, when the
        # module is imported, so the timeout would be measured from import
        # time rather than from this call.
        if start is None:
            start = time.time()

        self._status_update("AppleMediaService: Connect your phone now")
        radio = adafruit_ble.BLERadio()
        a = SolicitServicesAdvertisement()
        a.solicited_services.append(AppleMediaService)
        radio.start_advertising(a)

        # Busy-wait until something connects or the timeout expires.
        while not radio.connected and not self._has_timed_out(start, timeout):
            pass

        if not radio.connected:
            # Timed out: don't claim a connection that never happened.
            return radio

        self._status_update("AppleMediaService: Connected")
        for connection in radio.connections:
            if not connection.paired:
                connection.pair()
                self._status_update("AppleMediaService: Paired")
            self.ams = connection[AppleMediaService]

        return radio

    def speed_cadence_connect(self):
        """
        Connects to speed and cadence sensor

        :return: List of connections made, or an empty list when no sensor
                 advertised during the scan.
        """
        self._status_update("Speed and Cadence: Scanning...")
        # Save advertisements, indexed by address
        advs = {}
        for adv in self.ble.start_scan(ProvideServicesAdvertisement, timeout=5):
            if CyclingSpeedAndCadenceService in adv.services:
                self._status_update("Speed and Cadence: Found an advertisement")
                # Save advertisement. Overwrite duplicates from same address (device).
                advs[adv.address] = adv

        self.ble.stop_scan()
        self._status_update("Speed and Cadence: Stopped scanning")
        if not advs:
            # Nothing found. Go back and keep looking.
            return []

        # Connect to all available CSC sensors.
        self.cyc_connections = []
        for adv in advs.values():
            self.cyc_connections.append(self.ble.connect(adv))
            self._status_update("Speed and Cadence: Connected {}".format(len(self.cyc_connections)))

        self.cyc_services = []
        for conn in self.cyc_connections:
            self.cyc_services.append(conn[CyclingSpeedAndCadenceService])
        self._status_update("Pyloton: Finishing up...")

        return self.cyc_connections

    def _compute_speed(self, values, speed):
        """
        Computes the speed from a measurement, falling back to ``speed`` when it can't.

        :param values: Measurement providing ``last_wheel_event_time`` and
                       ``cumulative_wheel_revolutions``
        :param speed: Value returned when no new speed can be computed
        :return: The new speed, ``speed`` unchanged, or 0 after three
                 consecutive readings without a new wheel event time.
        """
        wheel_diff = values.last_wheel_event_time - self._previous_wheel
        rev_diff = values.cumulative_wheel_revolutions - self._previous_revolutions
        if wheel_diff:
            # Rotations per minute is 60 times the amount of revolutions since
            # the last update over the time since the last update
            # NOTE(review): the /1024 presumably converts 1/1024 s ticks to
            # seconds - confirm against the sensor library.
            rpm = 60*(rev_diff/(wheel_diff/1024))
            # We then multiply it by the wheel's circumference and convert it to mph
            # NOTE(review): 63360 is inches per mile, so the circumference is
            # presumably in inches - confirm with the caller.
            speed = round((rpm * self.circumference) * (60/63360), 1)
            if speed < 0:
                # A negative result is not usable; keep the last good value.
                speed = self._previous_speed
            self._previous_speed = speed
            self._previous_revolutions = values.cumulative_wheel_revolutions
            self._speed_failed = 0
        else:
            self._speed_failed += 1
            if self._speed_failed >= 3:
                speed = 0
        self._previous_wheel = values.last_wheel_event_time

        return speed

    def _compute_cadence(self, values, cadence):
        """
        Computes the cadence from a measurement, falling back to ``cadence`` when it can't.

        :param values: Measurement providing ``last_crank_event_time`` and
                       ``cumulative_crank_revolutions``
        :param cadence: Value returned when no new cadence can be computed
        :return: The new cadence, ``cadence`` unchanged, or 0 after three
                 consecutive readings that could not be used.
        """
        crank_diff = values.last_crank_event_time - self._previous_crank
        crank_rev_diff = values.cumulative_crank_revolutions-self._previous_rev

        # crank_diff is the divisor below, so it has to be non-zero as well;
        # otherwise new revolutions reported with an unchanged event time
        # would raise ZeroDivisionError.
        if crank_rev_diff and crank_diff:
            # Rotations per minute is 60 times the amount of revolutions since the
            # last update over the time since the last update
            cadence = round(60*(crank_rev_diff/(crank_diff/1024)), 1)
            if cadence < 0:
                # A negative result is not usable; keep the last good value.
                cadence = self._previous_cadence
            self._previous_cadence = cadence
            self._previous_rev = values.cumulative_crank_revolutions
            self._cadence_failed = 0
        else:
            self._cadence_failed += 1
            if self._cadence_failed >= 3:
                cadence = 0
        self._previous_crank = values.last_crank_event_time

        return cadence

    def read_s_and_c(self):
        """
        Reads data from the speed and cadence sensor

        :return: ``(speed, cadence)``. Each is a string of at most 8
                 characters when non-zero, otherwise the falsy value as is.
        """
        speed = self._previous_speed
        cadence = self._previous_cadence
        # Both attributes stay None until speed_cadence_connect() finds a
        # sensor; iterate over nothing in that case instead of raising.
        connections = self.cyc_connections or ()
        services = self.cyc_services or ()
        for conn, svc in zip(connections, services):
            if not conn.connected:
                speed = cadence = 0
                continue
            values = svc.measurement_values
            if not values:
                if self._cadence_failed > 3:
                    cadence = 0
                if self._speed_failed > 3:
                    speed = 0
                continue
            if not values.last_wheel_event_time:
                continue
            speed = self._compute_speed(values, speed)
            if not values.last_crank_event_time:
                continue
            cadence = self._compute_cadence(values, cadence)

        if speed:
            speed = str(speed)[:8]
        if cadence:
            cadence = str(cadence)[:8]

        return speed, cadence

    def read_heart(self):
        """
        Reads data from the heart rate sensor

        :return: The heart rate as a string of at most 4 characters, or the
                 falsy previous value when there is no reading yet.
        """
        # No service means heart_connect() never found a sensor; treat that
        # like a missing measurement rather than raising AttributeError.
        if self._hr_service is None:
            measurement = None
        else:
            measurement = self._hr_service.measurement_values

        if measurement is None:
            heart = self._previous_heart
        else:
            heart = measurement.heart_rate
            self._previous_heart = measurement.heart_rate

        if heart:
            heart = str(heart)[:4]

        return heart

    def read_ams(self):
        """
        Reads data from AppleMediaServices

        Alternates between the artist and the title every time more than 3
        seconds have passed since the last switch.

        :return: The text shortened to 16 characters (with ``..`` appended
                 when it was longer), or ``None`` when it could not be read.
        """
        now = time.time()
        if now - self.start > 3:
            self.track_artist = not self.track_artist
            self.start = now

        try:
            if self.track_artist:
                data = self.ams.artist
            else:
                data = self.ams.title
        except (RuntimeError, UnicodeError, AttributeError):
            # AttributeError covers self.ams still being None because no
            # phone connected, matching what ams_remote() already tolerates.
            data = None

        if data:
            data = data[:16] + (data[16:] and '..')

        return data

    def icon_maker(self, n, icon_x, icon_y):
        """
        Generates icons as sprites

        :param n: Index of the 40x40 tile in the sprite sheet
        :param icon_x: x position of the sprite
        :param icon_y: y position of the sprite
        """
        sprite = displayio.TileGrid(self.sprite_sheet, pixel_shader=self.palette, width=1,
                                    height=1, tile_width=40, tile_height=40, default_tile=n,
                                    x=icon_x, y=icon_y)
        return sprite

    def _label_maker(self, text, x, y, font=None):
        """
        Generates labels

        Uses the 24 point font when ``font`` is not given.
        """
        if not font:
            font = self.arial24
        return label.Label(font=font, x=x, y=y, text=text, color=self.WHITE)

    def _get_y(self):
        """
        Helper function for setup_display. Gets the y values used for sprites and labels.
        """
        # Enabled readings are stacked 45 pixels apart starting at y=75, in
        # the order heart, speed, cadence, ams.
        row = 0

        if self.heart_enabled:
            self._heart_y = 45*row + 75
            row += 1
        if self.speed_enabled:
            self._speed_y = 45*row + 75
            row += 1
        if self.cadence_enabled:
            self._cadence_y = 45*row + 75
            row += 1
        if self.ams_enabled:
            self._ams_y = 45*row + 75
            row += 1

    def setup_display(self):
        """
        Prepares the display to show sensor values: Adds a header, a heading, and various sprites.
        """
        self._get_y()
        sprites = displayio.Group()

        rect = Rect(0, 0, 240, 50, fill=self.PURPLE)
        self.splash.append(rect)

        heading = label.Label(font=self.arial24, x=55, y=25, text="Pyloton", color=self.YELLOW)
        self.splash.append(heading)

        # (enabled, tile index in the sprite sheet, y of the matching label)
        icons = (
            (self.heart_enabled, 0, self._heart_y),
            (self.speed_enabled, 1, self._speed_y),
            (self.cadence_enabled, 2, self._cadence_y),
            (self.ams_enabled, 3, self._ams_y),
        )
        for enabled, tile, y in icons:
            if enabled:
                sprites.append(self.icon_maker(tile, 2, y - 20))

        self.splash.append(sprites)

        self.display.show(self.splash)
        # Empty the loading screen group now that it is no longer shown.
        while self.loading_group:
            self.loading_group.pop()

    def _show_value(self, existing, text, y, font=None):
        """
        Creates the label on the first update, or changes its text on later ones.

        :return: The label now showing ``text``
        """
        if not self._setup:
            existing = self._label_maker(text, 50, y, font=font)
            self.splash.append(existing)
        else:
            existing.text = text
        return existing

    def update_display(self):
        """
        Updates the display to display the most recent values
        """
        if self.speed_enabled or self.cadence_enabled:
            speed, cadence = self.read_s_and_c()

        if self.heart_enabled:
            heart = self.read_heart()
            self._hr_label = self._show_value(self._hr_label, '{} bpm'.format(heart),
                                              self._heart_y)

        if self.speed_enabled:
            self._sp_label = self._show_value(self._sp_label, '{} mph'.format(speed),
                                              self._speed_y)

        if self.cadence_enabled:
            self._cadence_label = self._show_value(self._cadence_label,
                                                   '{} rpm'.format(cadence),
                                                   self._cadence_y)

        if self.ams_enabled:
            ams = self.read_ams()
            self._ams_label = self._show_value(self._ams_label, '{}'.format(ams),
                                               self._ams_y, font=self.arial16)

        self._setup = True

    def ams_remote(self):
        """
        Allows the 2 buttons and 3 capacitive touch pads in the CLUE to function as a media remote.
        """
        try:
            # Capacitive touch pad marked 0 goes to the previous track
            if self.clue.touch_0:
                self.ams.previous_track()
                time.sleep(0.25)

            # Capacitive touch pad marked 1 toggles pause/play
            if self.clue.touch_1:
                self.ams.toggle_play_pause()
                time.sleep(0.25)

            # Capacitive touch pad marked 2 advances to the next track
            if self.clue.touch_2:
                self.ams.next_track()
                time.sleep(0.25)

            # If button B (on the right) is pressed, it increases the volume
            if self.clue.button_b:
                self.ams.volume_up()
                time.sleep(0.1)

            # If button A (on the left) is pressed, the volume decreases
            if self.clue.button_a:
                self.ams.volume_down()
                time.sleep(0.1)
        except (RuntimeError, UnsupportedCommand, AttributeError):
            # Best effort: a command the phone rejects, or no phone being
            # connected (self.ams is None), is ignored.
            return