code.py
1 # SPDX-FileCopyrightText: 2020 Kevin J. Walters for Adafruit Industries 2 # 3 # SPDX-License-Identifier: MIT 4 5 # cpb-quick-draw v1.11 6 # CircuitPython (on CPBs) Quick Draw reaction game 7 # This is a two player game using two Circuit Playground Bluefruit boards 8 # to test the reaction time of the players in a "quick draw" with the 9 # synchronisation and draw times exchanged via Bluetooth Low Energy 10 # The switches must be set to DIFFERENT positions on the two CPBs 11 12 # Tested with Circuit Playground Bluefruit Alpha 13 # and CircuitPython and 5.0.0-beta.2 14 15 # Needs recent adafruit_ble and adafruit_circuitplayground.bluefruit libraries 16 17 # copy this file to CPB board as code.py 18 19 # MIT License 20 21 # Copyright (c) 2020 Kevin J. Walters 22 23 # Permission is hereby granted, free of charge, to any person obtaining a copy 24 # of this software and associated documentation files (the "Software"), to deal 25 # in the Software without restriction, including without limitation the rights 26 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 27 # copies of the Software, and to permit persons to whom the Software is 28 # furnished to do so, subject to the following conditions: 29 30 # The above copyright notice and this permission notice shall be included in 31 # all copies or substantial portions of the Software. 32 33 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 34 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 35 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 36 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 37 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 38 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 39 # SOFTWARE. 
import time
import gc
import struct
import random  # On a CPB this seeds from a hardware RNG in the CPU

# This is the new cp object which works on CPX and CPB
from adafruit_circuitplayground import cp

from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
from adafruit_ble.services.nordic import UARTService

from adafruit_bluefruit_connect.packet import Packet

# Debug verbosity used by d_print(); higher values print more
debug = 3

# Bluetooth scanning timeout
BLESCAN_TIMEOUT = 5

TURNS = 10
# Integer number of seconds
SHORTEST_DELAY = 1
LONGEST_DELAY = 5  # This was 10 in the original game

# Misdraw time (100ms)
IMPOSSIBLE_DUR = 0.1

# The duration of the short blue flashes (in seconds) during time delay
# measurement in ping_for_rtt() and the long one at the end
SYNC_FLASH_DUR = 0.1
SYNCED_LONGFLASH_DUR = 2

# The pause between displaying each pixel in the result summary
SUMMARY_DUR = 0.5

# The number of "pings" sent by ping_for_rtt()
NUM_PINGS = 8

# Special values used to indicate failed exchange of reaction times
# and no value
ERROR_DUR = -1.0
TIME_NONE = -2.0

# A timeout value for the protocol
protocol_timeout = 14.0


# These application specific packets could be placed in another file
# and then import'ed
class TimePacket(Packet):
    """A packet for exchanging time information: a duration (the last rtt
       or a reaction time) and the sender's time.monotonic() value."""

    _FMT_PARSE = '<xxffx'
    PACKET_LENGTH = struct.calcsize(_FMT_PARSE)
    # _FMT_CONSTRUCT doesn't include the trailing checksum byte.
    _FMT_CONSTRUCT = '<2sff'

    # Using lower case in attempt to avoid clashing with standard packets
    _TYPE_HEADER = b'!z'

    # number of args must match _FMT_PARSE
    # for Packet.parse_private() to work, hence the sendtime parameter
    def __init__(self, duration, sendtime):
        """Construct a TimePacket."""
        self._duration = duration
        self._sendtime = sendtime  # over-written later in to_bytes()

    def to_bytes(self):
        """Return the bytes needed to send this packet.
           Unusually this also sets the sendtime to the current time indicating
           when the data was serialised.
           """
        self._sendtime = time.monotonic()  # refresh _sendtime
        partial_packet = struct.pack(self._FMT_CONSTRUCT, self._TYPE_HEADER,
                                     self._duration, self._sendtime)
        return self.add_checksum(partial_packet)

    @property
    def duration(self):
        """The last rtt value or a negative number if n/a."""
        return self._duration

    @property
    def sendtime(self):
        """The time packet was sent (when to_bytes() was last called)."""
        return self._sendtime


