/ adafruit_rockblock.py
adafruit_rockblock.py
1 # The MIT License (MIT) 2 # 3 # Copyright (c) 2020 Carter Nelson for Adafruit Industries 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # The above copyright notice and this permission notice shall be included in 13 # all copies or substantial portions of the Software. 14 # 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21 # THE SOFTWARE. 22 """ 23 `adafruit_rockblock` 24 ================================================================================ 25 26 CircuitPython driver for Rock Seven RockBLOCK Iridium satellite modem 27 28 29 * Author(s): Carter Nelson 30 31 Implementation Notes 32 -------------------- 33 34 **Hardware:** 35 36 * `RockBLOCK 9603 Iridium Satellite Modem <https://www.adafruit.com/product/4521>`_ 37 38 **Software and Dependencies:** 39 40 * Adafruit CircuitPython firmware for the supported boards: 41 https://github.com/adafruit/circuitpython/releases 42 43 """ 44 45 import time 46 import struct 47 48 __version__ = "0.0.0-auto.0" 49 __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RockBlock.git" 50 51 52 class RockBlock: 53 """Driver for RockBLOCK Iridium satellite modem.""" 54 55 def __init__(self, uart, baudrate=19200): 56 self._uart = uart 57 self._uart.baudrate = baudrate 58 self._buf_out = None 59 self.reset() 60 61 def _uart_xfer(self, cmd): 62 """Send AT command and return response as tuple of lines read.""" 63 64 self._uart.reset_input_buffer() 65 self._uart.write(str.encode("AT" + cmd + "\r")) 66 67 resp = [] 68 line = self._uart.readline() 69 resp.append(line) 70 while not any(EOM in line for EOM in (b"OK\r\n", b"ERROR\r\n")): 71 line = self._uart.readline() 72 resp.append(line) 73 74 self._uart.reset_input_buffer() 75 76 return tuple(resp) 77 78 def reset(self): 79 """Perform a software reset.""" 80 self._uart_xfer("&F0") # factory defaults 81 self._uart_xfer("&K0") # flow control off 82 83 @property 84 def data_out(self): 85 "The binary data in the outbound buffer." 86 return self._buf_out 87 88 @data_out.setter 89 def data_out(self, buf): 90 if buf is None: 91 # clear the buffer 92 resp = self._uart_xfer("+SBDD0") 93 resp = int(resp[1].strip().decode()) 94 if resp == 1: 95 raise RuntimeError("Error clearing buffer.") 96 else: 97 # set the buffer 98 if len(buf) > 340: 99 raise RuntimeError("Maximum length of 340 bytes.") 100 self._uart.write(str.encode("AT+SBDWB={}\r".format(len(buf)))) 101 line = self._uart.readline() 102 while line != b"READY\r\n": 103 line = self._uart.readline() 104 # binary data plus checksum 105 self._uart.write(buf + struct.pack(">H", sum(buf))) 106 line = self._uart.readline() # blank line 107 line = self._uart.readline() # status response 108 resp = int(line) 109 if resp != 0: 110 raise RuntimeError("Write error", resp) 111 # seems to want some time to digest 112 time.sleep(0.1) 113 self._buf_out = buf 114 115 @property 116 def text_out(self): 117 """The text in the outbound buffer.""" 118 text = None 119 # TODO: add better check for non-text in buffer 120 # pylint: disable=broad-except 121 try: 122 text = self._buf_out.decode() 123 except Exception: 124 pass 125 return text 126 127 @text_out.setter 128 def text_out(self, text): 129 if not isinstance(text, str): 130 raise ValueError("Only strings allowed.") 131 if len(text) > 120: 132 raise ValueError("Text size limited to 120 bytes.") 133 self.data_out = str.encode(text) 134 135 @property 136 def data_in(self): 137 """The binary data in the inbound buffer.""" 138 data = None 139 if self.status[2] == 1: 140 resp = self._uart_xfer("+SBDRB") 141 data = resp[0].splitlines()[1] 142 data = data[2:-2] 143 return data 144 145 @data_in.setter 146 def data_in(self, buf): 147 if buf is not None: 148 raise ValueError("Can only set in buffer to None to clear.") 149 resp = self._uart_xfer("+SBDD1") 150 resp = int(resp[1].strip().decode()) 151 if resp == 1: 152 raise RuntimeError("Error clearing buffer.") 153 154 @property 155 def text_in(self): 156 """The text in the inbound buffer.""" 157 text = None 158 if self.status[2] == 1: 159 resp = self._uart_xfer("+SBDRT") 160 try: 161 text = resp[2].strip().decode() 162 except UnicodeDecodeError: 163 pass 164 return text 165 166 @text_in.setter 167 def text_in(self, text): 168 self.data_in = text 169 170 def satellite_transfer(self, location=None): 171 """Initiate a Short Burst Data transfer with satellites.""" 172 status = (None,) * 6 173 if location: 174 resp = self._uart_xfer("+SBDIX=" + location) 175 else: 176 resp = self._uart_xfer("+SBDIX") 177 if resp[-1].strip().decode() == "OK": 178 status = resp[-3].strip().decode().split(":")[1] 179 status = [int(s) for s in status.split(",")] 180 if status[0] <= 8: 181 # outgoing message sent successfully 182 self.data_out = None 183 return tuple(status) 184 185 @property 186 def status(self): 187 """Return tuple of Short Burst Data status.""" 188 resp = self._uart_xfer("+SBDSX") 189 if resp[-1].strip().decode() == "OK": 190 status = resp[1].strip().decode().split(":")[1] 191 return tuple([int(a) for a in status.split(",")]) 192 return (None,) * 6 193 194 @property 195 def model(self): 196 """Return modem model.""" 197 resp = self._uart_xfer("+GMM") 198 if resp[-1].strip().decode() == "OK": 199 return resp[1].strip().decode() 200 return None 201 202 def _transfer_buffer(self): 203 """Copy out buffer to in buffer to simulate receiving a message.""" 204 self._uart_xfer("+SBDTC")