/ adafruit_gps.py
adafruit_gps.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2017 Tony DiCola for Adafruit Industries
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  """
 23  `adafruit_gps`
 24  ====================================================
 25  
 26  GPS parsing module.  Can parse simple NMEA data sentences from serial GPS
 27  modules to read latitude, longitude, and more.
 28  
 29  * Author(s): Tony DiCola
 30  
 31  Implementation Notes
 32  --------------------
 33  
 34  **Hardware:**
 35  
 36  * Adafruit `Ultimate GPS Breakout <https://www.adafruit.com/product/746>`_
 37  * Adafruit `Ultimate GPS FeatherWing <https://www.adafruit.com/product/3133>`_
 38  
 39  **Software and Dependencies:**
 40  
* Adafruit CircuitPython firmware for the ESP8266 and M0-based boards:
 42    https://github.com/adafruit/circuitpython/releases
 43  
 44  """
 45  import time
 46  from micropython import const
 47  
# Version string of this module.
# NOTE(review): "0.0.0-auto.0" looks like a placeholder substituted at release
# time -- confirm against the release tooling.
__version__ = "0.0.0-auto.0"
# URL of the source repository for this module.
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_GPS.git"


# Default I2C address used by GPS_GtopI2C when the caller passes no address.
_GPSI2C_DEFAULT_ADDRESS = const(0x10)
 53  
# Internal helper parsing functions.
# These accept input that may be None or an empty string and return None in
# that case instead of raising an error.
 57  def _parse_degrees(nmea_data):
 58      # Parse a NMEA lat/long data pair 'dddmm.mmmm' into a pure degrees value.
 59      # Where ddd is the degrees, mm.mmmm is the minutes.
 60      if nmea_data is None or len(nmea_data) < 3:
 61          return None
 62      raw = float(nmea_data)
 63      deg = raw // 100
 64      minutes = raw % 100
 65      return deg + minutes / 60
 66  
 67  
 68  def _parse_int(nmea_data):
 69      if nmea_data is None or nmea_data == "":
 70          return None
 71      return int(nmea_data)
 72  
 73  
 74  def _parse_float(nmea_data):
 75      if nmea_data is None or nmea_data == "":
 76          return None
 77      return float(nmea_data)
 78  
 79  
 80  def _parse_str(nmea_data):
 81      if nmea_data is None or nmea_data == "":
 82          return None
 83      return str(nmea_data)
 84  
 85  
 86  # lint warning about too many attributes disabled
 87  # pylint: disable-msg=R0902
 88  
 89  
 90  class GPS:
 91      """GPS parsing module.  Can parse simple NMEA data sentences from serial
 92      GPS modules to read latitude, longitude, and more.
 93      """
 94  
 95      def __init__(self, uart, debug=False):
 96          self._uart = uart
 97          # Initialize null starting values for GPS attributes.
 98          self.timestamp_utc = None
 99          self.latitude = None