TimePacket.register_packet_type()


class StartGame(Packet):  # pylint: disable=too-few-public-methods
    """A packet to indicate the receiver must start the game immediately."""

    _FMT_PARSE = '<xxx'
    PACKET_LENGTH = struct.calcsize(_FMT_PARSE)
    # _FMT_CONSTRUCT doesn't include the trailing checksum byte.
    _FMT_CONSTRUCT = '<2s'

    # Using lower case in attempt to avoid clashing with standard packets
    _TYPE_HEADER = b'!y'

    def to_bytes(self):
        """Return the bytes needed to send this packet."""
        partial_packet = struct.pack(self._FMT_CONSTRUCT, self._TYPE_HEADER)
        return self.add_checksum(partial_packet)


StartGame.register_packet_type()

# This board's role is determined by the switch, the two CPBs must have
# the switch in different positions
# left is the master / client / central device
# right is the slave / server / peripheral device
master_device = cp.switch  # True when switch is left (near ear symbol)

# The default brightness is 1.0 - leaving at that as it
# improves performance by removing need for a second buffer in memory
# 10 is number of NeoPixels on CPX/CPB
numpixels = 10
halfnumpixels = numpixels // 2
pixels = cp.pixels

faint_red = (1, 0, 0)
red = (40, 0, 0)
green = (0, 30, 0)
blue = (0, 0, 10)
brightblue = (0, 0, 100)
yellow = (40, 20, 0)
white = (30, 30, 30)
black = (0, 0, 0)

win_colour = green
win_pixels = [win_colour] * halfnumpixels
opponent_misdraw_colour = faint_red
misdraw_colour = red
misdraw_pixels = [misdraw_colour] * halfnumpixels
draw_colour = yellow
draw_pixels = [draw_colour] * halfnumpixels
lose_colour = black

if master_device:
    # button A is on left (usb at top)
    player_button = lambda: cp.button_a
    # player_button.switch_to_input(pull=digitalio.Pull.DOWN)

    player_px = (0, halfnumpixels)
    opponent_px = (halfnumpixels, numpixels)
else:
    # button B is on right
    player_button = lambda: cp.button_b
    # player_button.switch_to_input(pull=digitalio.Pull.DOWN)

    player_px = (halfnumpixels, numpixels)
    opponent_px = (0, halfnumpixels)


def d_print(level, *args, **kwargs):
    """A simple conditional print for debugging based on global debug level.
       If level is not an int it is treated as the first part of the
       message and everything is printed unconditionally."""
    if not isinstance(level, int):
        print(level, *args, **kwargs)
    elif debug >= level:
        print(*args, **kwargs)


def read_packet(timeout=None):
    """Read a packet with an optional locally implemented timeout.
       This is a workaround due to the timeout not being configurable.
       Returns the packet or None if nothing was read within timeout."""
    if timeout is None:
        return Packet.from_stream(uart)  # Current fixed timeout is 1s

    packet = None
    read_start_t = time.monotonic()
    # Keep retrying the (fixed timeout) read until a packet arrives
    # or the caller's timeout has elapsed
    while packet is None and time.monotonic() - read_start_t < timeout:
        packet = Packet.from_stream(uart)
    return packet


