/ adafruit_ble / __init__.py
__init__.py
# The MIT License (MIT)
#
# Copyright (c) 2019 Dan Halbert for Adafruit Industries
# Copyright (c) 2019 Scott Shawcroft for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""

This module provides higher-level BLE (Bluetooth Low Energy) functionality,
building on the native `_bleio` module.

"""
# pylint: disable=wrong-import-position
import sys

if sys.implementation.name == "circuitpython" and sys.implementation.version[0] <= 4:
    raise ImportError(
        "This release is not compatible with CircuitPython 4.x; use library release 1.x.x"
    )
# pylint: enable=wrong-import-position

import _bleio

from .services import Service
from .advertising import Advertisement

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git"


class BLEConnection:
    """
    Represents a connection to a peer BLE device.
    It acts as a map from a `Service` type to a `Service` instance for the connection.

    :param _bleio.Connection bleio_connection: the native `_bleio.Connection` object to wrap

    """

    def __init__(self, bleio_connection):
        self._bleio_connection = bleio_connection
        # _bleio.Service objects representing services found during discovery.
        self._discovered_bleio_services = {}
        # Service objects that wrap remote services.
        self._constructed_services = {}

    def _discover_remote(self, uuid):
        """Return the native remote service for ``uuid``, or ``None`` if the peer lacks it.

        Successful discoveries are cached per UUID. A failed discovery is not
        cached, so it is retried on the next lookup.
        """
        remote_service = None
        if uuid in self._discovered_bleio_services:
            remote_service = self._discovered_bleio_services[uuid]
        else:
            results = self._bleio_connection.discover_remote_services(
                (uuid.bleio_uuid,)
            )
            if results:
                remote_service = results[0]
                self._discovered_bleio_services[uuid] = remote_service
        return remote_service

    def __contains__(self, key):
        """
        Allows easy testing for a particular Service class or a particular UUID
        associated with this connection.

        Example::

            if UARTService in connection:
                # do something

            if StandardUUID(0x1234) in connection:
                # do something
        """
        uuid = key
        # Service classes carry their UUID in a ``uuid`` attribute; anything
        # else is treated as the UUID itself.
        if hasattr(key, "uuid"):
            uuid = key.uuid
        return self._discover_remote(uuid) is not None

    def __getitem__(self, key):
        """Return the Service for the given Service class or uuid, if any.

        When ``key`` is a Service class, an instance wrapping the remote service
        is constructed once and cached. When ``key`` is a bare UUID there is no
        class to construct, so ``None`` is returned even though the peer offers
        the service.

        :raises KeyError: if the peer does not offer the requested service
        """
        uuid = key
        maybe_service = False
        if hasattr(key, "uuid"):
            uuid = key.uuid
            maybe_service = True

        if uuid in self._constructed_services:
            return self._constructed_services[uuid]

        remote_service = self._discover_remote(uuid)
        if remote_service:
            constructed_service = None
            if maybe_service:
                constructed_service = key(service=remote_service)
                self._constructed_services[uuid] = constructed_service
            return constructed_service

        raise KeyError("{!r} object has no service {}".format(self, key))

    @property
    def connected(self):
        """True if the connection to the peer is still active."""
        return self._bleio_connection.connected

    @property
    def paired(self):
        """True if paired to the peer."""
        return self._bleio_connection.paired

    @property
    def connection_interval(self):
        """Time between transmissions in milliseconds. Will be multiple of 1.25ms. Lower numbers
        increase speed and decrease latency but increase power consumption.

        When setting connection_interval, the peer may reject the new interval and
        `connection_interval` will then remain the same.

        Apple has additional guidelines that dictate it should be a multiple of 15ms except if
        HID is available. When HID is available Apple devices may accept 11.25ms intervals."""
        return self._bleio_connection.connection_interval

    @connection_interval.setter
    def connection_interval(self, value):
        self._bleio_connection.connection_interval = value

    def pair(self, *, bond=True):
        """Pair to the peer to increase security of the connection.

        :param bool bond: passed through unchanged to the native connection's ``pair``
        """
        return self._bleio_connection.pair(bond=bond)

    def disconnect(self):
        """Disconnect from peer."""
        self._bleio_connection.disconnect()


class BLERadio:
    """
    BLERadio provides the interfaces for BLE advertising,
    scanning for advertisements, and connecting to peers. There may be
    multiple connections active at once.

    It uses this library's `Advertisement` classes and the `BLEConnection` class."""

    def __init__(self, adapter=None):
        if not adapter:
            adapter = _bleio.adapter
        self._adapter = adapter
        self._current_advertisement = None
        # Maps native connection objects to their BLEConnection wrappers so the
        # same wrapper (and its discovered-service caches) is reused.
        self._connection_cache = {}

    def start_advertising(
        self, advertisement, scan_response=None, interval=0.1, timeout=None
    ):
        """
        Starts advertising the given advertisement.

        :param Advertisement advertisement: the advertisement to send; it is converted
           with ``bytes()`` and its ``connectable`` attribute is passed to the adapter
        :param buf scan_response: scan response data packet bytes.
           If ``None``, a default scan response will be generated that includes
           `BLERadio.name` and `BLERadio.tx_power`.
        :param float interval: advertising interval, in seconds
        :param int timeout: advertising timeout in seconds.
           If None, no timeout.

        ``timeout`` is not available in CircuitPython 5.x and must be `None`.
        """
        advertisement_bytes = bytes(advertisement)
        scan_response_bytes = b""
        # Only generate the default scan response when none was supplied and
        # the advertisement is at most 31 bytes long.
        if not scan_response and len(advertisement_bytes) <= 31:
            scan_response = Advertisement()
            scan_response.complete_name = self.name
            scan_response.tx_power = self.tx_power
        if scan_response:
            scan_response_bytes = bytes(scan_response)

        # Remove after 5.x is no longer supported.
        if (
            sys.implementation.name == "circuitpython"
            and sys.implementation.version[0] <= 5
        ):
            if timeout is not None:
                raise NotImplementedError("timeout not available for CircuitPython 5.x")
            self._adapter.start_advertising(
                advertisement_bytes,
                scan_response=scan_response_bytes,
                connectable=advertisement.connectable,
                interval=interval,
            )
        else:
            self._adapter.start_advertising(
                advertisement_bytes,
                scan_response=scan_response_bytes,
                connectable=advertisement.connectable,
                interval=interval,
                # The adapter is given 0 when no timeout was requested.
                timeout=0 if timeout is None else timeout,
            )

    def stop_advertising(self):
        """Stops advertising."""
        self._adapter.stop_advertising()

    def start_scan(
        self,
        *advertisement_types,
        buffer_size=512,
        extended=False,
        timeout=None,
        interval=0.1,
        window=0.1,
        minimum_rssi=-80,
        active=True
    ):
        """
        Starts scanning. Returns an iterator of advertisement objects of the types given in
        advertisement_types. The iterator will block until an advertisement is heard or the scan
        times out.

        If any ``advertisement_types`` are given, only Advertisements of those types are produced
        by the returned iterator. If none are given then `Advertisement` objects will be
        returned.

        Advertisements and scan responses are filtered and returned separately.

        :param int buffer_size: the maximum number of advertising bytes to buffer.
        :param bool extended: When True, support extended advertising packets.
           Increasing buffer_size is recommended when this is set.
        :param float timeout: the scan timeout in seconds.
           If None, will scan until `stop_scan` is called.
        :param float interval: the interval (in seconds) between the start
           of two consecutive scan windows
           Must be in the range 0.0025 - 40.959375 seconds.
        :param float window: the duration (in seconds) to scan a single BLE channel.
           window must be <= interval.
        :param int minimum_rssi: the minimum rssi of entries to return.
        :param bool active: request and retrieve scan responses for scannable advertisements.
        :return: If any ``advertisement_types`` are given,
           only Advertisements of those types are produced by the returned iterator.
           If none are given then `Advertisement` objects will be returned.
        :rtype: iterable
        """
        if not advertisement_types:
            advertisement_types = (Advertisement,)

        all_prefix_bytes = tuple(adv.get_prefix_bytes() for adv in advertisement_types)

        # If one of the advertisement_types has no prefix restrictions, then
        # no prefixes should be specified at all, so we match everything.
        prefixes = b"" if b"" in all_prefix_bytes else b"".join(all_prefix_bytes)

        for entry in self._adapter.start_scan(
            prefixes=prefixes,
            buffer_size=buffer_size,
            extended=extended,
            timeout=timeout,
            interval=interval,
            window=window,
            minimum_rssi=minimum_rssi,
            active=active,
        ):
            # Pick the most specific requested type that matches this entry:
            # each accepted candidate must be a subclass of the current pick.
            adv_type = Advertisement
            for possible_type in advertisement_types:
                if possible_type.matches(entry) and issubclass(possible_type, adv_type):
                    adv_type = possible_type
            # Double check the adv_type is requested. We may return Advertisement accidentally
            # otherwise.
            if adv_type not in advertisement_types:
                continue
            advertisement = adv_type.from_entry(entry)
            if advertisement:
                yield advertisement

    def stop_scan(self):
        """Stops any active scan.

        The scan results iterator will return any buffered results and then raise StopIteration
        once empty."""
        self._adapter.stop_scan()

    def _clean_connection_cache(self):
        """Drop cached `BLEConnection` wrappers whose connection is no longer active.

        Without this, the cache would keep a wrapper for every peer ever seen,
        for the lifetime of the radio object.
        """
        # Iterate over a snapshot because entries are deleted during the loop.
        for key, wrapped in list(self._connection_cache.items()):
            if not wrapped.connected:
                del self._connection_cache[key]

    def connect(self, advertisement, *, timeout=4.0):
        """
        Initiates a `BLEConnection` to the peer that advertised the given advertisement.

        :param Advertisement advertisement: An `Advertisement` or a subclass of `Advertisement`
        :param float timeout: how long to wait for a connection
        :return: the connection to the peer
        :rtype: BLEConnection
        """
        connection = self._adapter.connect(advertisement.address, timeout=timeout)
        # Forget wrappers of dropped connections before caching the new one.
        self._clean_connection_cache()
        self._connection_cache[connection] = BLEConnection(connection)
        return self._connection_cache[connection]

    @property
    def connected(self):
        """True if any peers are connected."""
        return self._adapter.connected

    @property
    def connections(self):
        """A tuple of active `BLEConnection` objects."""
        self._clean_connection_cache()
        connections = self._adapter.connections
        wrapped_connections = [None] * len(connections)
        for i, connection in enumerate(connections):
            # Connections not made through `connect` get a wrapper on first sight.
            if connection not in self._connection_cache:
                self._connection_cache[connection] = BLEConnection(connection)
            wrapped_connections[i] = self._connection_cache[connection]

        return tuple(wrapped_connections)

    @property
    def name(self):
        """The name for this device. Used in advertisements and
        as the Device Name in the Generic Access Service, available to a connected peer.
        """
        return self._adapter.name

    @name.setter
    def name(self, value):
        self._adapter.name = value

    @property
    def tx_power(self):
        """Transmit power, in dBm."""
        # Always reports 0; the setter below is not implemented.
        return 0

    @tx_power.setter
    def tx_power(self, value):
        raise NotImplementedError("setting tx_power not yet implemented")

    @property
    def address_bytes(self):
        """The device address, as a ``bytes()`` object of length 6."""
        return self._adapter.address.address_bytes

    @property
    def advertising(self):
        """The advertising state"""
        return self._adapter.advertising