/ adafruit_rplidar.py
adafruit_rplidar.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2019 Dave Astels for Adafruit Industries
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  
 23  """
 24  `adafruit_rplidar`
 25  ====================================================
 26  
 27  Provide an interface to the Slamtech RPLidar that works in plain Python3
 28  as well as CircuitPython/Blinka.
 29  
 30  * Author(s): Dave Astels
 31  * Based on https://github.com/SkoltechRobotics/rplidar by Artyom Pavlov
 32  
 33  Implementation Notes
 34  --------------------
 35  
 36  **Hardware:**
 37  
 38  
 39  **Software and Dependencies:**
 40  
 41  * Adafruit CircuitPython firmware for the supported boards:
 42    https://github.com/adafruit/circuitpython/releases
 43  
 44  Version 0.0.1 does NOT support CircutPython. Future versions will.
 45  """
 46  
 47  import sys
 48  import time
 49  import struct
 50  
 51  # pylint:disable=invalid-name,undefined-variable,global-variable-not-assigned
 52  # pylint:disable=too-many-arguments
 53  
 54  __version__ = "0.0.1-auto.0"
 55  __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RPLIDAR.git"
 56  
 57  SYNC_BYTE = b"\xA5"
 58  SYNC_BYTE2 = b"\x5A"
 59  
 60  GET_INFO_BYTE = b"\x50"
 61  GET_HEALTH_BYTE = b"\x52"
 62  
 63  STOP_BYTE = b"\x25"
 64  RESET_BYTE = b"\x40"
 65  
 66  SCAN_BYTE = b"\x20"
 67  FORCE_SCAN_BYTE = b"\x21"
 68  
 69  DESCRIPTOR_LEN = 7
 70  INFO_LEN = 20
 71  HEALTH_LEN = 3
 72  
 73  INFO_TYPE = 4
 74  HEALTH_TYPE = 6
 75  SCAN_TYPE = 129
 76  
 77  # Constants & Command to start A2 motor
 78  MAX_MOTOR_PWM = 1023
 79  DEFAULT_MOTOR_PWM = 660
 80  SET_PWM_BYTE = b"\xF0"
 81  
 82  _HEALTH_STATUSES = {
 83      0: "Good",
 84      1: "Warning",
 85      2: "Error",
 86  }
 87  
 88  
 89  class RPLidarException(Exception):
 90      """Basic exception class for RPLidar"""
 91  
 92  
 93  def _process_scan(raw):
 94      """Processes input raw data and returns measurment data"""
 95      new_scan = bool(raw[0] & 0b1)
 96      inversed_new_scan = bool((raw[0] >> 1) & 0b1)
 97      quality = raw[0] >> 2
 98      if new_scan == inversed_new_scan:
 99          raise RPLidarException("New scan flags mismatch")
