/ adafruit_gps.py
adafruit_gps.py
1 # The MIT License (MIT) 2 # 3 # Copyright (c) 2017 Tony DiCola for Adafruit Industries 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # The above copyright notice and this permission notice shall be included in 13 # all copies or substantial portions of the Software. 14 # 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21 # THE SOFTWARE. 22 """ 23 `adafruit_gps` 24 ==================================================== 25 26 GPS parsing module. Can parse simple NMEA data sentences from serial GPS 27 modules to read latitude, longitude, and more. 28 29 * Author(s): Tony DiCola 30 31 Implementation Notes 32 -------------------- 33 34 **Hardware:** 35 36 * Adafruit `Ultimate GPS Breakout <https://www.adafruit.com/product/746>`_ 37 * Adafruit `Ultimate GPS FeatherWing <https://www.adafruit.com/product/3133>`_ 38 39 **Software and Dependencies:** 40 41 * Adafruit CircuitPython firmware for the ESP8622 and M0-based boards: 42 https://github.com/adafruit/circuitpython/releases 43 44 """ 45 import time 46 from micropython import const 47 48 __version__ = "0.0.0-auto.0" 49 __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_GPS.git" 50 51 52 _GPSI2C_DEFAULT_ADDRESS = const(0x10) 53 54 # Internal helper parsing functions. 55 # These handle input that might be none or null and return none instead of 56 # throwing errors. 57 def _parse_degrees(nmea_data): 58 # Parse a NMEA lat/long data pair 'dddmm.mmmm' into a pure degrees value. 59 # Where ddd is the degrees, mm.mmmm is the minutes. 60 if nmea_data is None or len(nmea_data) < 3: 61 return None 62 raw = float(nmea_data) 63 deg = raw // 100 64 minutes = raw % 100 65 return deg + minutes / 60 66 67 68 def _parse_int(nmea_data): 69 if nmea_data is None or nmea_data == "": 70 return None 71 return int(nmea_data) 72 73 74 def _parse_float(nmea_data): 75 if nmea_data is None or nmea_data == "": 76 return None 77 return float(nmea_data) 78 79 80 def _parse_str(nmea_data): 81 if nmea_data is None or nmea_data == "": 82 return None 83 return str(nmea_data) 84 85 86 # lint warning about too many attributes disabled 87 # pylint: disable-msg=R0902 88 89 90 class GPS: 91 """GPS parsing module. Can parse simple NMEA data sentences from serial 92 GPS modules to read latitude, longitude, and more. 93 """ 94 95 def __init__(self, uart, debug=False): 96 self._uart = uart 97 # Initialize null starting values for GPS attributes. 98 self.timestamp_utc = None 99 self.latitude = None 100 self.longitude = None 101 self.fix_quality = None 102 self.fix_quality_3d = None 103 self.satellites = None 104 self.satellites_prev = None 105 self.horizontal_dilution = None 106 self.altitude_m = None 107 self.height_geoid = None 108 self.speed_knots = None 109 self.track_angle_deg = None 110 self.sats = None 111 self.isactivedata = None 112 self.true_track = None 113 self.mag_track = None 114 self.sat_prns = None 115 self.sel_mode = None 116 self.pdop = None 117 self.hdop = None 118 self.vdop = None 119 self.total_mess_num = None 120 self.mess_num = None 121 self._raw_sentence = None 122 self.debug = debug 123 124 def update(self): 125 """Check for updated data from the GPS module and process it 126 accordingly. Returns True if new data was processed, and False if 127 nothing new was received. 128 """ 129 # Grab a sentence and check its data type to call the appropriate 130 # parsing function. 131 try: 132 sentence = self._parse_sentence() 133 except UnicodeError: 134 return None 135 if sentence is None: 136 return False 137 if self.debug: 138 print(sentence) 139 data_type, args = sentence 140 data_type = bytes(data_type.upper(), "ascii") 141 # return sentence 142 if data_type in ( 143 b"GPGLL", 144 b"GNGLL", 145 ): # GLL, Geographic Position – Latitude/Longitude 146 self._parse_gpgll(args) 147 elif data_type in (b"GPRMC", b"GNRMC"): # RMC, minimum location info 148 self._parse_gprmc(args) 149 elif data_type in (b"GPGGA", b"GNGGA"): # GGA, 3d location fix 150 self._parse_gpgga(args) 151 return True 152 153 def send_command(self, command, add_checksum=True): 154 """Send a command string to the GPS. If add_checksum is True (the 155 default) a NMEA checksum will automatically be computed and added. 156 Note you should NOT add the leading $ and trailing * to the command 157 as they will automatically be added! 158 """ 159 self.write(b"$") 160 self.write(command) 161 if add_checksum: 162 checksum = 0 163 for char in command: 164 checksum ^= char 165 self.write(b"*") 166 self.write(bytes("{:02x}".format(checksum).upper(), "ascii")) 167 self.write(b"\r\n") 168 169 @property 170 def has_fix(self): 171 """True if a current fix for location information is available.""" 172 return self.fix_quality is not None and self.fix_quality >= 1 173 174 @property 175 def has_3d_fix(self): 176 """Returns true if there is a 3d fix available. 177 use has_fix to determine if a 2d fix is available, 178 passing it the same data""" 179 return self.fix_quality_3d is not None and self.fix_quality_3d >= 2 180 181 @property 182 def datetime(self): 183 """Return struct_time object to feed rtc.set_time_source() function""" 184 return self.timestamp_utc 185 186 @property 187 def nmea_sentence(self): 188 """Return raw_sentence which is the raw NMEA sentence read from the GPS""" 189 return self._raw_sentence 190 191 def read(self, num_bytes): 192 """Read up to num_bytes of data from the GPS directly, without parsing. 193 Returns a bytearray with up to num_bytes or None if nothing was read""" 194 return self._uart.read(num_bytes) 195 196 def write(self, bytestr): 197 """Write a bytestring data to the GPS directly, without parsing 198 or checksums""" 199 return self._uart.write(bytestr) 200 201 @property 202 def in_waiting(self): 203 """Returns number of bytes available in UART read buffer""" 204 return self._uart.in_waiting 205 206 def readline(self): 207 """Returns a newline terminated bytearray, must have timeout set for 208 the underlying UART or this will block forever!""" 209 return self._uart.readline() 210 211 def _read_sentence(self): 212 # Parse any NMEA sentence that is available. 213 # pylint: disable=len-as-condition 214 # This needs to be refactored when it can be tested. 215 216 # Only continue if we have at least 32 bytes in the input buffer 217 if self.in_waiting < 32: 218 return None 219 220 sentence = self.readline() 221 if sentence is None or sentence == b"" or len(sentence) < 1: 222 return None 223 try: 224 sentence = str(sentence, "ascii").strip() 225 except UnicodeError: 226 return None 227 # Look for a checksum and validate it if present. 228 if len(sentence) > 7 and sentence[-3] == "*": 229 # Get included checksum, then calculate it and compare. 230 expected = int(sentence[-2:], 16) 231 actual = 0 232 for i in range(1, len(sentence) - 3): 233 actual ^= ord(sentence[i]) 234 if actual != expected: 235 return None # Failed to validate checksum. 236 237 # copy the raw sentence 238 self._raw_sentence = sentence 239 240 return sentence 241 # At this point we don't have a valid sentence 242 return None 243 244 def _parse_sentence(self): 245 sentence = self._read_sentence() 246 247 # sentence is a valid NMEA with a valid checksum 248 if sentence is None: 249 return None 250 251 # Remove checksum once validated. 252 sentence = sentence[:-3] 253 # Parse out the type of sentence (first string after $ up to comma) 254 # and then grab the rest as data within the sentence. 255 delimiter = sentence.find(",") 256 if delimiter == -1: 257 return None # Invalid sentence, no comma after data type. 258 data_type = sentence[1:delimiter] 259 return (data_type, sentence[delimiter + 1 :]) 260 261 def _parse_gpgll(self, args): 262 data = args.split(",") 263 if data is None or data[0] is None or (data[0] == ""): 264 return # Unexpected number of params. 265 266 # Parse latitude and longitude. 267 self.latitude = _parse_degrees(data[0]) 268 if self.latitude is not None and data[1] is not None and data[1].lower() == "s": 269 self.latitude *= -1.0 270 self.longitude = _parse_degrees(data[2]) 271 if ( 272 self.longitude is not None 273 and data[3] is not None 274 and data[3].lower() == "w" 275 ): 276 self.longitude *= -1.0 277 time_utc = int(_parse_int(float(data[4]))) 278 if time_utc is not None: 279 hours = time_utc // 10000 280 mins = (time_utc // 100) % 100 281 secs = time_utc % 100 282 # Set or update time to a friendly python time struct. 283 if self.timestamp_utc is not None: 284 self.timestamp_utc = time.struct_time( 285 (0, 0, 0, hours, mins, secs, 0, 0, -1) 286 ) 287 else: 288 self.timestamp_utc = time.struct_time( 289 (0, 0, 0, hours, mins, secs, 0, 0, -1) 290 ) 291 # Parse data active or void 292 self.isactivedata = _parse_str(data[5]) 293 294 def _parse_gprmc(self, args): 295 # Parse the arguments (everything after data type) for NMEA GPRMC 296 # minimum location fix sentence. 297 data = args.split(",") 298 if data is None or len(data) < 11 or data[0] is None or (data[0] == ""): 299 return # Unexpected number of params. 300 # Parse fix time. 301 time_utc = int(_parse_float(data[0])) 302 if time_utc is not None: 303 hours = time_utc // 10000 304 mins = (time_utc // 100) % 100 305 secs = time_utc % 100 306 # Set or update time to a friendly python time struct. 307 if self.timestamp_utc is not None: 308 self.timestamp_utc = time.struct_time( 309 ( 310 self.timestamp_utc.tm_year, 311 self.timestamp_utc.tm_mon, 312 self.timestamp_utc.tm_mday, 313 hours, 314 mins, 315 secs, 316 0, 317 0, 318 -1, 319 ) 320 ) 321 else: 322 self.timestamp_utc = time.struct_time( 323 (0, 0, 0, hours, mins, secs, 0, 0, -1) 324 ) 325 # Parse status (active/fixed or void). 326 status = data[1] 327 self.fix_quality = 0 328 if status is not None and status.lower() == "a": 329 self.fix_quality = 1 330 # Parse latitude and longitude. 331 self.latitude = _parse_degrees(data[2]) 332 if self.latitude is not None and data[3] is not None and data[3].lower() == "s": 333 self.latitude *= -1.0 334 self.longitude = _parse_degrees(data[4]) 335 if ( 336 self.longitude is not None 337 and data[5] is not None 338 and data[5].lower() == "w" 339 ): 340 self.longitude *= -1.0 341 # Parse out speed and other simple numeric values. 342 self.speed_knots = _parse_float(data[6]) 343 self.track_angle_deg = _parse_float(data[7]) 344 # Parse date. 345 if data[8] is not None and len(data[8]) == 6: 346 day = int(data[8][0:2]) 347 month = int(data[8][2:4]) 348 year = 2000 + int(data[8][4:6]) # Y2k bug, 2 digit year assumption. 349 # This is a problem with the NMEA 350 # spec and not this code. 351 if self.timestamp_utc is not None: 352 # Replace the timestamp with an updated one. 353 # (struct_time is immutable and can't be changed in place) 354 self.timestamp_utc = time.struct_time( 355 ( 356 year, 357 month, 358 day, 359 self.timestamp_utc.tm_hour, 360 self.timestamp_utc.tm_min, 361 self.timestamp_utc.tm_sec, 362 0, 363 0, 364 -1, 365 ) 366 ) 367 else: 368 # Time hasn't been set so create it. 369 self.timestamp_utc = time.struct_time( 370 (year, month, day, 0, 0, 0, 0, 0, -1) 371 ) 372 373 def _parse_gpgga(self, args): 374 # Parse the arguments (everything after data type) for NMEA GPGGA 375 # 3D location fix sentence. 376 data = args.split(",") 377 if data is None or len(data) != 14 or (data[0] == ""): 378 return # Unexpected number of params. 379 # Parse fix time. 380 time_utc = int(_parse_float(data[0])) 381 if time_utc is not None: 382 hours = time_utc // 10000 383 mins = (time_utc // 100) % 100 384 secs = time_utc % 100 385 # Set or update time to a friendly python time struct. 386 if self.timestamp_utc is not None: 387 self.timestamp_utc = time.struct_time( 388 ( 389 self.timestamp_utc.tm_year, 390 self.timestamp_utc.tm_mon, 391 self.timestamp_utc.tm_mday, 392 hours, 393 mins, 394 secs, 395 0, 396 0, 397 -1, 398 ) 399 ) 400 else: 401 self.timestamp_utc = time.struct_time( 402 (0, 0, 0, hours, mins, secs, 0, 0, -1) 403 ) 404 # Parse latitude and longitude. 405 self.latitude = _parse_degrees(data[1]) 406 if self.latitude is not None and data[2] is not None and data[2].lower() == "s": 407 self.latitude *= -1.0 408 self.longitude = _parse_degrees(data[3]) 409 if ( 410 self.longitude is not None 411 and data[4] is not None 412 and data[4].lower() == "w" 413 ): 414 self.longitude *= -1.0 415 # Parse out fix quality and other simple numeric values. 416 self.fix_quality = _parse_int(data[5]) 417 self.satellites = _parse_int(data[6]) 418 self.horizontal_dilution = _parse_float(data[7]) 419 self.altitude_m = _parse_float(data[8]) 420 self.height_geoid = _parse_float(data[10]) 421 422 def _parse_gpgsa(self, args): 423 data = args.split(",") 424 if data is None or (data[0] == ""): 425 return # Unexpected number of params 426 427 # Parse selection mode 428 self.sel_mode = _parse_str(data[0]) 429 # Parse 3d fix 430 self.fix_quality_3d = _parse_int(data[1]) 431 satlist = list(filter(None, data[2:-4])) 432 self.sat_prns = {} 433 for i, sat in enumerate(satlist, 1): 434 self.sat_prns["gps{}".format(i)] = _parse_int(sat) 435 436 # Parse PDOP, dilution of precision 437 self.pdop = _parse_float(data[-3]) 438 # Parse HDOP, horizontal dilution of precision 439 self.hdop = _parse_float(data[-2]) 440 # Parse VDOP, vertical dilution of precision 441 self.vdop = _parse_float(data[-1]) 442 443 def _parse_gpgsv(self, args): 444 # Parse the arguments (everything after data type) for NMEA GPGGA 445 # 3D location fix sentence. 446 data = args.split(",") 447 if data is None or (data[0] == ""): 448 return # Unexpected number of params. 449 450 # Parse number of messages 451 self.total_mess_num = _parse_int(data[0]) # Total number of messages 452 # Parse message number 453 self.mess_num = _parse_int(data[1]) # Message number 454 # Parse number of satellites in view 455 self.satellites = _parse_int(data[2]) # Number of satellites 456 457 if len(data) < 3: 458 return 459 460 sat_tup = data[3:] 461 462 satdict = {} 463 for i in range(len(sat_tup) / 4): 464 j = i * 4 465 key = "gps{}".format(i + (4 * (self.mess_num - 1))) 466 satnum = _parse_int(sat_tup[0 + j]) # Satellite number 467 satdeg = _parse_int(sat_tup[1 + j]) # Elevation in degrees 468 satazim = _parse_int(sat_tup[2 + j]) # Azimuth in degrees 469 satsnr = _parse_int(sat_tup[3 + j]) # signal-to-noise ratio in dB 470 value = (satnum, satdeg, satazim, satsnr) 471 satdict[key] = value 472 473 if self.sats is None: 474 self.sats = {} 475 for satnum in satdict: 476 self.sats[satnum] = satdict[satnum] 477 478 try: 479 if self.satellites < self.satellites_prev: 480 for i in self.sats: 481 try: 482 if int(i[-2]) >= self.satellites: 483 del self.sats[i] 484 except ValueError: 485 if int(i[-1]) >= self.satellites: 486 del self.sats[i] 487 except TypeError: 488 pass 489 self.satellites_prev = self.satellites 490 491 492 class GPS_GtopI2C(GPS): 493 """GTop-compatible I2C GPS parsing module. Can parse simple NMEA data 494 sentences from an I2C-capable GPS module to read latitude, longitude, and more. 495 """ 496 497 def __init__( 498 self, i2c_bus, *, address=_GPSI2C_DEFAULT_ADDRESS, debug=False, timeout=5 499 ): 500 import adafruit_bus_device.i2c_device as i2c_device # pylint: disable=import-outside-toplevel 501 502 super().__init__(None, debug) # init the parent with no UART 503 self._i2c = i2c_device.I2CDevice(i2c_bus, address) 504 self._lastbyte = None 505 self._charbuff = bytearray(1) 506 self._internalbuffer = [] 507 self._timeout = timeout 508 509 def read(self, num_bytes=1): 510 """Read up to num_bytes of data from the GPS directly, without parsing. 511 Returns a bytearray with up to num_bytes or None if nothing was read""" 512 result = [] 513 for _ in range(num_bytes): 514 with self._i2c as i2c: 515 # we read one byte at a time, verify it isnt part of a string of 516 # 'stuffed' newlines and then append to our result array for byteification 517 i2c.readinto(self._charbuff) 518 char = self._charbuff[0] 519 if (char == ord("\n")) and (self._lastbyte != ord("\r")): 520 continue # skip duplicate \n's! 521 result.append(char) 522 self._lastbyte = char # keep track of the last character approved 523 return bytearray(result) 524 525 def write(self, bytestr): 526 """Write a bytestring data to the GPS directly, without parsing 527 or checksums""" 528 with self._i2c as i2c: 529 i2c.write(bytestr) 530 531 @property 532 def in_waiting(self): 533 """Returns number of bytes available in UART read buffer, always 32 534 since I2C does not have the ability to know how much data is available""" 535 return 32 536 537 def readline(self): 538 """Returns a newline terminated bytearray, must have timeout set for 539 the underlying UART or this will block forever!""" 540 timeout = time.monotonic() + self._timeout 541 while timeout > time.monotonic(): 542 # check if our internal buffer has a '\n' termination already 543 if self._internalbuffer and (self._internalbuffer[-1] == ord("\n")): 544 break 545 char = self.read(1) 546 if not char: 547 continue 548 self._internalbuffer.append(char[0]) 549 # print(bytearray(self._internalbuffer)) 550 if self._internalbuffer and self._internalbuffer[-1] == ord("\n"): 551 ret = bytearray(self._internalbuffer) 552 self._internalbuffer = [] # reset the buffer to empty 553 return ret 554 return None # no completed data yet