/ adafruit_ble_apple_media.py
adafruit_ble_apple_media.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2020 Scott Shawcroft for Adafruit Industries LLC
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  """
 23  `adafruit_ble_apple_media`
 24  ================================================================================
 25  
 26  Support for the Apple Media Service which provides media playback info and control.
 27  
 28  Documented by Apple here:
 29  https://developer.apple.com/library/archive/documentation/CoreBluetooth/Reference/AppleMediaService_Reference/Introduction/Introduction.html#//apple_ref/doc/uid/TP40014716-CH2-SW1
 30  
 31  """
 32  import struct
 33  import time
 34  
 35  import _bleio
 36  
 37  from adafruit_ble.attributes import Attribute
 38  from adafruit_ble.characteristics import Characteristic, ComplexCharacteristic
 39  from adafruit_ble.uuid import VendorUUID
 40  from adafruit_ble.services import Service
 41  
 42  __version__ = "0.0.0-auto.0"
 43  __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE_Apple_Media.git"
 44  
 45  # Disable protected access checks since our private classes are tightly coupled.
 46  # pylint: disable=protected-access
 47  
 48  
 49  class _RemoteCommand(ComplexCharacteristic):
 50      """Endpoint for sending commands to a media player. The value read will list all available
 51  
 52         commands."""
 53  
 54      uuid = VendorUUID("9B3C81D8-57B1-4A8A-B8DF-0E56F7CA51C2")
 55  
 56      def __init__(self):
 57          super().__init__(
 58              properties=Characteristic.WRITE_NO_RESPONSE | Characteristic.NOTIFY,
 59              read_perm=Attribute.OPEN,
 60              write_perm=Attribute.OPEN,
 61              max_length=13,
 62              fixed_length=False,
 63          )
 64  
 65      def bind(self, service):
 66          """Binds the characteristic to the given Service."""
 67          bound_characteristic = super().bind(service)
 68          return _bleio.PacketBuffer(bound_characteristic, buffer_size=1)
 69  
 70  
 71  class _EntityUpdate(ComplexCharacteristic):
 72      """UTF-8 Encoded string characteristic."""
 73  
 74      uuid = VendorUUID("2F7CABCE-808D-411F-9A0C-BB92BA96C102")
 75  
 76      def __init__(self):
 77          super().__init__(
 78              properties=Characteristic.WRITE | Characteristic.NOTIFY,
 79              read_perm=Attribute.OPEN,
 80              write_perm=Attribute.OPEN,
 81              max_length=128,
 82              fixed_length=False,
 83          )
 84  
 85      def bind(self, service):
 86          """Binds the characteristic to the given Service."""
 87          bound_characteristic = super().bind(service)
 88          return _bleio.PacketBuffer(bound_characteristic, buffer_size=8)
 89  
 90  
 91  class _EntityAttribute(Characteristic):  # pylint: disable=too-few-public-methods
 92      """UTF-8 Encoded string characteristic."""
 93  
 94      uuid = VendorUUID("C6B2F38C-23AB-46D8-A6AB-A3A870BBD5D7")
 95  
 96      def __init__(self):
 97          super().__init__(
 98              properties=Characteristic.WRITE | Characteristic.READ,
 99              read_perm=Attribute.OPEN,
100              write_perm=Attribute.OPEN,
101              fixed_length=False,
102          )
103  
104  
class _MediaAttribute:
    """Descriptor that exposes one media attribute of the owning service.

    The attribute is identified by an ``(entity_id, attribute_id)`` pair. Values are kept in the
    owner's ``_attribute_cache`` dictionary, which is filled from packets read from the owner's
    ``_entity_update`` endpoint.

    :param int entity_id: ID of the entity the attribute belongs to
    :param int attribute_id: ID of the attribute within that entity
    """

    def __init__(self, entity_id, attribute_id):
        self.key = (entity_id, attribute_id)

    @staticmethod
    def _update(obj):
        """Read at most one packet from ``obj._entity_update`` into ``obj._attribute_cache``.

        The receive buffer is allocated on ``obj`` on first use. Nothing is cached when no
        packet is available.

        :raises RuntimeError: if a packet shorter than four bytes was read
        """
        if not obj._buffer:
            obj._buffer = bytearray(128)
        length_read = obj._entity_update.readinto(obj._buffer)
        if length_read > 0:
            if length_read < 4:
                raise RuntimeError("packet too short")
            # The third header byte (flags) is not used, so only the two IDs are unpacked.
            entity_id, attribute_id = struct.unpack_from("<BB", obj._buffer)
            # The value follows the three byte header.
            value = str(obj._buffer[3:length_read], "utf-8")
            obj._attribute_cache[(entity_id, attribute_id)] = value

    def __get__(self, obj, cls):
        """Return the cached value of the attribute, requesting it first if it is unknown.

        :return: the descriptor itself when accessed on the class, otherwise the cached value,
            which is ``None`` when no value arrived shortly after the request was written
        """
        if obj is None:
            # Accessed on the class rather than an instance: there is no service to query.
            return self
        self._update(obj)
        if self.key not in obj._attribute_cache:
            # Request this attribute together with every already cached attribute of the
            # same entity.
            # NOTE(review): presumably a write replaces the entity's earlier request - confirm
            # against the Apple Media Service reference.
            siblings = [self.key[1]]
            for k in obj._attribute_cache:
                if k[0] == self.key[0] and k[1] not in siblings:
                    siblings.append(k[1])
            buf = struct.pack("<B" + "B" * len(siblings), self.key[0], *siblings)
            obj._entity_update.write(buf)
            obj._attribute_cache[self.key] = None
            # Give the media source a moment to answer before reading again.
            time.sleep(0.05)
            self._update(obj)
        return obj._attribute_cache[self.key]
