/ adafruit_ble / __init__.py
__init__.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2019 Dan Halbert for Adafruit Industries
  4  # Copyright (c) 2019 Scott Shawcroft for Adafruit Industries
  5  #
  6  # Permission is hereby granted, free of charge, to any person obtaining a copy
  7  # of this software and associated documentation files (the "Software"), to deal
  8  # in the Software without restriction, including without limitation the rights
  9  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 10  # copies of the Software, and to permit persons to whom the Software is
 11  # furnished to do so, subject to the following conditions:
 12  #
 13  # The above copyright notice and this permission notice shall be included in
 14  # all copies or substantial portions of the Software.
 15  #
 16  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 17  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 18  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 19  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 20  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 21  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 22  # THE SOFTWARE.
 23  """
 24  
 25  This module provides higher-level BLE (Bluetooth Low Energy) functionality,
 26  building on the native `_bleio` module.
 27  
 28  """
 29  # pylint: disable=wrong-import-position
 30  import sys
 31  
# Abort the import early when running on CircuitPython 4.x or earlier, which this
# release does not support (the error message points users at library release 1.x.x).
# Other Python implementations never satisfy the first condition and are unaffected.
if sys.implementation.name == "circuitpython" and sys.implementation.version[0] <= 4:
    raise ImportError(
        "This release is not compatible with CircuitPython 4.x; use library release 1.x.x"
    )
 36  # pylint: enable=wrong-import-position
 37  
 38  import _bleio
 39  
 40  from .services import Service
 41  from .advertising import Advertisement
 42  
 43  __version__ = "0.0.0-auto.0"
 44  __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git"
 45  
 46  
 47  class BLEConnection:
 48      """
 49      Represents a connection to a peer BLE device.
 50      It acts as a map from a `Service` type to a `Service` instance for the connection.
 51  
 52      :param bleio_connection _bleio.Connection: the native `_bleio.Connection` object to wrap
 53  
 54      """
 55  
 56      def __init__(self, bleio_connection):
 57          self._bleio_connection = bleio_connection
 58          # _bleio.Service objects representing services found during discovery.
 59          self._discovered_bleio_services = {}
 60          # Service objects that wrap remote services.
 61          self._constructed_services = {}
 62  
 63      def _discover_remote(self, uuid):
 64          remote_service = None
 65          if uuid in self._discovered_bleio_services:
 66              remote_service = self._discovered_bleio_services[uuid]
 67          else:
 68              results = self._bleio_connection.discover_remote_services(
 69                  (uuid.bleio_uuid,)
 70              )
 71              if results:
 72                  remote_service = results[0]
 73                  self._discovered_bleio_services[uuid] = remote_service
 74          return remote_service
 75  
 76      def __contains__(self, key):
 77          """
 78          Allows easy testing for a particular Service class or a particular UUID
 79          associated with this connection.
 80  
 81          Example::
 82  
 83              if UARTService in connection:
 84                  # do something
 85  
 86              if StandardUUID(0x1234) in connection:
 87                  # do something
 88          """
 89          uuid = key
 90          if hasattr(key, "uuid"):
 91              uuid = key.uuid
 92          return self._discover_remote(uuid) is not None
 93  
 94      def __getitem__(self, key):
 95          """Return the Service for the given Service class or uuid, if any."""
 96          uuid = key
 97          maybe_service = False
 98          if hasattr(key, "uuid"):
 99              uuid = key.uuid
