/ adafruit_rplidar.py
adafruit_rplidar.py
1 # The MIT License (MIT) 2 # 3 # Copyright (c) 2019 Dave Astels for Adafruit Industries 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # The above copyright notice and this permission notice shall be included in 13 # all copies or substantial portions of the Software. 14 # 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21 # THE SOFTWARE. 22 23 """ 24 `adafruit_rplidar` 25 ==================================================== 26 27 Provide an interface to the Slamtech RPLidar that works in plain Python3 28 as well as CircuitPython/Blinka. 29 30 * Author(s): Dave Astels 31 * Based on https://github.com/SkoltechRobotics/rplidar by Artyom Pavlov 32 33 Implementation Notes 34 -------------------- 35 36 **Hardware:** 37 38 39 **Software and Dependencies:** 40 41 * Adafruit CircuitPython firmware for the supported boards: 42 https://github.com/adafruit/circuitpython/releases 43 44 Version 0.0.1 does NOT support CircutPython. Future versions will. 45 """ 46 47 import sys 48 import time 49 import struct 50 51 # pylint:disable=invalid-name,undefined-variable,global-variable-not-assigned 52 # pylint:disable=too-many-arguments 53 54 __version__ = "0.0.1-auto.0" 55 __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RPLIDAR.git" 56 57 SYNC_BYTE = b"\xA5" 58 SYNC_BYTE2 = b"\x5A" 59 60 GET_INFO_BYTE = b"\x50" 61 GET_HEALTH_BYTE = b"\x52" 62 63 STOP_BYTE = b"\x25" 64 RESET_BYTE = b"\x40" 65 66 SCAN_BYTE = b"\x20" 67 FORCE_SCAN_BYTE = b"\x21" 68 69 DESCRIPTOR_LEN = 7 70 INFO_LEN = 20 71 HEALTH_LEN = 3 72 73 INFO_TYPE = 4 74 HEALTH_TYPE = 6 75 SCAN_TYPE = 129 76 77 # Constants & Command to start A2 motor 78 MAX_MOTOR_PWM = 1023 79 DEFAULT_MOTOR_PWM = 660 80 SET_PWM_BYTE = b"\xF0" 81 82 _HEALTH_STATUSES = { 83 0: "Good", 84 1: "Warning", 85 2: "Error", 86 } 87 88 89 class RPLidarException(Exception): 90 """Basic exception class for RPLidar""" 91 92 93 def _process_scan(raw): 94 """Processes input raw data and returns measurment data""" 95 new_scan = bool(raw[0] & 0b1) 96 inversed_new_scan = bool((raw[0] >> 1) & 0b1) 97 quality = raw[0] >> 2 98 if new_scan == inversed_new_scan: 99 raise RPLidarException("New scan flags mismatch") 100 check_bit = raw[1] & 0b1 101 if check_bit != 1: 102 raise RPLidarException("Check bit not equal to 1") 103 angle = ((raw[1] >> 1) + (raw[2] << 7)) / 64.0 104 distance = (raw[3] + (raw[4] << 8)) / 4.0 105 return new_scan, quality, angle, distance 106 107 108 class RPLidar: 109 """Class for communicating with RPLidar rangefinder scanners""" 110 111 motor_pin = None #: DigitalInOut instance controlling the motor 112 _serial_port = None #: Serial port (or UART) instance 113 port = None #: Serial port name, e.g. /dev/ttyUSB0 114 timeout = 1 #: Serial port timeout 115 motor = False #: Is motor running? 116 baudrate = 115200 #: Baudrate for serial port 117 118 def __init__(self, motor_pin, port, baudrate=115200, timeout=1, logging=False): 119 """Initilize RPLidar object for communicating with the sensor. 120 121 Parameters 122 123 port : busio.UART or str 124 Serial port instance or name of the port to which the sensor is connected 125 baudrate : int, optional 126 Baudrate for serial connection (the default is 115200) 127 timeout : float, optional 128 Serial port connection timeout in seconds (the default is 1) 129 logging : whether to output logging information 130 """ 131 self.motor_pin = motor_pin 132 self.port = port 133 self.baudrate = baudrate 134 self.timeout = timeout 135 self.motor_running = False 136 self.logging = logging 137 138 self.is_CP = not isinstance(port, str) 139 140 if self.is_CP: 141 _serial_port = port 142 else: 143 global serial # pylint: disable=global-statement 144 import serial # pylint: disable=import-outside-toplevel 145 146 self.connect() 147 self.start_motor() 148 149 def log(self, level, msg): 150 """Output the level and a message if logging is enabled.""" 151 if self.logging: 152 sys.stdout.write("{0}: {1}\n".format(level, msg)) 153 154 def log_bytes(self, level, msg, ba): 155 """Log and output a byte array in a readable way.""" 156 bs = ["%02x" % b for b in ba] 157 self.log(level, msg + " ".join(bs)) 158 159 def connect(self): 160 """Connects to the serial port named by the port instance var. If it was 161 connected to another serial port disconnects from it first.""" 162 if not self.is_CP: 163 if self._serial_port is not None: 164 self.disconnect() 165 try: 166 self._serial_port = serial.Serial( 167 self.port, 168 self.baudrate, 169 parity=serial.PARITY_NONE, 170 stopbits=serial.STOPBITS_ONE, 171 timeout=self.timeout, 172 dsrdtr=True, 173 ) 174 except serial.SerialException as err: 175 raise RPLidarException( 176 "Failed to connect to the sensor " "due to: %s" % err 177 ) 178 179 def disconnect(self): 180 """Disconnects from the serial port""" 181 if self._serial_port is None: 182 return 183 self._serial_port.close() 184 185 def set_pwm(self, pwm): 186 """Set the motor PWM""" 187 assert 0 <= pwm <= MAX_MOTOR_PWM 188 payload = struct.pack("<H", pwm) 189 self._send_payload_cmd(SET_PWM_BYTE, payload) 190 191 def _control_motor(self, val): 192 """Manipular the motor""" 193 if self.is_CP: 194 self.motor_pin.value = val 195 else: 196 self._serial_port.dtr = not val 197 198 def start_motor(self): 199 """Starts sensor motor""" 200 self.log("info", "Starting motor") 201 # For A1 202 self._control_motor(True) 203 204 # For A2 205 self.set_pwm(DEFAULT_MOTOR_PWM) 206 self.motor_running = True 207 208 def stop_motor(self): 209 """Stops sensor motor""" 210 self.log("info", "Stopping motor") 211 # For A2 212 self.set_pwm(0) 213 time.sleep(0.001) 214 # For A1 215 self._control_motor(False) 216 self.motor_running = False 217 218 def _send_payload_cmd(self, cmd, payload): 219 """Sends `cmd` command with `payload` to the sensor""" 220 size = struct.pack("B", len(payload)) 221 req = SYNC_BYTE + cmd + size + payload 222 checksum = 0 223 for v in struct.unpack("B" * len(req), req): 224 checksum ^= v 225 req += struct.pack("B", checksum) 226 self._serial_port.write(req) 227 self.log_bytes("debug", "Command sent: ", req) 228 229 def _send_cmd(self, cmd): 230 """Sends `cmd` command to the sensor""" 231 req = SYNC_BYTE + cmd 232 self._serial_port.write(req) 233 self.log_bytes("debug", "Command sent: ", req) 234 235 def _read_descriptor(self): 236 """Reads descriptor packet""" 237 descriptor = self._serial_port.read(DESCRIPTOR_LEN) 238 self.log_bytes("debug", "Received descriptor:", descriptor) 239 if len(descriptor) != DESCRIPTOR_LEN: 240 raise RPLidarException("Descriptor length mismatch") 241 if not descriptor.startswith(SYNC_BYTE + SYNC_BYTE2): 242 raise RPLidarException("Incorrect descriptor starting bytes") 243 is_single = descriptor[-2] == 0 244 return descriptor[2], is_single, descriptor[-1] 245 246 def _read_response(self, dsize): 247 """Reads response packet with length of `dsize` bytes""" 248 self.log("debug", "Trying to read response: %d bytes" % dsize) 249 data = self._serial_port.read(dsize) 250 self.log_bytes("debug", "Received data:", data) 251 if len(data) != dsize: 252 raise RPLidarException("Wrong body size") 253 return data 254 255 @property 256 def info(self): 257 """Get device information 258 259 Returns 260 261 dict 262 Dictionary with the sensor information 263 """ 264 self._send_cmd(GET_INFO_BYTE) 265 dsize, is_single, dtype = self._read_descriptor() 266 if dsize != INFO_LEN: 267 raise RPLidarException("Wrong info reply length") 268 if not is_single: 269 raise RPLidarException("Not a single response mode") 270 if dtype != INFO_TYPE: 271 raise RPLidarException("Wrong response data type") 272 raw = self._read_response(dsize) 273 serialnumber_bytes = struct.unpack("BBBBBBBBBBBBBBBB", raw[4:]) 274 serialnumber = "".join(reversed(["%02x" % b for b in serialnumber_bytes])) 275 data = { 276 "model": raw[0], 277 "firmware": (raw[2], raw[1]), 278 "hardware": raw[3], 279 "serialnumber": serialnumber, 280 } 281 return data 282 283 @property 284 def health(self): 285 """Get device health state. When the core system detects some 286 potential risk that may cause hardware failure in the future, 287 the returned status value will be 'Warning'. But sensor can still work 288 as normal. When sensor is in the Protection Stop state, the returned 289 status value will be 'Error'. In case of warning or error statuses 290 non-zero error code will be returned. 291 292 Returns 293 294 status : str 295 'Good', 'Warning' or 'Error' statuses 296 error_code : int 297 The related error code that caused a warning/error. 298 """ 299 self._send_cmd(GET_HEALTH_BYTE) 300 dsize, is_single, dtype = self._read_descriptor() 301 if dsize != HEALTH_LEN: 302 raise RPLidarException("Wrong info reply length") 303 if not is_single: 304 raise RPLidarException("Not a single response mode") 305 if dtype != HEALTH_TYPE: 306 raise RPLidarException("Wrong response data type") 307 raw = self._read_response(dsize) 308 status = _HEALTH_STATUSES[raw[0]] 309 error_code = (raw[1] << 8) + raw[2] 310 return (status, error_code) 311 312 def clear_input(self): 313 """Clears input buffer by reading all available data""" 314 315 def stop(self): 316 """Stops scanning process, disables laser diode and the measurment 317 system, moves sensor to the idle state.""" 318 self.log("info", "Stoping scanning") 319 self._send_cmd(STOP_BYTE) 320 time.sleep(0.001) 321 self.clear_input() 322 323 def reset(self): 324 """Resets sensor core, reverting it to a similar state as it has 325 just been powered up.""" 326 self.log("info", "Reseting the sensor") 327 self._send_cmd(RESET_BYTE) 328 time.sleep(0.002) 329 330 def iter_measurments(self, max_buf_meas=500): 331 """Iterate over measurments. Note that consumer must be fast enough, 332 otherwise data will be accumulated inside buffer and consumer will get 333 data with increaing lag. 334 335 Parameters 336 337 max_buf_meas : int 338 Maximum number of measurments to be stored inside the buffer. Once 339 numbe exceeds this limit buffer will be emptied out. 340 341 Yields 342 343 new_scan : bool 344 True if measurment belongs to a new scan 345 quality : int 346 Reflected laser pulse strength 347 angle : float 348 The measurment heading angle in degree unit [0, 360) 349 distance : float 350 Measured object distance related to the sensor's rotation center. 351 In millimeter unit. Set to 0 when measurment is invalid. 352 """ 353 self.start_motor() 354 status, error_code = self.health 355 self.log("debug", "Health status: %s [%d]" % (status, error_code)) 356 if status == _HEALTH_STATUSES[2]: 357 self.log( 358 "warning", 359 "Trying to reset sensor due to the error. " 360 "Error code: %d" % (error_code), 361 ) 362 self.reset() 363 status, error_code = self.health 364 if status == _HEALTH_STATUSES[2]: 365 raise RPLidarException( 366 "RPLidar hardware failure. " "Error code: %d" % error_code 367 ) 368 elif status == _HEALTH_STATUSES[1]: 369 self.log( 370 "warning", 371 "Warning sensor status detected! " "Error code: %d" % (error_code), 372 ) 373 cmd = SCAN_BYTE 374 self._send_cmd(cmd) 375 dsize, is_single, dtype = self._read_descriptor() 376 if dsize != 5: 377 raise RPLidarException("Wrong info reply length") 378 if is_single: 379 raise RPLidarException("Not a multiple response mode") 380 if dtype != SCAN_TYPE: 381 raise RPLidarException("Wrong response data type") 382 while True: 383 raw = self._read_response(dsize) 384 self.log_bytes("debug", "Received scan response: ", raw) 385 if max_buf_meas: 386 data_in_buf = self._serial_port.in_waiting 387 if data_in_buf > max_buf_meas * dsize: 388 self.log( 389 "warning", 390 "Too many measurments in the input buffer: %d/%d. " 391 "Clearing buffer..." % (data_in_buf // dsize, max_buf_meas), 392 ) 393 self._serial_port.read(data_in_buf // dsize * dsize) 394 yield _process_scan(raw) 395 396 def iter_scans(self, max_buf_meas=500, min_len=5): 397 """Iterate over scans. Note that consumer must be fast enough, 398 otherwise data will be accumulated inside buffer and consumer will get 399 data with increasing lag. 400 401 Parameters 402 403 max_buf_meas : int 404 Maximum number of measurments to be stored inside the buffer. Once 405 numbe exceeds this limit buffer will be emptied out. 406 min_len : int 407 Minimum number of measurments in the scan for it to be yelded. 408 409 Yields 410 411 scan : list 412 List of the measurments. Each measurment is tuple with following 413 format: (quality, angle, distance). For values description please 414 refer to `iter_measurments` method's documentation. 415 """ 416 scan = [] 417 iterator = self.iter_measurments(max_buf_meas) 418 for new_scan, quality, angle, distance in iterator: 419 if new_scan: 420 if len(scan) > min_len: 421 yield scan 422 scan = [] 423 if quality > 0 and distance > 0: 424 scan.append((quality, angle, distance))