/ adafruit_ble_apple_media.py
adafruit_ble_apple_media.py
# The MIT License (MIT)
#
# Copyright (c) 2020 Scott Shawcroft for Adafruit Industries LLC
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_ble_apple_media`
================================================================================

Support for the Apple Media Service which provides media playback info and control.

Documented by Apple here:
https://developer.apple.com/library/archive/documentation/CoreBluetooth/Reference/AppleMediaService_Reference/Introduction/Introduction.html#//apple_ref/doc/uid/TP40014716-CH2-SW1

"""
import struct
import time

import _bleio

from adafruit_ble.attributes import Attribute
from adafruit_ble.characteristics import Characteristic, ComplexCharacteristic
from adafruit_ble.uuid import VendorUUID
from adafruit_ble.services import Service

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE_Apple_Media.git"

# Disable protected access checks since our private classes are tightly coupled.
# pylint: disable=protected-access


class _RemoteCommand(ComplexCharacteristic):
    """Endpoint for sending commands to a media player. The value read will list all
    available commands."""

    uuid = VendorUUID("9B3C81D8-57B1-4A8A-B8DF-0E56F7CA51C2")

    def __init__(self):
        super().__init__(
            properties=Characteristic.WRITE_NO_RESPONSE | Characteristic.NOTIFY,
            read_perm=Attribute.OPEN,
            write_perm=Attribute.OPEN,
            max_length=13,
            fixed_length=False,
        )

    def bind(self, service):
        """Binds the characteristic to the given Service.

        Returns a `_bleio.PacketBuffer` (holding a single packet) wrapped around the
        bound characteristic."""
        bound_characteristic = super().bind(service)
        return _bleio.PacketBuffer(bound_characteristic, buffer_size=1)


class _EntityUpdate(ComplexCharacteristic):
    """Endpoint used to request attribute updates and to receive them.

    `_MediaAttribute` writes ``entity id + attribute ids`` here and then reads back
    packets laid out as ``entity id, attribute id, flags, UTF-8 value``."""

    uuid = VendorUUID("2F7CABCE-808D-411F-9A0C-BB92BA96C102")

    def __init__(self):
        super().__init__(
            properties=Characteristic.WRITE | Characteristic.NOTIFY,
            read_perm=Attribute.OPEN,
            write_perm=Attribute.OPEN,
            max_length=128,
            fixed_length=False,
        )

    def bind(self, service):
        """Binds the characteristic to the given Service.

        Returns a `_bleio.PacketBuffer` (holding up to eight packets) wrapped around
        the bound characteristic."""
        bound_characteristic = super().bind(service)
        return _bleio.PacketBuffer(bound_characteristic, buffer_size=8)


class _EntityAttribute(Characteristic):  # pylint: disable=too-few-public-methods
    """Readable and writable variable-length characteristic.

    Declared on `AppleMediaService` but not otherwise used by this module."""

    uuid = VendorUUID("C6B2F38C-23AB-46D8-A6AB-A3A870BBD5D7")

    def __init__(self):
        super().__init__(
            properties=Characteristic.WRITE | Characteristic.READ,
            read_perm=Attribute.OPEN,
            write_perm=Attribute.OPEN,
            fixed_length=False,
        )


class _MediaAttribute:
    """Descriptor exposing one ``(entity_id, attribute_id)`` value as a string.

    Values are cached on the owning service in ``_attribute_cache``. The first read of
    an attribute that is not cached yet asks the media source to start sending it."""

    def __init__(self, entity_id, attribute_id):
        self.key = (entity_id, attribute_id)

    @staticmethod
    def _update(obj):
        """Read at most one pending entity-update packet into ``obj``'s cache.

        Raises `RuntimeError` when a packet is shorter than its three byte header."""
        if not obj._buffer:
            obj._buffer = bytearray(128)
        length_read = obj._entity_update.readinto(obj._buffer)
        if length_read <= 0:
            return
        # The header is three bytes: entity id, attribute id and flags. Everything
        # after it is the value, which may legitimately be empty (exactly three bytes
        # read), so only packets shorter than the header are rejected.
        if length_read < 3:
            raise RuntimeError("packet too short")
        # The flags byte is unpacked so the format matches the header but is unused.
        entity_id, attribute_id, _flags = struct.unpack_from("<BBB", obj._buffer)
        value = str(obj._buffer[3:length_read], "utf-8")
        obj._attribute_cache[(entity_id, attribute_id)] = value

    def __get__(self, obj, cls):
        """Return the cached value, or ``None`` if no update has been received yet."""
        if obj is None:
            # Accessed on the class rather than an instance: there is no connection
            # to query, so hand back the descriptor itself.
            return self
        self._update(obj)
        if self.key not in obj._attribute_cache:
            entity_id, attribute_id = self.key
            # Request this attribute together with every attribute of the same entity
            # that is already cached. NOTE(review): presumably a write replaces the
            # entity's previous request - confirm against Apple's AMS reference.
            siblings = [attribute_id] + [
                k[1]
                for k in obj._attribute_cache
                if k[0] == entity_id and k[1] != attribute_id
            ]
            buf = struct.pack("<B" + "B" * len(siblings), entity_id, *siblings)
            obj._entity_update.write(buf)
            # Mark the attribute as requested so the request is only written once.
            obj._attribute_cache[self.key] = None
            # Give the media source a moment to answer before polling once more.
            time.sleep(0.05)
            self._update(obj)
        return obj._attribute_cache[self.key]


class _MediaAttributePlaybackState:
    """Descriptor that is True when the playback state equals ``playback_value``.

    The state is the first comma separated field of the service's ``_playback_info``."""

    def __init__(self, playback_value):
        self._playback_value = playback_value

    def __get__(self, obj, cls):
        if obj is None:
            return self
        info = obj._playback_info
        if info:
            return int(info.split(",")[0]) == self._playback_value
        # No (or empty) playback info: report the state as not active.
        return False


class _MediaAttributePlaybackInfo:
    """Descriptor returning one comma separated field of ``_playback_info`` as a float."""

    def __init__(self, position):
        self._position = position

    def __get__(self, obj, cls):
        if obj is None:
            return self
        info = obj._playback_info
        if info:
            return float(info.split(",")[self._position])
        # No (or empty) playback info yet.
        return 0


class UnsupportedCommand(Exception):
    """Raised when the command isn't available with current media player app."""


class AppleMediaService(Service):
    """View and control currently playing media.

    Exact functionality varies with different media apps. For example, Spotify will include the
    album name and artist name in `title` when controlling playback on a remote device.
    `artist` includes a description of the remote playback.

    """

    uuid = VendorUUID("89D3502B-0F36-433A-8EF4-C502AD55F8DC")

    _remote_command = _RemoteCommand()
    _entity_update = _EntityUpdate()
    _entity_attribute = _EntityAttribute()

    player_name = _MediaAttribute(0, 0)
    """Name of the media player app"""
    _playback_info = _MediaAttribute(0, 1)
    paused = _MediaAttributePlaybackState(0)
    """True when playback is paused. False otherwise."""
    playing = _MediaAttributePlaybackState(1)
    """True when playback is playing. False otherwise."""
    rewinding = _MediaAttributePlaybackState(2)
    """True when playback is rewinding. False otherwise."""
    fast_forwarding = _MediaAttributePlaybackState(3)
    """True when playback is fast-forwarding. False otherwise."""
    playback_rate = _MediaAttributePlaybackInfo(1)
    """Playback rate as a decimal of normal speed."""
    elapsed_time = _MediaAttributePlaybackInfo(2)
    """Time elapsed in the current track. Not updated as the track plays. Use (the amount of time
       since read elapsed time) * `playback_rate` to estimate the current `elapsed_time`."""
    volume = _MediaAttribute(0, 2)
    """Current volume"""

    queue_index = _MediaAttribute(1, 0)
    """Current track's index in the queue."""
    queue_length = _MediaAttribute(1, 1)
    """Count of tracks in the queue."""
    shuffle_mode = _MediaAttribute(1, 2)
    """Current shuffle mode as an integer. Off (0), One (1), and All (2)"""
    repeat_mode = _MediaAttribute(1, 3)
    """Current repeat mode as an integer. Off (0), One (1), and All (2)"""

    artist = _MediaAttribute(2, 0)
    """Current track's artist name."""
    album = _MediaAttribute(2, 1)
    """Current track's album name."""
    title = _MediaAttribute(2, 2)
    """Current track's title."""
    duration = _MediaAttribute(2, 3)
    """Current track's duration as a string."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Buffers are allocated lazily on first use.
        self._buffer = None
        self._cmd = None
        self._register_buffer = None
        # Maps (entity_id, attribute_id) to the last received value, or None once the
        # attribute has been requested but not received yet.
        self._attribute_cache = {}
        self._supported_commands = []
        self._command_buffer = None

    def _send_command(self, command_id):
        """Send ``command_id`` to the media player.

        The list of supported commands is refreshed first. While no list has been
        received at all the command is silently dropped; once a list is known,
        `UnsupportedCommand` is raised for any command missing from it."""
        if not self._command_buffer:
            # One byte per command id. This class sends ids 0 through 13, so the
            # complete list of supported commands needs 14 bytes.
            self._command_buffer = bytearray(14)
        i = self._remote_command.readinto(  # pylint: disable=no-member
            self._command_buffer
        )
        if i > 0:
            self._supported_commands = list(self._command_buffer[:i])
        if command_id not in self._supported_commands:
            if not self._supported_commands:
                return
            raise UnsupportedCommand()
        if not self._cmd:
            self._cmd = bytearray(1)
        self._cmd[0] = command_id
        self._remote_command.write(self._cmd)  # pylint: disable=no-member

    def play(self):
        """Plays the current track. Does nothing if already playing."""
        self._send_command(0)

    def pause(self):
        """Pauses the current track. Does nothing if already paused."""
        self._send_command(1)

    def toggle_play_pause(self):
        """Plays the current track if it is paused. Otherwise it pauses the track."""
        self._send_command(2)

    def next_track(self):
        """Stops playing the current track and plays the next one."""
        self._send_command(3)

    def previous_track(self):
        """Stops playing the current track and plays the previous track."""
        self._send_command(4)

    def volume_up(self):
        """Increases the playback volume."""
        self._send_command(5)

    def volume_down(self):
        """Decreases the playback volume."""
        self._send_command(6)

    def advance_repeat_mode(self):
        """Advances the repeat mode. Modes are: Off, One and All"""
        self._send_command(7)

    def advance_shuffle_mode(self):
        """Advances the shuffle mode. Modes are: Off, One and All"""
        self._send_command(8)

    def skip_forward(self):
        """Skips forwards in the current track"""
        self._send_command(9)

    def skip_backward(self):
        """Skips backwards in the current track"""
        self._send_command(10)

    def like_track(self):
        """Likes the current track"""
        self._send_command(11)

    def dislike_track(self):
        """Dislikes the current track"""
        self._send_command(12)

    def bookmark_track(self):
        """Bookmarks the current track"""
        self._send_command(13)