100              maybe_service = True
101  
102          if uuid in self._constructed_services:
103              return self._constructed_services[uuid]
104  
105          remote_service = self._discover_remote(uuid)
106          if remote_service:
107              constructed_service = None
108              if maybe_service:
109                  constructed_service = key(service=remote_service)
110                  self._constructed_services[uuid] = constructed_service
111              return constructed_service
112  
113          raise KeyError("{!r} object has no service {}".format(self, key))
114  
115      @property
116      def connected(self):
117          """True if the connection to the peer is still active."""
118          return self._bleio_connection.connected
119  
120      @property
121      def paired(self):
122          """True if the paired to the peer."""
123          return self._bleio_connection.paired
124  
125      @property
126      def connection_interval(self):
127          """Time between transmissions in milliseconds. Will be multiple of 1.25ms. Lower numbers
128             increase speed and decrease latency but increase power consumption.
129  
130             When setting connection_interval, the peer may reject the new interval and
131             `connection_interval` will then remain the same.
132  
133             Apple has additional guidelines that dictate should be a multiple of 15ms except if HID
134             is available. When HID is available Apple devices may accept 11.25ms intervals."""
135          return self._bleio_connection.connection_interval
136  
137      @connection_interval.setter
138      def connection_interval(self, value):
139          self._bleio_connection.connection_interval = value
140  
141      def pair(self, *, bond=True):
142          """Pair to the peer to increase security of the connection."""
143          return self._bleio_connection.pair(bond=bond)
144  
145      def disconnect(self):
146          """Disconnect from peer."""
147          self._bleio_connection.disconnect()
148  
149  
class BLERadio:
    """
    BLERadio provides the interfaces for BLE advertising,
    scanning for advertisements, and connecting to peers. There may be
    multiple connections active at once.

    It uses this library's `Advertisement` classes and the `BLEConnection` class.

    :param adapter: the native adapter object to drive. If not given (or falsy),
        ``_bleio.adapter`` is used.
    """

    def __init__(self, adapter=None):
        if not adapter:
            adapter = _bleio.adapter
        self._adapter = adapter
        self._current_advertisement = None
        # Maps native connection objects to their BLEConnection wrappers so that the
        # same wrapper (and its discovered-service state) is handed out every time.
        self._connection_cache = {}

    def _clean_connection_cache(self):
        """Drop cached wrappers whose connection is no longer active.

        Without this the cache would only ever grow: every peer that has ever
        connected would keep its wrapper alive for the life of the radio.
        """
        # Iterate over a snapshot because entries are deleted during the loop.
        for native_connection, wrapper in list(self._connection_cache.items()):
            if not wrapper.connected:
                del self._connection_cache[native_connection]

    def start_advertising(
        self, advertisement, scan_response=None, interval=0.1, timeout=None
    ):
        """
        Starts advertising the given advertisement.

        :param advertisement: the advertisement to broadcast. It is converted with
            ``bytes()`` and its ``connectable`` attribute is passed to the adapter.
        :param buf scan_response: scan response data packet bytes.
            If ``None``, a default scan response will be generated that includes
            `BLERadio.name` and `BLERadio.tx_power`.
        :param float interval:  advertising interval, in seconds
        :param int timeout:  advertising timeout in seconds.
            If None, no timeout.

        ``timeout`` is not available in CircuitPython 5.x and must be `None`.

        :raises NotImplementedError: if ``timeout`` is given on CircuitPython 5.x or earlier.
        """
        advertisement_bytes = bytes(advertisement)
        scan_response_bytes = b""
        # Only synthesize a default scan response when the caller gave none and the
        # advertisement itself is at most 31 bytes long.
        if not scan_response and len(advertisement_bytes) <= 31:
            scan_response = Advertisement()
            scan_response.complete_name = self.name
            scan_response.tx_power = self.tx_power
        if scan_response:
            scan_response_bytes = bytes(scan_response)

        # Remove after 5.x is no longer supported.
        if (
            sys.implementation.name == "circuitpython"
            and sys.implementation.version[0] <= 5
        ):
            if timeout is not None:
                raise NotImplementedError("timeout not available for CircuitPython 5.x")
            self._adapter.start_advertising(
                advertisement_bytes,
                scan_response=scan_response_bytes,
                connectable=advertisement.connectable,
                interval=interval,
            )
        else:
            # ``None`` from the caller is passed to the adapter as 0.
            self._adapter.start_advertising(
                advertisement_bytes,
                scan_response=scan_response_bytes,
                connectable=advertisement.connectable,
                interval=interval,
                timeout=0 if timeout is None else timeout,
            )

    def stop_advertising(self):
        """Stops advertising."""
        self._adapter.stop_advertising()

    def start_scan(
        self,
        *advertisement_types,
        buffer_size=512,
        extended=False,
        timeout=None,
        interval=0.1,
        window=0.1,
        minimum_rssi=-80,
        active=True
    ):
        """
        Starts scanning. Returns an iterator of advertisement objects of the types given in
        advertisement_types. The iterator will block until an advertisement is heard or the scan
        times out.

        If any ``advertisement_types`` are given, only Advertisements of those types are produced
        by the returned iterator. If none are given then `Advertisement` objects will be
        returned.

        Advertisements and scan responses are filtered and returned separately.

        :param int buffer_size: the maximum number of advertising bytes to buffer.
        :param bool extended: When True, support extended advertising packets.
            Increasing buffer_size is recommended when this is set.
        :param float timeout: the scan timeout in seconds.
            If None, will scan until `stop_scan` is called.
        :param float interval: the interval (in seconds) between the start
             of two consecutive scan windows
             Must be in the range 0.0025 - 40.959375 seconds.
        :param float window: the duration (in seconds) to scan a single BLE channel.
             window must be <= interval.
        :param int minimum_rssi: the minimum rssi of entries to return.
        :param bool active: request and retrieve scan responses for scannable advertisements.
        :return: If any ``advertisement_types`` are given,
           only Advertisements of those types are produced by the returned iterator.
           If none are given then `Advertisement` objects will be returned.
        :rtype: iterable
        """
        if not advertisement_types:
            advertisement_types = (Advertisement,)

        all_prefix_bytes = tuple(adv.get_prefix_bytes() for adv in advertisement_types)

        # If one of the advertisement_types has no prefix restrictions, then
        # no prefixes should be specified at all, so we match everything.
        prefixes = b"" if b"" in all_prefix_bytes else b"".join(all_prefix_bytes)

        for entry in self._adapter.start_scan(
            prefixes=prefixes,
            buffer_size=buffer_size,
            extended=extended,
            timeout=timeout,
            interval=interval,
            window=window,
            minimum_rssi=minimum_rssi,
            active=active,
        ):
            # Start from the base class and narrow to the most derived requested
            # type that matches this entry.
            adv_type = Advertisement
            for possible_type in advertisement_types:
                if possible_type.matches(entry) and issubclass(possible_type, adv_type):
                    adv_type = possible_type
            # Double check the adv_type is requested. We may return Advertisement accidentally
            # otherwise.
            if adv_type not in advertisement_types:
                continue
            advertisement = adv_type.from_entry(entry)
            if advertisement:
                yield advertisement

    def stop_scan(self):
        """Stops any active scan.

           The scan results iterator will return any buffered results and then raise StopIteration
           once empty."""
        self._adapter.stop_scan()

    def connect(self, advertisement, *, timeout=4.0):
        """
        Initiates a `BLEConnection` to the peer that advertised the given advertisement.

        :param Advertisement advertisement: An `Advertisement` or a subclass of `Advertisement`
        :param float timeout: how long to wait for a connection
        :return: the connection to the peer
        :rtype: BLEConnection
        """
        connection = self._adapter.connect(advertisement.address, timeout=timeout)
        # Release wrappers for peers that have gone away before caching the new one.
        self._clean_connection_cache()
        self._connection_cache[connection] = BLEConnection(connection)
        return self._connection_cache[connection]

    @property
    def connected(self):
        """True if any peers are connected."""
        return self._adapter.connected

    @property
    def connections(self):
        """A tuple of active `BLEConnection` objects."""
        # Forget wrappers for connections that have dropped so the cache cannot
        # grow without bound.
        self._clean_connection_cache()
        wrapped_connections = []
        for connection in self._adapter.connections:
            # Wrap connections the adapter reports that we have not seen before.
            if connection not in self._connection_cache:
                self._connection_cache[connection] = BLEConnection(connection)
            wrapped_connections.append(self._connection_cache[connection])

        return tuple(wrapped_connections)

    @property
    def name(self):
        """The name for this device. Used in advertisements and
        as the Device Name in the Generic Access Service, available to a connected peer.
        """
        return self._adapter.name

    @name.setter
    def name(self, value):
        self._adapter.name = value

    @property
    def tx_power(self):
        """Transmit power, in dBm.

        Currently a placeholder that always reports 0; setting it raises
        `NotImplementedError`.
        """
        return 0

    @tx_power.setter
    def tx_power(self, value):
        raise NotImplementedError("setting tx_power not yet implemented")

    @property
    def address_bytes(self):
        """The device address, as a ``bytes()`` object of length 6."""
        return self._adapter.address.address_bytes

    @property
    def advertising(self):
        """The advertising state"""
        return self._adapter.advertising