/ adafruit_rockblock.py
adafruit_rockblock.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2020 Carter Nelson for Adafruit Industries
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  """
 23  `adafruit_rockblock`
 24  ================================================================================
 25  
 26  CircuitPython driver for Rock Seven RockBLOCK Iridium satellite modem
 27  
 28  
 29  * Author(s): Carter Nelson
 30  
 31  Implementation Notes
 32  --------------------
 33  
 34  **Hardware:**
 35  
 36  * `RockBLOCK 9603 Iridium Satellite Modem <https://www.adafruit.com/product/4521>`_
 37  
 38  **Software and Dependencies:**
 39  
 40  * Adafruit CircuitPython firmware for the supported boards:
 41    https://github.com/adafruit/circuitpython/releases
 42  
 43  """
 44  
 45  import time
 46  import struct
 47  
# Package metadata.
# NOTE(review): the version string looks like a placeholder that is presumably
# substituted by release tooling - confirm before relying on its value.
__version__ = "0.0.0-auto.0"
# URL of the source repository for this driver.
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RockBlock.git"
 50  
 51  
 52  class RockBlock:
 53      """Driver for RockBLOCK Iridium satellite modem."""
 54  
 55      def __init__(self, uart, baudrate=19200):
 56          self._uart = uart
 57          self._uart.baudrate = baudrate
 58          self._buf_out = None
 59          self.reset()
 60  
 61      def _uart_xfer(self, cmd):
 62          """Send AT command and return response as tuple of lines read."""
 63  
 64          self._uart.reset_input_buffer()
 65          self._uart.write(str.encode("AT" + cmd + "\r"))
 66  
 67          resp = []
 68          line = self._uart.readline()
 69          resp.append(line)
 70          while not any(EOM in line for EOM in (b"OK\r\n", b"ERROR\r\n")):
 71              line = self._uart.readline()
 72              resp.append(line)
 73  
 74          self._uart.reset_input_buffer()
 75  
 76          return tuple(resp)
 77  
 78      def reset(self):
 79          """Perform a software reset."""
 80          self._uart_xfer("&F0")  # factory defaults
 81          self._uart_xfer("&K0")  # flow control off
 82  
 83      @property
 84      def data_out(self):
 85          "The binary data in the outbound buffer."
 86          return self._buf_out
 87  
 88      @data_out.setter
 89      def data_out(self, buf):
 90          if buf is None:
 91              # clear the buffer
 92              resp = self._uart_xfer("+SBDD0")
 93              resp = int(resp[1].strip().decode())
 94              if resp == 1:
 95                  raise RuntimeError("Error clearing buffer.")
 96          else:
 97              # set the buffer
 98              if len(buf) > 340:
 99                  raise RuntimeError("Maximum length of 340 bytes.")
100              self._uart.write(str.encode("AT+SBDWB={}\r".format(len(buf))))
101              line = self._uart.readline()
102              while line != b"READY\r\n":
103                  line = self._uart.readline()
104              # binary data plus checksum
105              self._uart.write(buf + struct.pack(">H", sum(buf)))
106              line = self._uart.readline()  # blank line
107              line = self._uart.readline()  # status response
108              resp = int(line)
109              if resp != 0:
110                  raise RuntimeError("Write error", resp)
111              # seems to want some time to digest
112              time.sleep(0.1)
113          self._buf_out = buf
114  
115      @property
116      def text_out(self):
117          """The text in the outbound buffer."""
118          text = None
119          # TODO: add better check for non-text in buffer
120          # pylint: disable=broad-except
121          try:
122              text = self._buf_out.decode()
123          except Exception:
124              pass
125          return text
126  
127      @text_out.setter
128      def text_out(self, text):
129          if not isinstance(text, str):
130              raise ValueError("Only strings allowed.")
131          if len(text) > 120:
132              raise ValueError("Text size limited to 120 bytes.")
133          self.data_out = str.encode(text)
134  
135      @property
136      def data_in(self):
137          """The binary data in the inbound buffer."""
138          data = None
139          if self.status[2] == 1:
140              resp = self._uart_xfer("+SBDRB")
141              data = resp[0].splitlines()[1]
142              data = data[2:-2]
143          return data
144  
145      @data_in.setter
146      def data_in(self, buf):
147          if buf is not None:
148              raise ValueError("Can only set in buffer to None to clear.")
149          resp = self._uart_xfer("+SBDD1")
150          resp = int(resp[1].strip().decode())
151          if resp == 1:
152              raise RuntimeError("Error clearing buffer.")
153  
154      @property
155      def text_in(self):
156          """The text in the inbound buffer."""
157          text = None
158          if self.status[2] == 1:
159              resp = self._uart_xfer("+SBDRT")
160              try:
161                  text = resp[2].strip().decode()
162              except UnicodeDecodeError:
163                  pass
164          return text
165  
166      @text_in.setter
167      def text_in(self, text):
168          self.data_in = text
169  
170      def satellite_transfer(self, location=None):
171          """Initiate a Short Burst Data transfer with satellites."""
172          status = (None,) * 6
173          if location:
174              resp = self._uart_xfer("+SBDIX=" + location)
175          else:
176              resp = self._uart_xfer("+SBDIX")
177          if resp[-1].strip().decode() == "OK":
178              status = resp[-3].strip().decode().split(":")[1]
179              status = [int(s) for s in status.split(",")]
180              if status[0] <= 8:
181                  # outgoing message sent successfully
182                  self.data_out = None
183          return tuple(status)
184  
185      @property
186      def status(self):
187          """Return tuple of Short Burst Data status."""
188          resp = self._uart_xfer("+SBDSX")
189          if resp[-1].strip().decode() == "OK":
190              status = resp[1].strip().decode().split(":")[1]
191              return tuple([int(a) for a in status.split(",")])
192          return (None,) * 6
193  
194      @property
195      def model(self):
196          """Return modem model."""
197          resp = self._uart_xfer("+GMM")
198          if resp[-1].strip().decode() == "OK":
199              return resp[1].strip().decode()
200          return None
201  
202      def _transfer_buffer(self):
203          """Copy out buffer to in buffer to simulate receiving a message."""
204          self._uart_xfer("+SBDTC")