140  
141  
class _MediaAttributePlaybackState:
    """Descriptor that is True when the owner's playback state equals a given value.

    The state is the first comma-separated field of the owner's ``_playback_info`` string.

    :param int playback_value: playback state value this descriptor tests for
    """

    def __init__(self, playback_value):
        self._playback_value = playback_value

    def __get__(self, obj, cls):
        """Return whether the current playback state matches ``playback_value``.

        :return: the descriptor itself when accessed on the class; otherwise a bool, which is
            ``False`` when no playback info is available
        """
        if obj is None:
            # Accessed on the class rather than an instance: there is no playback info.
            return self
        info = obj._playback_info
        if info:
            return int(info.split(",")[0]) == self._playback_value
        return False
151  
152  
class _MediaAttributePlaybackInfo:
    """Descriptor that returns one numeric field of the owner's playback info.

    The owner's ``_playback_info`` string is split on commas and the field at ``position`` is
    converted to a float.

    :param int position: index of the comma-separated field to return
    """

    def __init__(self, position):
        self._position = position

    def __get__(self, obj, cls):
        """Return the selected playback info field as a float.

        :return: the descriptor itself when accessed on the class; otherwise the field value,
            or ``0`` when no playback info is available
        """
        if obj is None:
            # Accessed on the class rather than an instance: there is no playback info.
            return self
        info = obj._playback_info
        if info:
            return float(info.split(",")[self._position])
        return 0
162  
163  
class UnsupportedCommand(Exception):
    """Raised when the current media player app does not offer the requested command."""
