/ adafruit_ble_midi.py
adafruit_ble_midi.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2020 Scott Shawcroft for Adafruit Industries LLC
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  """
 23  `adafruit_ble_midi`
 24  ================================================================================
 25  
 26  BLE MIDI service for CircuitPython
 27  
 28  """
 29  
 30  import time
 31  
 32  import _bleio
 33  
 34  from adafruit_ble.attributes import Attribute
 35  from adafruit_ble.characteristics import Characteristic, ComplexCharacteristic
 36  from adafruit_ble.uuid import VendorUUID
 37  from adafruit_ble.services import Service
 38  
 39  __version__ = "0.0.0-auto.0"
 40  __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE_MIDI.git"
 41  
 42  
 43  class _MidiCharacteristic(ComplexCharacteristic):
 44      """Endpoint for sending commands to a media player. The value read will list all available
 45  
 46         commands."""
 47  
 48      uuid = VendorUUID("7772E5DB-3868-4112-A1A9-F2669D106BF3")
 49  
 50      def __init__(self):
 51          super().__init__(
 52              properties=Characteristic.WRITE_NO_RESPONSE
 53              | Characteristic.READ
 54              | Characteristic.NOTIFY,
 55              read_perm=Attribute.ENCRYPT_NO_MITM,
 56              write_perm=Attribute.ENCRYPT_NO_MITM,
 57              max_length=512,
 58              fixed_length=False,
 59          )
 60  
 61      def bind(self, service):
 62          """Binds the characteristic to the given Service."""
 63          bound_characteristic = super().bind(service)
 64          return _bleio.PacketBuffer(bound_characteristic, buffer_size=4)
 65  
 66  
 67  class MIDIService(Service):
 68      """BLE MIDI service. It acts just like a USB MIDI PortIn and PortOut and can be used as a drop
 69         in replacement.
 70  
 71         BLE MIDI's protocol includes timestamps for MIDI messages. This class automatically adds them
 72         to MIDI data written out and strips them from MIDI data read in."""
 73  
 74      uuid = VendorUUID("03B80E5A-EDE8-4B33-A751-6CE34EC4C700")
 75      _raw = _MidiCharacteristic()
 76      # _raw gets shadowed for each MIDIService instance by a PacketBuffer. PyLint doesn't know this
 77      # so it complains about missing members.
 78      # pylint: disable=no-member
 79  
 80      def __init__(self, **kwargs):
 81          super().__init__(**kwargs)
 82          # Defer creating _in_buffer until we're definitely connected.
 83          self._in_buffer = None
 84          self._out_buffer = None
 85          shared_buffer = memoryview(bytearray(4))
 86          self._buffers = [
 87              None,
 88              shared_buffer[:1],
 89              shared_buffer[:2],
 90              shared_buffer[:3],
 91              shared_buffer[:4],
 92          ]
 93          self._header = bytearray(1)
 94          self._in_sysex = False
 95          self._message_target_length = None
 96          self._message_length = 0
 97          self._pending_realtime = None
 98          self._in_length = 0
 99          self._in_index = 1
100          self._last_data = True
101  
102      def readinto(self, buf, length):
103          """Reads up to ``length`` bytes into ``buf`` starting at index 0.
104  
105             Returns the number of bytes written into ``buf``."""
106          if self._in_buffer is None:
107              self._in_buffer = bytearray(self._raw.packet_size)
108          i = 0
109          while i < length:
110              if self._in_index < self._in_length:
111                  byte = self._in_buffer[self._in_index]
112                  if self._last_data and byte & 0x80 != 0:
113                      # Maybe manage timing here. Not done now because we're likely slower than we
114                      # need to be already.
115                      # low_ms = byte & 0x7f
116                      # print("low", low_ms)
117                      self._in_index += 1
118                      self._last_data = False
119                      continue
120                  self._in_index += 1
121                  self._last_data = True
122                  buf[i] = byte
123                  i += 1
124              else:
125                  self._in_length = self._raw.readinto(self._in_buffer)
126                  if self._in_length == 0:
127                      break
128                  # high_ms = self._in_buffer[0] & 0x3f
129                  # print("high", high_ms)
130                  self._in_index = 1
131                  self._last_data = True
132  
133          return i
134  
135      def read(self, length):
136          """Reads up to ``length`` bytes and returns them."""
137          result = bytearray(length)
138          i = self.readinto(result, length)
139          return result[:i]
140  
141      def write(self, buf, length):
142          """Writes ``length`` bytes out."""
143          # pylint: disable=too-many-branches
144          timestamp_ms = time.monotonic_ns() // 1000000
145          self._header[0] = (timestamp_ms >> 7 & 0x3F) | 0x80
146          i = 0
147          while i < length:
148              data = buf[i]
149              command = data & 0x80 != 0
150              if self._in_sysex:
151                  if command:  # End of sysex or real time
152                      b = self._buffers[2]
153                      b[0] = 0x80 | (timestamp_ms & 0x7F)
154                      b[1] = 0xF7
155                      self._raw.write(b, header=self._header)
156                      self._in_sysex = data == 0xF7
157                  else:
158                      b = self._buffers[1]
159                      b[0] = data
160                      self._raw.write(b, header=self._header)
161              elif command:
162                  self._in_sysex = data == 0xF0
163                  b = self._buffers[2]
164                  b[0] = 0x80 | (timestamp_ms & 0x7F)
165                  b[1] = data
166                  if (
167                      0xF6 <= data <= 0xFF or self._in_sysex
168                  ):  # Real time, command only or start sysex
169                      if self._message_target_length:
170                          self._pending_realtime = b
171                      else:
172                          self._raw.write(b, header=self._header)
173                  else:
174                      if (
175                          0x80 <= data <= 0xBF or 0xE0 <= data <= 0xEF or data == 0xF2
176                      ):  # Two following bytes
177                          self._message_target_length = 4
178                      else:
179                          self._message_target_length = 3
180                      b = self._buffers[self._message_target_length]
181                      # All of the buffers share memory so the timestamp and data have already been
182                      # set.
183                      self._message_length = 2
184                      self._out_buffer = b
185              else:
186                  self._out_buffer[self._message_length] = data
187                  self._message_length += 1
188                  if self._message_target_length == self._message_length:
189                      self._raw.write(self._out_buffer, header=self._header)
190                      if self._pending_realtime:
191                          self._raw.write(self._pending_realtime, header=self._header)
192                          self._pending_realtime = None
193                      self._message_target_length = None
194              i += 1