100      check_bit = raw[1] & 0b1
101      if check_bit != 1:
102          raise RPLidarException("Check bit not equal to 1")
103      angle = ((raw[1] >> 1) + (raw[2] << 7)) / 64.0
104      distance = (raw[3] + (raw[4] << 8)) / 4.0
105      return new_scan, quality, angle, distance
106  
107  
108  class RPLidar:
109      """Class for communicating with RPLidar rangefinder scanners"""
110  
111      motor_pin = None  #: DigitalInOut instance controlling the motor
112      _serial_port = None  #: Serial port (or UART) instance
113      port = None  #: Serial port name, e.g. /dev/ttyUSB0
114      timeout = 1  #: Serial port timeout
115      motor = False  #: Is motor running?
116      baudrate = 115200  #: Baudrate for serial port
117  
118      def __init__(self, motor_pin, port, baudrate=115200, timeout=1, logging=False):
119          """Initilize RPLidar object for communicating with the sensor.
120  
121          Parameters
122  
123          port : busio.UART or str
124              Serial port instance or name of the port to which the sensor is connected
125          baudrate : int, optional
126              Baudrate for serial connection (the default is 115200)
127          timeout : float, optional
128              Serial port connection timeout in seconds (the default is 1)
129          logging : whether to output logging information
130          """
131          self.motor_pin = motor_pin
132          self.port = port
133          self.baudrate = baudrate
134          self.timeout = timeout
135          self.motor_running = False
136          self.logging = logging
137  
138          self.is_CP = not isinstance(port, str)
139  
140          if self.is_CP:
141              _serial_port = port
142          else:
143              global serial  # pylint: disable=global-statement
144              import serial  # pylint: disable=import-outside-toplevel
145  
146          self.connect()
147          self.start_motor()
148  
149      def log(self, level, msg):
150          """Output the level and a message if logging is enabled."""
151          if self.logging:
152              sys.stdout.write("{0}: {1}\n".format(level, msg))
153  
154      def log_bytes(self, level, msg, ba):
155          """Log and output a byte array in a readable way."""
156          bs = ["%02x" % b for b in ba]
157          self.log(level, msg + " ".join(bs))
158  
159      def connect(self):
160          """Connects to the serial port named by the port instance var. If it was
161          connected to another serial port disconnects from it first."""
162          if not self.is_CP:
163              if self._serial_port is not None:
164                  self.disconnect()
165              try:
166                  self._serial_port = serial.Serial(
167                      self.port,
168                      self.baudrate,
169                      parity=serial.PARITY_NONE,
170                      stopbits=serial.STOPBITS_ONE,
171                      timeout=self.timeout,
172                      dsrdtr=True,
173                  )
174              except serial.SerialException as err:
175                  raise RPLidarException(
176                      "Failed to connect to the sensor " "due to: %s" % err
177                  )
178  
179      def disconnect(self):
180          """Disconnects from the serial port"""
181          if self._serial_port is None:
182              return
183          self._serial_port.close()
184  
185      def set_pwm(self, pwm):
186          """Set the motor PWM"""
187          assert 0 <= pwm <= MAX_MOTOR_PWM
188          payload = struct.pack("<H", pwm)
189          self._send_payload_cmd(SET_PWM_BYTE, payload)
190  
191      def _control_motor(self, val):
192          """Manipular the motor"""
193          if self.is_CP:
194              self.motor_pin.value = val
195          else:
196              self._serial_port.dtr = not val
197  
198      def start_motor(self):
199          """Starts sensor motor"""
200          self.log("info", "Starting motor")
201          # For A1
202          self._control_motor(True)
203  
204          # For A2
205          self.set_pwm(DEFAULT_MOTOR_PWM)
206          self.motor_running = True
207  
208      def stop_motor(self):
209          """Stops sensor motor"""
210          self.log("info", "Stopping motor")
211          # For A2
212          self.set_pwm(0)
213          time.sleep(0.001)
214          # For A1
215          self._control_motor(False)
216          self.motor_running = False
217  
218      def _send_payload_cmd(self, cmd, payload):
219          """Sends `cmd` command with `payload` to the sensor"""
220          size = struct.pack("B", len(payload))
221          req = SYNC_BYTE + cmd + size + payload
222          checksum = 0
223          for v in struct.unpack("B" * len(req), req):
224              checksum ^= v
225          req += struct.pack("B", checksum)
226          self._serial_port.write(req)
227          self.log_bytes("debug", "Command sent: ", req)
228  
229      def _send_cmd(self, cmd):
230          """Sends `cmd` command to the sensor"""
231          req = SYNC_BYTE + cmd
232          self._serial_port.write(req)
233          self.log_bytes("debug", "Command sent: ", req)
234  
235      def _read_descriptor(self):
236          """Reads descriptor packet"""
237          descriptor = self._serial_port.read(DESCRIPTOR_LEN)
238          self.log_bytes("debug", "Received descriptor:", descriptor)
239          if len(descriptor) != DESCRIPTOR_LEN:
240              raise RPLidarException("Descriptor length mismatch")
241          if not descriptor.startswith(SYNC_BYTE + SYNC_BYTE2):
242              raise RPLidarException("Incorrect descriptor starting bytes")
243          is_single = descriptor[-2] == 0
244          return descriptor[2], is_single, descriptor[-1]
245  
246      def _read_response(self, dsize):
247          """Reads response packet with length of `dsize` bytes"""
248          self.log("debug", "Trying to read response: %d bytes" % dsize)
249          data = self._serial_port.read(dsize)
250          self.log_bytes("debug", "Received data:", data)
251          if len(data) != dsize:
252              raise RPLidarException("Wrong body size")
253          return data
254  
255      @property
256      def info(self):
257          """Get device information
258  
259          Returns
260  
261          dict
262              Dictionary with the sensor information
263          """
264          self._send_cmd(GET_INFO_BYTE)
265          dsize, is_single, dtype = self._read_descriptor()
266          if dsize != INFO_LEN:
267              raise RPLidarException("Wrong info reply length")
268          if not is_single:
269              raise RPLidarException("Not a single response mode")
270          if dtype != INFO_TYPE:
271              raise RPLidarException("Wrong response data type")
272          raw = self._read_response(dsize)
273          serialnumber_bytes = struct.unpack("BBBBBBBBBBBBBBBB", raw[4:])
274          serialnumber = "".join(reversed(["%02x" % b for b in serialnumber_bytes]))
275          data = {
276              "model": raw[0],
277              "firmware": (raw[2], raw[1]),
278              "hardware": raw[3],
279              "serialnumber": serialnumber,
280          }
281          return data
282  
283      @property
284      def health(self):
285          """Get device health state. When the core system detects some
286          potential risk that may cause hardware failure in the future,
287          the returned status value will be 'Warning'. But sensor can still work
288          as normal. When sensor is in the Protection Stop state, the returned
289          status value will be 'Error'. In case of warning or error statuses
290          non-zero error code will be returned.
291  
292          Returns
293  
294          status : str
295              'Good', 'Warning' or 'Error' statuses
296          error_code : int
297              The related error code that caused a warning/error.
298          """
299          self._send_cmd(GET_HEALTH_BYTE)
300          dsize, is_single, dtype = self._read_descriptor()
301          if dsize != HEALTH_LEN:
302              raise RPLidarException("Wrong info reply length")
303          if not is_single:
304              raise RPLidarException("Not a single response mode")
305          if dtype != HEALTH_TYPE:
306              raise RPLidarException("Wrong response data type")
307          raw = self._read_response(dsize)
308          status = _HEALTH_STATUSES[raw[0]]
309          error_code = (raw[1] << 8) + raw[2]
310          return (status, error_code)
311  
312      def clear_input(self):
313          """Clears input buffer by reading all available data"""
314  
315      def stop(self):
316          """Stops scanning process, disables laser diode and the measurment
317          system, moves sensor to the idle state."""
318          self.log("info", "Stoping scanning")
319          self._send_cmd(STOP_BYTE)
320          time.sleep(0.001)
321          self.clear_input()
322  
323      def reset(self):
324          """Resets sensor core, reverting it to a similar state as it has
325          just been powered up."""
326          self.log("info", "Reseting the sensor")
327          self._send_cmd(RESET_BYTE)
328          time.sleep(0.002)
329  
330      def iter_measurments(self, max_buf_meas=500):
331          """Iterate over measurments. Note that consumer must be fast enough,
332          otherwise data will be accumulated inside buffer and consumer will get
333          data with increaing lag.
334  
335          Parameters
336  
337          max_buf_meas : int
338              Maximum number of measurments to be stored inside the buffer. Once
339              numbe exceeds this limit buffer will be emptied out.
340  
341          Yields
342  
343          new_scan : bool
344              True if measurment belongs to a new scan
345          quality : int
346              Reflected laser pulse strength
347          angle : float
348              The measurment heading angle in degree unit [0, 360)
349          distance : float
350              Measured object distance related to the sensor's rotation center.
351              In millimeter unit. Set to 0 when measurment is invalid.
352          """
353          self.start_motor()
354          status, error_code = self.health
355          self.log("debug", "Health status: %s [%d]" % (status, error_code))
356          if status == _HEALTH_STATUSES[2]:
357              self.log(
358                  "warning",
359                  "Trying to reset sensor due to the error. "
360                  "Error code: %d" % (error_code),
361              )
362              self.reset()
363              status, error_code = self.health
364              if status == _HEALTH_STATUSES[2]:
365                  raise RPLidarException(
366                      "RPLidar hardware failure. " "Error code: %d" % error_code
367                  )
368          elif status == _HEALTH_STATUSES[1]:
369              self.log(
370                  "warning",
371                  "Warning sensor status detected! " "Error code: %d" % (error_code),
372              )
373          cmd = SCAN_BYTE
374          self._send_cmd(cmd)
375          dsize, is_single, dtype = self._read_descriptor()
376          if dsize != 5:
377              raise RPLidarException("Wrong info reply length")
378          if is_single:
379              raise RPLidarException("Not a multiple response mode")
380          if dtype != SCAN_TYPE:
381              raise RPLidarException("Wrong response data type")
382          while True:
383              raw = self._read_response(dsize)
384              self.log_bytes("debug", "Received scan response: ", raw)
385              if max_buf_meas:
386                  data_in_buf = self._serial_port.in_waiting
387                  if data_in_buf > max_buf_meas * dsize:
388                      self.log(
389                          "warning",
390                          "Too many measurments in the input buffer: %d/%d. "
391                          "Clearing buffer..." % (data_in_buf // dsize, max_buf_meas),
392                      )
393                      self._serial_port.read(data_in_buf // dsize * dsize)
394              yield _process_scan(raw)
395  
396      def iter_scans(self, max_buf_meas=500, min_len=5):
397          """Iterate over scans. Note that consumer must be fast enough,
398          otherwise data will be accumulated inside buffer and consumer will get
399          data with increasing lag.
400  
401          Parameters
402  
403          max_buf_meas : int
404              Maximum number of measurments to be stored inside the buffer. Once
405              numbe exceeds this limit buffer will be emptied out.
406          min_len : int
407              Minimum number of measurments in the scan for it to be yelded.
408  
409          Yields
410  
411          scan : list
412              List of the measurments. Each measurment is tuple with following
413              format: (quality, angle, distance). For values description please
414              refer to `iter_measurments` method's documentation.
415          """
416          scan = []
417          iterator = self.iter_measurments(max_buf_meas)
418          for new_scan, quality, angle, distance in iterator:
419              if new_scan:
420                  if len(scan) > min_len:
421                      yield scan
422                  scan = []
423              if quality > 0 and distance > 0:
424                  scan.append((quality, angle, distance))