166  
167  
class AppleMediaService(Service):
    """View and control currently playing media.

    Exact functionality varies with different media apps. For example, Spotify will include the
    album name and artist name in `title` when controlling playback on a remote device.
    `artist` includes a description of the remote playback.

    """

    uuid = VendorUUID("89D3502B-0F36-433A-8EF4-C502AD55F8DC")

    _remote_command = _RemoteCommand()
    _entity_update = _EntityUpdate()
    _entity_attribute = _EntityAttribute()

    player_name = _MediaAttribute(0, 0)
    """Name of the media player app"""
    # Comma-separated string whose fields are read by the playback descriptors below: the
    # state first, then the playback rate, then the elapsed time.
    _playback_info = _MediaAttribute(0, 1)
    paused = _MediaAttributePlaybackState(0)
    """True when playback is paused. False otherwise."""
    playing = _MediaAttributePlaybackState(1)
    """True when playback is playing. False otherwise."""
    rewinding = _MediaAttributePlaybackState(2)
    """True when playback is rewinding. False otherwise."""
    fast_forwarding = _MediaAttributePlaybackState(3)
    """True when playback is fast-forwarding. False otherwise."""
    playback_rate = _MediaAttributePlaybackInfo(1)
    """Playback rate as a decimal of normal speed."""
    elapsed_time = _MediaAttributePlaybackInfo(2)
    """Time elapsed in the current track. Not updated as the track plays. Use (the amount of
       time since read elapsed time) * `playback_rate` to estimate the current `elapsed_time`."""
    volume = _MediaAttribute(0, 2)
    """Current volume"""

    queue_index = _MediaAttribute(1, 0)
    """Current track's index in the queue."""
    queue_length = _MediaAttribute(1, 1)
    """Count of tracks in the queue."""
    shuffle_mode = _MediaAttribute(1, 2)
    """Current shuffle mode as an integer. Off (0), One (1), and All (2)"""
    repeat_mode = _MediaAttribute(1, 3)
    """Current repeat mode as an integer. Off (0), One (1), and All (2)"""

    artist = _MediaAttribute(2, 0)
    """Current track's artist name."""
    album = _MediaAttribute(2, 1)
    """Current track's album name."""
    title = _MediaAttribute(2, 2)
    """Current track's title."""
    duration = _MediaAttribute(2, 3)
    """Current track's duration as a string."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Buffers are allocated lazily, on first use.
        self._buffer = None
        self._cmd = None
        self._register_buffer = None
        # Maps (entity_id, attribute_id) to the most recently received value.
        self._attribute_cache = {}
        # Command IDs most recently reported as available by the media player.
        self._supported_commands = []
        self._command_buffer = None

    def _send_command(self, command_id):
        """Send ``command_id`` to the media player if it is currently supported.

        A pending packet listing the supported commands is read first. While no supported
        command is known, the request is silently dropped.

        :param int command_id: ID of the remote command to send
        :raises UnsupportedCommand: if supported commands are known and ``command_id`` is not
            one of them
        """
        if not self._command_buffer:
            # Command IDs 0 through 13 are sent by this class, so a packet that lists every
            # command is 14 bytes long. A 13 byte buffer would be too short for it.
            self._command_buffer = bytearray(14)
        i = self._remote_command.readinto(  # pylint: disable=no-member
            self._command_buffer
        )
        if i > 0:
            self._supported_commands = list(self._command_buffer[:i])
        if command_id not in self._supported_commands:
            if not self._supported_commands:
                # Nothing is known to be supported yet: ignore the request.
                return
            raise UnsupportedCommand()
        if not self._cmd:
            self._cmd = bytearray(1)
        self._cmd[0] = command_id
        self._remote_command.write(self._cmd)  # pylint: disable=no-member

    def play(self):
        """Plays the current track. Does nothing if already playing."""
        self._send_command(0)

    def pause(self):
        """Pauses the current track. Does nothing if already paused."""
        self._send_command(1)

    def toggle_play_pause(self):
        """Plays the current track if it is paused. Otherwise it pauses the track."""
        self._send_command(2)

    def next_track(self):
        """Stops playing the current track and plays the next one."""
        self._send_command(3)

    def previous_track(self):
        """Stops playing the current track and plays the previous track."""
        self._send_command(4)

    def volume_up(self):
        """Increases the playback volume."""
        self._send_command(5)

    def volume_down(self):
        """Decreases the playback volume."""
        self._send_command(6)

    def advance_repeat_mode(self):
        """Advances the repeat mode. Modes are: Off, One and All"""
        self._send_command(7)

    def advance_shuffle_mode(self):
        """Advances the shuffle mode. Modes are: Off, One and All"""
        self._send_command(8)

    def skip_forward(self):
        """Skips forwards in the current track"""
        self._send_command(9)

    def skip_backward(self):
        """Skips backwards in the current track"""
        self._send_command(10)

    def like_track(self):
        """Likes the current track"""
        self._send_command(11)

    def dislike_track(self):
        """Dislikes the current track"""
        self._send_command(12)

    def bookmark_track(self):
        """Bookmarks the current track"""
        self._send_command(13)