# adafruit_ble_midi.py
# The MIT License (MIT)
#
# Copyright (c) 2020 Scott Shawcroft for Adafruit Industries LLC
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_ble_midi`
================================================================================

BLE MIDI service for CircuitPython

"""

import time

import _bleio

from adafruit_ble.attributes import Attribute
from adafruit_ble.characteristics import Characteristic, ComplexCharacteristic
from adafruit_ble.uuid import VendorUUID
from adafruit_ble.services import Service

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE_MIDI.git"


class _MidiCharacteristic(ComplexCharacteristic):
    """Raw MIDI data I/O characteristic of the BLE MIDI service.

    It is declared readable, writable without response and notifying. Binding it to a
    service yields a ``_bleio.PacketBuffer`` that is used to exchange whole packets."""

    uuid = VendorUUID("7772E5DB-3868-4112-A1A9-F2669D106BF3")

    def __init__(self):
        super().__init__(
            properties=Characteristic.WRITE_NO_RESPONSE
            | Characteristic.READ
            | Characteristic.NOTIFY,
            read_perm=Attribute.ENCRYPT_NO_MITM,
            write_perm=Attribute.ENCRYPT_NO_MITM,
            max_length=512,
            fixed_length=False,
        )

    def bind(self, service):
        """Binds the characteristic to the given Service.

        Returns a ``_bleio.PacketBuffer`` wrapping the bound characteristic."""
        bound_characteristic = super().bind(service)
        return _bleio.PacketBuffer(bound_characteristic, buffer_size=4)


class MIDIService(Service):
    """BLE MIDI service. It acts just like a USB MIDI PortIn and PortOut and can be used as a drop
    in replacement.

    BLE MIDI's protocol includes timestamps for MIDI messages. This class automatically adds them
    to MIDI data written out and strips them from MIDI data read in."""

    uuid = VendorUUID("03B80E5A-EDE8-4B33-A751-6CE34EC4C700")
    _raw = _MidiCharacteristic()
    # _raw gets shadowed for each MIDIService instance by a PacketBuffer. PyLint doesn't know this
    # so it complains about missing members.
    # pylint: disable=no-member

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Defer creating _in_buffer until we're definitely connected.
        self._in_buffer = None
        # View of the shared buffer sized for the message currently being assembled.
        self._out_buffer = None
        # _buffers[n] is an n byte view onto the same four bytes, so a message can be
        # assembled in place and then written out at exactly its own length.
        shared_buffer = memoryview(bytearray(4))
        self._buffers = [
            None,
            shared_buffer[:1],
            shared_buffer[:2],
            shared_buffer[:3],
            shared_buffer[:4],
        ]
        self._header = bytearray(1)
        # Real-time messages get their own storage. They may arrive while another message
        # is still being assembled in the shared buffer and must not overwrite it.
        self._realtime = bytearray(2)
        self._in_sysex = False
        # Last channel status byte written, reused for data bytes sent without a status.
        self._running_status = None
        self._message_target_length = None
        self._message_length = 0
        self._pending_realtime = None
        self._in_length = 0
        self._in_index = 1
        # True when the previously consumed incoming byte was not a timestamp.
        self._last_data = True

    def readinto(self, buf, length):
        """Reads up to ``length`` bytes into ``buf`` starting at index 0.

        The packet header and the per-message timestamp bytes are skipped so that only
        MIDI bytes are stored. Reading stops early once no further packet is available.

        Returns the number of bytes written into ``buf``."""
        if self._in_buffer is None:
            self._in_buffer = bytearray(self._raw.packet_size)
        i = 0
        while i < length:
            if self._in_index < self._in_length:
                byte = self._in_buffer[self._in_index]
                self._in_index += 1
                if self._last_data and byte & 0x80 != 0:
                    # A byte with the top bit set that follows a non-timestamp byte is the
                    # low timestamp byte. Timing is not acted on, so it is simply dropped.
                    self._last_data = False
                    continue
                self._last_data = True
                buf[i] = byte
                i += 1
            else:
                self._in_length = self._raw.readinto(self._in_buffer)
                if self._in_length == 0:
                    break
                # Index 0 holds the packet header (high timestamp bits); skip it.
                self._in_index = 1
                self._last_data = True

        return i

    def read(self, length):
        """Reads up to ``length`` bytes and returns them."""
        result = bytearray(length)
        i = self.readinto(result, length)
        return result[:i]

    def write(self, buf, length):
        """Writes ``length`` bytes out.

        The first ``length`` bytes of ``buf`` are treated as a raw MIDI byte stream. Each
        complete message is written to the characteristic prefixed with a timestamp byte.
        State is kept between calls, so a message or a SysEx may be split across calls.

        * Real-time bytes (0xF8-0xFF) never disturb a message that is being assembled.
          They are sent right after that message, or immediately when none is in progress.
        * A SysEx ends at 0xF7. Any other non-real-time status byte also ends it: a
          terminator is emitted and the status byte is then handled normally.
        * Data bytes without a preceding status byte reuse the last channel status (running
          status). When there is none to reuse they are dropped."""
        timestamp_ms = time.monotonic_ns() // 1000000
        self._header[0] = (timestamp_ms >> 7 & 0x3F) | 0x80
        timestamp_low = 0x80 | (timestamp_ms & 0x7F)
        for i in range(length):
            data = buf[i]
            if data >= 0xF8:
                self._write_realtime(timestamp_low, data)
            elif data >= 0x80:
                self._write_status(timestamp_low, data)
            elif self._in_sysex:
                # SysEx payload bytes carry no timestamp of their own.
                b = self._buffers[1]
                b[0] = data
                self._raw.write(b, header=self._header)
            else:
                self._write_data(timestamp_low, data)

    def _flush_pending_realtime(self):
        """Sends the deferred real-time message, if there is one."""
        if self._pending_realtime is not None:
            self._raw.write(self._pending_realtime, header=self._header)
            self._pending_realtime = None

    def _write_realtime(self, timestamp_low, data):
        """Sends a real-time byte now, or defers it while a message is being assembled."""
        if self._message_target_length is not None:
            # Only one message can be deferred; send an older one rather than lose it.
            self._flush_pending_realtime()
        message = self._realtime
        message[0] = timestamp_low
        message[1] = data
        if self._message_target_length is None:
            self._raw.write(message, header=self._header)
        else:
            self._pending_realtime = message

    def _begin_message(self, timestamp_low, status):
        """Starts assembling a message that carries data bytes after ``status``."""
        b = self._buffers[2]
        b[0] = timestamp_low
        b[1] = status
        if 0x80 <= status <= 0xBF or 0xE0 <= status <= 0xEF or status == 0xF2:
            self._message_target_length = 4  # Two following bytes
        else:
            self._message_target_length = 3
        # All of the buffers share memory so the timestamp and status are already in place.
        self._out_buffer = self._buffers[self._message_target_length]
        self._message_length = 2

    def _write_status(self, timestamp_low, status):
        """Handles a non-real-time status byte (0x80-0xF7)."""
        # A new status byte abandons whatever message was still incomplete.
        self._message_target_length = None
        self._flush_pending_realtime()
        b = self._buffers[2]
        b[0] = timestamp_low
        if self._in_sysex:
            # Terminate the SysEx, whether explicitly (0xF7) or because another status
            # byte cut it short.
            self._in_sysex = False
            b[1] = 0xF7
            self._raw.write(b, header=self._header)
            if status == 0xF7:
                return
        if status >= 0xF0:
            # System common messages cancel running status.
            self._running_status = None
        if status == 0xF0 or status >= 0xF6:
            # Start of SysEx, or a message that consists of the status byte only.
            self._in_sysex = status == 0xF0
            b[1] = status
            self._raw.write(b, header=self._header)
            return
        if status < 0xF0:
            self._running_status = status
        self._begin_message(timestamp_low, status)

    def _write_data(self, timestamp_low, data):
        """Adds a data byte to the message in progress and sends it once complete."""
        if self._message_target_length is None:
            if self._running_status is None:
                # Nothing this byte could belong to.
                return
            self._begin_message(timestamp_low, self._running_status)
        self._out_buffer[self._message_length] = data
        self._message_length += 1
        if self._message_target_length == self._message_length:
            self._raw.write(self._out_buffer, header=self._header)
            self._message_target_length = None
            self._flush_pending_realtime()