100          self.longitude = None
101          self.fix_quality = None
102          self.fix_quality_3d = None
103          self.satellites = None
104          self.satellites_prev = None
105          self.horizontal_dilution = None
106          self.altitude_m = None
107          self.height_geoid = None
108          self.speed_knots = None
109          self.track_angle_deg = None
110          self.sats = None
111          self.isactivedata = None
112          self.true_track = None
113          self.mag_track = None
114          self.sat_prns = None
115          self.sel_mode = None
116          self.pdop = None
117          self.hdop = None
118          self.vdop = None
119          self.total_mess_num = None
120          self.mess_num = None
121          self._raw_sentence = None
122          self.debug = debug
123  
124      def update(self):
125          """Check for updated data from the GPS module and process it
126          accordingly.  Returns True if new data was processed, and False if
127          nothing new was received.
128          """
129          # Grab a sentence and check its data type to call the appropriate
130          # parsing function.
131          try:
132              sentence = self._parse_sentence()
133          except UnicodeError:
134              return None
135          if sentence is None:
136              return False
137          if self.debug:
138              print(sentence)
139          data_type, args = sentence
140          data_type = bytes(data_type.upper(), "ascii")
141          # return sentence
142          if data_type in (
143              b"GPGLL",
144              b"GNGLL",
145          ):  # GLL, Geographic Position – Latitude/Longitude
146              self._parse_gpgll(args)
147          elif data_type in (b"GPRMC", b"GNRMC"):  # RMC, minimum location info
148              self._parse_gprmc(args)
149          elif data_type in (b"GPGGA", b"GNGGA"):  # GGA, 3d location fix
150              self._parse_gpgga(args)
151          return True
152  
153      def send_command(self, command, add_checksum=True):
154          """Send a command string to the GPS.  If add_checksum is True (the
155          default) a NMEA checksum will automatically be computed and added.
156          Note you should NOT add the leading $ and trailing * to the command
157          as they will automatically be added!
158          """
159          self.write(b"$")
160          self.write(command)
161          if add_checksum:
162              checksum = 0
163              for char in command:
164                  checksum ^= char
165              self.write(b"*")
166              self.write(bytes("{:02x}".format(checksum).upper(), "ascii"))
167          self.write(b"\r\n")
168  
169      @property
170      def has_fix(self):
171          """True if a current fix for location information is available."""
172          return self.fix_quality is not None and self.fix_quality >= 1
173  
174      @property
175      def has_3d_fix(self):
176          """Returns true if there is a 3d fix available.
177          use has_fix to determine if a 2d fix is available,
178          passing it the same data"""
179          return self.fix_quality_3d is not None and self.fix_quality_3d >= 2
180  
181      @property
182      def datetime(self):
183          """Return struct_time object to feed rtc.set_time_source() function"""
184          return self.timestamp_utc
185  
186      @property
187      def nmea_sentence(self):
188          """Return raw_sentence which is the raw NMEA sentence read from the GPS"""
189          return self._raw_sentence
190  
191      def read(self, num_bytes):
192          """Read up to num_bytes of data from the GPS directly, without parsing.
193          Returns a bytearray with up to num_bytes or None if nothing was read"""
194          return self._uart.read(num_bytes)
195  
196      def write(self, bytestr):
197          """Write a bytestring data to the GPS directly, without parsing
198          or checksums"""
199          return self._uart.write(bytestr)
200  
201      @property
202      def in_waiting(self):
203          """Returns number of bytes available in UART read buffer"""
204          return self._uart.in_waiting
205  
206      def readline(self):
207          """Returns a newline terminated bytearray, must have timeout set for
208          the underlying UART or this will block forever!"""
209          return self._uart.readline()
210  
211      def _read_sentence(self):
212          # Parse any NMEA sentence that is available.
213          # pylint: disable=len-as-condition
214          # This needs to be refactored when it can be tested.
215  
216          # Only continue if we have at least 32 bytes in the input buffer
217          if self.in_waiting < 32:
218              return None
219  
220          sentence = self.readline()
221          if sentence is None or sentence == b"" or len(sentence) < 1:
222              return None
223          try:
224              sentence = str(sentence, "ascii").strip()
225          except UnicodeError:
226              return None
227          # Look for a checksum and validate it if present.
228          if len(sentence) > 7 and sentence[-3] == "*":
229              # Get included checksum, then calculate it and compare.
230              expected = int(sentence[-2:], 16)
231              actual = 0
232              for i in range(1, len(sentence) - 3):
233                  actual ^= ord(sentence[i])
234              if actual != expected:
235                  return None  # Failed to validate checksum.
236  
237              # copy the raw sentence
238              self._raw_sentence = sentence
239  
240              return sentence
241          # At this point we don't have a valid sentence
242          return None
243  
244      def _parse_sentence(self):
245          sentence = self._read_sentence()
246  
247          # sentence is a valid NMEA with a valid checksum
248          if sentence is None:
249              return None
250  
251          # Remove checksum once validated.
252          sentence = sentence[:-3]
253          # Parse out the type of sentence (first string after $ up to comma)
254          # and then grab the rest as data within the sentence.
255          delimiter = sentence.find(",")
256          if delimiter == -1:
257              return None  # Invalid sentence, no comma after data type.
258          data_type = sentence[1:delimiter]
259          return (data_type, sentence[delimiter + 1 :])
260  
261      def _parse_gpgll(self, args):
262          data = args.split(",")
263          if data is None or data[0] is None or (data[0] == ""):
264              return  # Unexpected number of params.
265  
266          # Parse latitude and longitude.
267          self.latitude = _parse_degrees(data[0])
268          if self.latitude is not None and data[1] is not None and data[1].lower() == "s":
269              self.latitude *= -1.0
270          self.longitude = _parse_degrees(data[2])
271          if (
272              self.longitude is not None
273              and data[3] is not None
274              and data[3].lower() == "w"
275          ):
276              self.longitude *= -1.0
277          time_utc = int(_parse_int(float(data[4])))
278          if time_utc is not None:
279              hours = time_utc // 10000
280              mins = (time_utc // 100) % 100
281              secs = time_utc % 100
282              # Set or update time to a friendly python time struct.
283              if self.timestamp_utc is not None:
284                  self.timestamp_utc = time.struct_time(
285                      (0, 0, 0, hours, mins, secs, 0, 0, -1)
286                  )
287              else:
288                  self.timestamp_utc = time.struct_time(
289                      (0, 0, 0, hours, mins, secs, 0, 0, -1)
290                  )
291          # Parse data active or void
292          self.isactivedata = _parse_str(data[5])
293  
294      def _parse_gprmc(self, args):
295          # Parse the arguments (everything after data type) for NMEA GPRMC
296          # minimum location fix sentence.
297          data = args.split(",")
298          if data is None or len(data) < 11 or data[0] is None or (data[0] == ""):
299              return  # Unexpected number of params.
300          # Parse fix time.
301          time_utc = int(_parse_float(data[0]))
302          if time_utc is not None:
303              hours = time_utc // 10000
304              mins = (time_utc // 100) % 100
305              secs = time_utc % 100
306              # Set or update time to a friendly python time struct.
307              if self.timestamp_utc is not None:
308                  self.timestamp_utc = time.struct_time(
309                      (
310                          self.timestamp_utc.tm_year,
311                          self.timestamp_utc.tm_mon,
312                          self.timestamp_utc.tm_mday,
313                          hours,
314                          mins,
315                          secs,
316                          0,
317                          0,
318                          -1,
319                      )
320                  )
321              else:
322                  self.timestamp_utc = time.struct_time(
323                      (0, 0, 0, hours, mins, secs, 0, 0, -1)
324                  )
325          # Parse status (active/fixed or void).
326          status = data[1]
327          self.fix_quality = 0
328          if status is not None and status.lower() == "a":
329              self.fix_quality = 1
330          # Parse latitude and longitude.
331          self.latitude = _parse_degrees(data[2])
332          if self.latitude is not None and data[3] is not None and data[3].lower() == "s":
333              self.latitude *= -1.0
334          self.longitude = _parse_degrees(data[4])
335          if (
336              self.longitude is not None
337              and data[5] is not None
338              and data[5].lower() == "w"
339          ):
340              self.longitude *= -1.0
341          # Parse out speed and other simple numeric values.
342          self.speed_knots = _parse_float(data[6])
343          self.track_angle_deg = _parse_float(data[7])
344          # Parse date.
345          if data[8] is not None and len(data[8]) == 6:
346              day = int(data[8][0:2])
347              month = int(data[8][2:4])
348              year = 2000 + int(data[8][4:6])  # Y2k bug, 2 digit year assumption.
349              # This is a problem with the NMEA
350              # spec and not this code.
351              if self.timestamp_utc is not None:
352                  # Replace the timestamp with an updated one.
353                  # (struct_time is immutable and can't be changed in place)
354                  self.timestamp_utc = time.struct_time(
355                      (
356                          year,
357                          month,
358                          day,
359                          self.timestamp_utc.tm_hour,
360                          self.timestamp_utc.tm_min,
361                          self.timestamp_utc.tm_sec,
362                          0,
363                          0,
364                          -1,
365                      )
366                  )
367              else:
368                  # Time hasn't been set so create it.
369                  self.timestamp_utc = time.struct_time(
370                      (year, month, day, 0, 0, 0, 0, 0, -1)
371                  )
372  
373      def _parse_gpgga(self, args):
374          # Parse the arguments (everything after data type) for NMEA GPGGA
375          # 3D location fix sentence.
376          data = args.split(",")
377          if data is None or len(data) != 14 or (data[0] == ""):
378              return  # Unexpected number of params.
379          # Parse fix time.
380          time_utc = int(_parse_float(data[0]))
381          if time_utc is not None:
382              hours = time_utc // 10000
383              mins = (time_utc // 100) % 100
384              secs = time_utc % 100
385              # Set or update time to a friendly python time struct.
386              if self.timestamp_utc is not None:
387                  self.timestamp_utc = time.struct_time(
388                      (
389                          self.timestamp_utc.tm_year,
390                          self.timestamp_utc.tm_mon,
391                          self.timestamp_utc.tm_mday,
392                          hours,
393                          mins,
394                          secs,
395                          0,
396                          0,
397                          -1,
398                      )
399                  )
400              else:
401                  self.timestamp_utc = time.struct_time(
402                      (0, 0, 0, hours, mins, secs, 0, 0, -1)
403                  )
404          # Parse latitude and longitude.
405          self.latitude = _parse_degrees(data[1])
406          if self.latitude is not None and data[2] is not None and data[2].lower() == "s":
407              self.latitude *= -1.0
408          self.longitude = _parse_degrees(data[3])
409          if (
410              self.longitude is not None
411              and data[4] is not None
412              and data[4].lower() == "w"
413          ):
414              self.longitude *= -1.0
415          # Parse out fix quality and other simple numeric values.
416          self.fix_quality = _parse_int(data[5])
417          self.satellites = _parse_int(data[6])
418          self.horizontal_dilution = _parse_float(data[7])
419          self.altitude_m = _parse_float(data[8])
420          self.height_geoid = _parse_float(data[10])
421  
422      def _parse_gpgsa(self, args):
423          data = args.split(",")
424          if data is None or (data[0] == ""):
425              return  # Unexpected number of params
426  
427          # Parse selection mode
428          self.sel_mode = _parse_str(data[0])
429          # Parse 3d fix
430          self.fix_quality_3d = _parse_int(data[1])
431          satlist = list(filter(None, data[2:-4]))
432          self.sat_prns = {}
433          for i, sat in enumerate(satlist, 1):
434              self.sat_prns["gps{}".format(i)] = _parse_int(sat)
435  
436          # Parse PDOP, dilution of precision
437          self.pdop = _parse_float(data[-3])
438          # Parse HDOP, horizontal dilution of precision
439          self.hdop = _parse_float(data[-2])
440          # Parse VDOP, vertical dilution of precision
441          self.vdop = _parse_float(data[-1])
442  
443      def _parse_gpgsv(self, args):
444          # Parse the arguments (everything after data type) for NMEA GPGGA
445          # 3D location fix sentence.
446          data = args.split(",")
447          if data is None or (data[0] == ""):
448              return  # Unexpected number of params.
449  
450          # Parse number of messages
451          self.total_mess_num = _parse_int(data[0])  # Total number of messages
452          # Parse message number
453          self.mess_num = _parse_int(data[1])  # Message number
454          # Parse number of satellites in view
455          self.satellites = _parse_int(data[2])  # Number of satellites
456  
457          if len(data) < 3:
458              return
459  
460          sat_tup = data[3:]
461  
462          satdict = {}
463          for i in range(len(sat_tup) / 4):
464              j = i * 4
465              key = "gps{}".format(i + (4 * (self.mess_num - 1)))
466              satnum = _parse_int(sat_tup[0 + j])  # Satellite number
467              satdeg = _parse_int(sat_tup[1 + j])  # Elevation in degrees
468              satazim = _parse_int(sat_tup[2 + j])  # Azimuth in degrees
469              satsnr = _parse_int(sat_tup[3 + j])  # signal-to-noise ratio in dB
470              value = (satnum, satdeg, satazim, satsnr)
471              satdict[key] = value
472  
473          if self.sats is None:
474              self.sats = {}
475          for satnum in satdict:
476              self.sats[satnum] = satdict[satnum]
477  
478          try:
479              if self.satellites < self.satellites_prev:
480                  for i in self.sats:
481                      try:
482                          if int(i[-2]) >= self.satellites:
483                              del self.sats[i]
484                      except ValueError:
485                          if int(i[-1]) >= self.satellites:
486                              del self.sats[i]
487          except TypeError:
488              pass
489          self.satellites_prev = self.satellites
490  
491  
class GPS_GtopI2C(GPS):
    """NMEA parser for GTop-compatible GPS modules attached over I2C.

    Behaves like :class:`GPS` but performs its reads and writes through an
    I2C device instead of a UART.
    """

    def __init__(
        self, i2c_bus, *, address=_GPSI2C_DEFAULT_ADDRESS, debug=False, timeout=5
    ):
        import adafruit_bus_device.i2c_device as i2c_device  # pylint: disable=import-outside-toplevel

        # The parent class is initialised without a UART; all I/O methods
        # that would use it are overridden below.
        super().__init__(None, debug)
        self._i2c = i2c_device.I2CDevice(i2c_bus, address)
        # Last byte accepted by read(), used to spot stuffed newlines.
        self._lastbyte = None
        # One-byte scratch buffer reused for every I2C read.
        self._charbuff = bytearray(1)
        # Bytes of the line currently being assembled by readline().
        self._internalbuffer = []
        # Maximum time readline() waits for a complete line.
        self._timeout = timeout

    def read(self, num_bytes=1):
        """Perform ``num_bytes`` single-byte reads from the GPS, without
        parsing, and return the accepted bytes as a bytearray.  A newline
        that does not directly follow a carriage return is discarded, so the
        result may be shorter than ``num_bytes``."""
        newline = ord("\n")
        carriage_return = ord("\r")
        collected = bytearray()
        for _ in range(num_bytes):
            with self._i2c as bus:
                # Fetch one byte, then decide whether it is one of a run of
                # 'stuffed' newlines before keeping it.
                bus.readinto(self._charbuff)
                byte = self._charbuff[0]
                stuffed = byte == newline and self._lastbyte != carriage_return
                if not stuffed:
                    collected.append(byte)
                    self._lastbyte = byte  # remember the last accepted byte
        return collected

    def write(self, bytestr):
        """Send ``bytestr`` to the GPS as-is, with no parsing and no
        checksum added."""
        with self._i2c as bus:
            bus.write(bytestr)

    @property
    def in_waiting(self):
        """Always 32: the I2C interface cannot report how many bytes are
        pending, so a fixed value is returned."""
        return 32

    def readline(self):
        """Return a newline-terminated bytearray, or None if no complete line
        was received before the timeout given at construction elapsed.
        Bytes of an incomplete line are kept for the next call."""
        newline = ord("\n")
        deadline = time.monotonic() + self._timeout
        while deadline > time.monotonic():
            # Stop as soon as the buffered data ends with a newline.
            if self._internalbuffer and self._internalbuffer[-1] == newline:
                break
            chunk = self.read(1)
            if chunk:
                self._internalbuffer.append(chunk[0])
        if not self._internalbuffer or self._internalbuffer[-1] != newline:
            return None  # no completed data yet
        line = bytearray(self._internalbuffer)
        self._internalbuffer = []  # start the next line from empty
        return line