def connect():
    """Connect two boards using the first Nordic UARTService the client
       finds over Bluetooth Low Energy.
       No timeouts, will wait forever.
       Returns a (connection, uart) tuple; the connection is None
       on the slave."""
    new_conn = None
    new_uart = None
    if master_device:
        # Master code
        while new_uart is None:
            d_print("Disconnected, scanning")
            for advertisement in ble.start_scan(ProvideServicesAdvertisement,
                                                timeout=BLESCAN_TIMEOUT):
                d_print(2, advertisement.address, advertisement.rssi, "dBm")
                if UARTService not in advertisement.services:
                    continue
                d_print(1, "Connecting to", advertisement.address)
                ble.connect(advertisement)
                break
            for conns in ble.connections:
                if UARTService in conns:
                    d_print("Found UARTService")
                    new_conn = conns
                    new_uart = conns[UARTService]
                    break
            ble.stop_scan()

    else:
        # Slave code
        new_uart = UARTService()
        advertisement = ProvideServicesAdvertisement(new_uart)
        d_print("Advertising")
        ble.start_advertising(advertisement)
        # Is there a conn object somewhere here??
        while not ble.connected:
            pass

    return (new_conn, new_uart)


def ping_for_rtt():  # pylint: disable=too-many-branches,too-many-statements
    """Calculate the send time for Bluetooth Low Energy based on
       a series of round-trip time measurements and assuming that
       half of that is the send time.
       This code must be run at approximately the same time
       on each device as the timeout per packet is one second.
       Returns the estimated one-way send time in seconds."""
    # The rtt is sent to server but for first packet client
    # sent there's no value to send, TIME_NONE is the special
    # first packet value
    rtt = TIME_NONE
    rtts = []
    offsets = []

    if master_device:
        # Master code
        while True:
            gc.collect()  # an opportune moment
            request = TimePacket(rtt, TIME_NONE)
            d_print(2, "TimePacket TX")
            uart.write(request.to_bytes())
            response = Packet.from_stream(uart)
            t2 = time.monotonic()
            if isinstance(response, TimePacket):
                d_print(2, "TimePacket RX", response.sendtime)
                rtt = t2 - request.sendtime
                rtts.append(rtt)
                time_remote_cpb = response.sendtime + rtt / 2.0
                offset = time_remote_cpb - t2
                offsets.append(offset)
                d_print(3,
                        "RTT plus a bit={:f},".format(rtt),
                        "remote_time={:f},".format(time_remote_cpb),
                        "offset={:f}".format(offset))
            if len(rtts) >= NUM_PINGS:
                break

            pixels.fill(blue)
            time.sleep(SYNC_FLASH_DUR)
            pixels.fill(black)
            # This second sleep is very important to ensure that the
            # server is already awaiting the next packet before client
            # sends it to avoid server instantly reading buffered packets
            time.sleep(SYNC_FLASH_DUR)

    else:
        responses = 0
        while True:
            gc.collect()  # an opportune moment
            packet = Packet.from_stream(uart)
            if isinstance(packet, TimePacket):
                d_print(2, "TimePacket RX", packet.sendtime)
                # Send response
                uart.write(TimePacket(TIME_NONE, TIME_NONE).to_bytes())
                responses += 1
                rtts.append(packet.duration)
                pixels.fill(blue)
                time.sleep(SYNC_FLASH_DUR)
                pixels.fill(black)
            elif packet is None:
                # This could be a timeout or an indication of a disconnect
                d_print(2, "None from from_stream()")
            else:
                print("Unexpected packet type", packet)
            if responses >= NUM_PINGS:
                break

    # indicate a good rtt calculate, skip first one
    # as it's not present on slave
    d_print(3, "RTTs:", rtts)
    if master_device:
        rtt_start = 1
        rtt_end = len(rtts) - 1
    else:
        rtt_start = 2
        rtt_end = len(rtts)

    # Use quickest ones and hope any outlier times don't reoccur!
    quicker_rtts = sorted(rtts[rtt_start:rtt_end])[0:(NUM_PINGS // 2) + 1]
    mean_rtt = sum(quicker_rtts) / len(quicker_rtts)
    # Assuming symmetry between send and receive times
    # this may not be perfectly true, parsing is one factor here
    send_time = mean_rtt / 2.0

    d_print(2, "send_time=", send_time)

    # Indicate sync with a longer 2 second blue flash
    pixels.fill(brightblue)
    time.sleep(SYNCED_LONGFLASH_DUR)
    pixels.fill(black)
    return send_time


def random_pause():
    """This is the pause before the players draw.
       It only runs on the master (BLE client) as it should be followed
       by a synchronising barrier."""
    if master_device:
        time.sleep(random.randint(SHORTEST_DELAY, LONGEST_DELAY))


def barrier(packet_send_time):
    """Master sends a Start message and then waits for a reply.
       Slave waits for Start message, then sends reply, then pauses
       for packet_send_time so both master and slave return from
       barrier() at the same time."""

    if master_device:
        uart.write(StartGame().to_bytes())
        d_print(2, "StartGame TX")
        packet = read_packet(timeout=protocol_timeout)
        if isinstance(packet, StartGame):
            d_print(2, "StartGame RX")
        else:
            print("Unexpected packet type", packet)

    else:
        packet = read_packet(timeout=protocol_timeout)
        if isinstance(packet, StartGame):
            d_print(2, "StartGame RX")
            uart.write(StartGame().to_bytes())
            d_print(2, "StartGame TX")
        else:
            print("Unexpected packet type", packet)

        print("Sleeping to sync up", packet_send_time)
        time.sleep(packet_send_time)


def sync_test():
    """For testing synchronisation. Warning - this flashes a lot!"""
    for _ in range(40):
        pixels.fill(white)
        time.sleep(0.1)
        pixels.fill(black)
        time.sleep(0.1)


def get_opponent_reactiontime(player_reaction):
    """Send reaction time data to the other player and receive theirs.
       Reusing the TimePacket() for this.
       Returns the opponent's reaction time or ERROR_DUR if no
       TimePacket was received."""
    opponent_reaction = ERROR_DUR
    if master_device:
        uart.write(TimePacket(player_reaction,
                              TIME_NONE).to_bytes())
        d_print(2, "TimePacket TX")
        packet = read_packet(timeout=protocol_timeout)
        if isinstance(packet, TimePacket):
            d_print(2, "TimePacket RX")
            opponent_reaction = packet.duration
        else:
            print("Unexpected packet type", packet)

    else:
        packet = read_packet(timeout=protocol_timeout)
        if isinstance(packet, TimePacket):
            d_print(2, "TimePacket RX")
            opponent_reaction = packet.duration
            uart.write(TimePacket(player_reaction,
                                  TIME_NONE).to_bytes())
            d_print(2, "TimePacket TX")
        else:
            print("Unexpected packet type", packet)
    return opponent_reaction


def show_winner(player_reaction, opponent_reaction):
    """Show the winner on the appropriate set of NeoPixels.
       Returns (win, misdraw, draw, colour) - 3 booleans and a result colour
       describing the outcome from this player's point of view."""
    l_win = False
    l_misdraw = False
    l_draw = False
    l_colour = lose_colour

    if player_reaction < IMPOSSIBLE_DUR or opponent_reaction < IMPOSSIBLE_DUR:
        # NOTE: ERROR_DUR is below IMPOSSIBLE_DUR so a failed exchange of
        # times also reaches this branch; the opponent is not marked as
        # a misdraw in that case
        if opponent_reaction != ERROR_DUR and opponent_reaction < IMPOSSIBLE_DUR:
            pixels[opponent_px[0]:opponent_px[1]] = misdraw_pixels
            l_colour = opponent_misdraw_colour

        # This must come after previous if to get the most appropriate colour
        if player_reaction != ERROR_DUR and player_reaction < IMPOSSIBLE_DUR:
            l_misdraw = True
            pixels[player_px[0]:player_px[1]] = misdraw_pixels
            l_colour = misdraw_colour  # overwrite any opponent_misdraw_colour

    else:
        if player_reaction < opponent_reaction:
            l_win = True
            pixels[player_px[0]:player_px[1]] = win_pixels
            l_colour = win_colour
        elif opponent_reaction < player_reaction:
            pixels[opponent_px[0]:opponent_px[1]] = win_pixels
        else:
            # Equality! Very unlikely to reach here
            l_draw = True
            pixels[player_px[0]:player_px[1]] = draw_pixels
            pixels[opponent_px[0]:opponent_px[1]] = draw_pixels
            l_colour = draw_colour

    return (l_win, l_misdraw, l_draw, l_colour)


def show_summary(result_colours):
    """Show the results on the NeoPixels, one pixel per turn
       with a pause of SUMMARY_DUR seconds after each."""
    # trim anything beyond 10
    for idx, p_colour in enumerate(result_colours[0:numpixels]):
        pixels[idx] = p_colour
        time.sleep(SUMMARY_DUR)


# CPB auto-seeds from hardware random number generation on the nRF52840 chip
# Note: original code for CPX uses A4-A7 analog inputs,
#       CPB cannot use A7 for analog in

wins = 0
misdraws = 0
losses = 0
draws = 0

# default timeout is 1.0 and on latest library with UARTService this
# cannot be changed
ble = BLERadio()

# Connect the two boards over Bluetooth Low Energy
# Switch on left for master / client, switch on right for slave / server
d_print("connect()")
(conn, uart) = connect()

# Calculate round-trip time (rtt) delay between the two CPB boards
# flashing blue to indicate the packets and longer 2s flash when done
d_print("ping_for_rtt()")
ble_send_time = ping_for_rtt()

my_results = []

# play the game for a number of TURNS then show results
for _ in range(TURNS):
    # This is an attempt to force a reconnection but may not take into
    # account all disconnection scenarios
    if uart is None:
        (conn, uart) = connect()

    # This is a good time to garbage collect
    gc.collect()

    # Random pause to stop players preempting the draw
    random_pause()

    try:
        # Synchronise the two boards by exchanging a Start message
        d_print("barrier()")
        barrier(ble_send_time)

        if debug >= 4:
            sync_test()

        # Show white on all NeoPixels to indicate draw now
        # This will execute at the same time on both boards
        pixels.fill(white)

        # Wait for and time how long it takes for player to press button
        start_t = time.monotonic()
        while not player_button():
            pass
        finish_t = time.monotonic()

        # Turn-off NeoPixels
        pixels.fill(black)

        # Play the shooting sound
        # 16k mono 8bit normalised version of
        # https://freesound.org/people/Diboz/sounds/213925/
        cp.play_file("PistolRicochet.wav")

        # The CPBs are no longer synchronised due to reaction time varying
        # per player
        # Exchange draw times
        player_reaction_dur = finish_t - start_t
        opponent_reaction_dur = get_opponent_reactiontime(player_reaction_dur)

        # Show green for winner and red for any misdraws
        (win, misdraw, draw, colour) = show_winner(player_reaction_dur,
                                                   opponent_reaction_dur)
        my_results.append(colour)
        # Update the running totals - misdraw, draw and win are the
        # per-turn booleans, the plural names are the counters
        if misdraw:
            misdraws += 1
        elif draw:
            draws += 1
        elif win:
            wins += 1
        else:
            losses += 1

        # Output reaction times to serial console in Mu friendly format
        print("({:d}, {:d}, {:f}, {:f})".format(wins, misdraws,
                                                player_reaction_dur,
                                                opponent_reaction_dur))

        # Keep NeoPixel result colour for 5 seconds then turn-off and repeat
        time.sleep(5)
    except Exception as err:  # pylint: disable=broad-except
        print("Caught exception", err)
        if conn is not None:
            conn.disconnect()
            conn = None
        # Clearing uart triggers the reconnection at the top of the loop
        uart = None

    pixels.fill(black)

# show results summary on NeoPixels
show_summary(my_results)

# infinite pause to stop the code completing which would turn off NeoPixels
while True:
    pass