/ RNS / Interfaces / Android / SerialInterface.py
SerialInterface.py
  1  # Reticulum License
  2  #
  3  # Copyright (c) 2016-2025 Mark Qvist
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # - The Software shall not be used in any kind of system which includes amongst
 13  #   its functions the ability to purposefully do harm to human beings.
 14  #
 15  # - The Software shall not be used, directly or indirectly, in the creation of
 16  #   an artificial intelligence, machine learning or language model training
 17  #   dataset, including but not limited to any use that contributes to the
 18  #   training or development of such a model or algorithm.
 19  #
 20  # - The above copyright notice and this permission notice shall be included in
 21  #   all copies or substantial portions of the Software.
 22  #
 23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 24  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 25  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 26  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 27  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 28  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 29  # SOFTWARE.
 30  
 31  from RNS.Interfaces.Interface import Interface
 32  from time import sleep
 33  import sys
 34  import threading
 35  import time
 36  import RNS
 37  
 38  class HDLC():
 39      # The Serial Interface packetizes data using
 40      # simplified HDLC framing, similar to PPP
 41      FLAG              = 0x7E
 42      ESC               = 0x7D
 43      ESC_MASK          = 0x20
 44  
 45      @staticmethod
 46      def escape(data):
 47          data = data.replace(bytes([HDLC.ESC]), bytes([HDLC.ESC, HDLC.ESC^HDLC.ESC_MASK]))
 48          data = data.replace(bytes([HDLC.FLAG]), bytes([HDLC.ESC, HDLC.FLAG^HDLC.ESC_MASK]))
 49          return data
 50  
 51  class SerialInterface(Interface):
 52      MAX_CHUNK = 32768
 53      DEFAULT_IFAC_SIZE = 8
 54  
 55      owner    = None
 56      port     = None
 57      speed    = None
 58      databits = None
 59      parity   = None
 60      stopbits = None
 61      serial   = None
 62  
 63      def __init__(self, owner, configuration):
 64          import importlib.util
 65          if RNS.vendor.platformutils.is_android():
 66              self.on_android  = True
 67              if importlib.util.find_spec('usbserial4a') != None:
 68                  if importlib.util.find_spec('jnius') == None:
 69                      RNS.log("Could not load jnius API wrapper for Android, Serial interface cannot be created.", RNS.LOG_CRITICAL)
 70                      RNS.log("This probably means you are trying to use an USB-based interface from within Termux or similar.", RNS.LOG_CRITICAL)
 71                      RNS.log("This is currently not possible, due to this environment limiting access to the native Android APIs.", RNS.LOG_CRITICAL)
 72                      RNS.panic()
 73  
 74                  from usbserial4a import serial4a as serial
 75                  self.parity = "N"
 76              
 77              else:
 78                  RNS.log("Could not load USB serial module for Android, Serial interface cannot be created.", RNS.LOG_CRITICAL)
 79                  RNS.log("You can install this module by issuing: pip install usbserial4a", RNS.LOG_CRITICAL)
 80                  RNS.panic()
 81          else:
 82              raise SystemError("Android-specific interface was used on non-Android OS")
 83  
 84          super().__init__()
 85  
 86          c = Interface.get_config_obj(configuration)
 87          name = c["name"]
 88          port = c["port"] if "port" in c else None
 89          speed = int(c["speed"]) if "speed" in c else 9600
 90          databits = int(c["databits"]) if "databits" in c else 8
 91          parity = c["parity"] if "parity" in c else "N"
 92          stopbits = int(c["stopbits"]) if "stopbits" in c else 1
 93  
 94          if port == None:
 95              raise ValueError("No port specified for serial interface")
 96  
 97          self.HW_MTU = 564
 98          
 99          self.pyserial = serial
100          self.serial   = None
101          self.owner    = owner
102          self.name     = name
103          self.port     = port
104          self.speed    = speed
105          self.databits = databits
106          self.parity   = "N"
107          self.stopbits = stopbits
108          self.timeout  = 100
109          self.online   = False
110          self.bitrate  = self.speed
111  
112          if parity.lower() == "e" or parity.lower() == "even":
113              self.parity = "E"
114  
115          if parity.lower() == "o" or parity.lower() == "odd":
116              self.parity = "O"
117  
118          try:
119              self.open_port()
120          except Exception as e:
121              RNS.log("Could not open serial port for interface "+str(self), RNS.LOG_ERROR)
122              raise e
123  
124          if self.serial.is_open:
125              self.configure_device()
126          else:
127              raise IOError("Could not open serial port")
128  
129  
130      def open_port(self):
131          RNS.log("Opening serial port "+self.port+"...")
132          # Get device parameters
133          from usb4a import usb
134          device = usb.get_usb_device(self.port)
135          if device:
136              vid = device.getVendorId()
137              pid = device.getProductId()
138  
139              # Driver overrides for speficic chips
140              proxy = self.pyserial.get_serial_port
141              if vid == 0x1A86 and pid == 0x55D4:
142                  # Force CDC driver for Qinheng CH34x
143                  RNS.log(str(self)+" using CDC driver for "+RNS.hexrep(vid)+":"+RNS.hexrep(pid), RNS.LOG_DEBUG)
144                  from usbserial4a.cdcacmserial4a import CdcAcmSerial
145                  proxy = CdcAcmSerial
146  
147              self.serial = proxy(
148                  self.port,
149                  baudrate = self.speed,
150                  bytesize = self.databits,
151                  parity = self.parity,
152                  stopbits = self.stopbits,
153                  xonxoff = False,
154                  rtscts = False,
155                  timeout = None,
156                  inter_byte_timeout = None,
157                  # write_timeout = wtimeout,
158                  dsrdtr = False,
159              )
160  
161              if vid == 0x0403:
162                  # Hardware parameters for FTDI devices @ 115200 baud
163                  self.serial.DEFAULT_READ_BUFFER_SIZE = 16 * 1024
164                  self.serial.USB_READ_TIMEOUT_MILLIS = 100
165                  self.serial.timeout = 0.1
166              elif vid == 0x10C4:
167                  # Hardware parameters for SiLabs CP210x @ 115200 baud
168                  self.serial.DEFAULT_READ_BUFFER_SIZE = 64 
169                  self.serial.USB_READ_TIMEOUT_MILLIS = 12
170                  self.serial.timeout = 0.012
171              elif vid == 0x1A86 and pid == 0x55D4:
172                  # Hardware parameters for Qinheng CH34x @ 115200 baud
173                  self.serial.DEFAULT_READ_BUFFER_SIZE = 64
174                  self.serial.USB_READ_TIMEOUT_MILLIS = 12
175                  self.serial.timeout = 0.1
176              else:
177                  # Default values
178                  self.serial.DEFAULT_READ_BUFFER_SIZE = 1 * 1024
179                  self.serial.USB_READ_TIMEOUT_MILLIS = 100
180                  self.serial.timeout = 0.1
181  
182              RNS.log(str(self)+" USB read buffer size set to "+RNS.prettysize(self.serial.DEFAULT_READ_BUFFER_SIZE), RNS.LOG_DEBUG)
183              RNS.log(str(self)+" USB read timeout set to "+str(self.serial.USB_READ_TIMEOUT_MILLIS)+"ms", RNS.LOG_DEBUG)
184              RNS.log(str(self)+" USB write timeout set to "+str(self.serial.USB_WRITE_TIMEOUT_MILLIS)+"ms", RNS.LOG_DEBUG)
185  
186      def configure_device(self):
187          sleep(0.5)
188          thread = threading.Thread(target=self.readLoop)
189          thread.daemon = True
190          thread.start()
191          self.online = True
192          RNS.log("Serial port "+self.port+" is now open", RNS.LOG_VERBOSE)
193  
194  
195      def process_incoming(self, data):
196          self.rxb += len(data)
197          def af():
198              self.owner.inbound(data, self)
199          threading.Thread(target=af, daemon=True).start()
200  
201      def process_outgoing(self,data):
202          if self.online:
203              data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG])
204              written = self.serial.write(data)
205              self.txb += len(data)            
206              if written != len(data):
207                  raise IOError("Serial interface only wrote "+str(written)+" bytes of "+str(len(data)))
208  
209      def readLoop(self):
210          try:
211              in_frame = False
212              escape = False
213              data_buffer = b""
214              last_read_ms = int(time.time()*1000)
215  
216              while self.serial.is_open:
217                  serial_bytes = self.serial.read()
218                  got = len(serial_bytes)
219  
220                  for byte in serial_bytes:
221                      last_read_ms = int(time.time()*1000)
222  
223                      if (in_frame and byte == HDLC.FLAG):
224                          in_frame = False
225                          self.process_incoming(data_buffer)
226                      elif (byte == HDLC.FLAG):
227                          in_frame = True
228                          data_buffer = b""
229                      elif (in_frame and len(data_buffer) < self.HW_MTU):
230                          if (byte == HDLC.ESC):
231                              escape = True
232                          else:
233                              if (escape):
234                                  if (byte == HDLC.FLAG ^ HDLC.ESC_MASK):
235                                      byte = HDLC.FLAG
236                                  if (byte == HDLC.ESC  ^ HDLC.ESC_MASK):
237                                      byte = HDLC.ESC
238                                  escape = False
239                              data_buffer = data_buffer+bytes([byte])
240                          
241                  if got == 0:
242                      time_since_last = int(time.time()*1000) - last_read_ms
243                      if len(data_buffer) > 0 and time_since_last > self.timeout:
244                          data_buffer = b""
245                          in_frame = False
246                          escape = False
247                      # sleep(0.08)
248                      
249          except Exception as e:
250              self.online = False
251              RNS.log("A serial port error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR)
252              RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is now offline.", RNS.LOG_ERROR)
253              
254              if RNS.Reticulum.panic_on_interface_error:
255                  RNS.panic()
256  
257              RNS.log("Reticulum will attempt to reconnect the interface periodically.", RNS.LOG_ERROR)
258  
259          self.online = False
260          self.serial.close()
261          self.reconnect_port()
262  
263      def reconnect_port(self):
264          while not self.online:
265              try:
266                  time.sleep(5)
267                  RNS.log("Attempting to reconnect serial port "+str(self.port)+" for "+str(self)+"...", RNS.LOG_VERBOSE)
268                  self.open_port()
269                  if self.serial.is_open:
270                      self.configure_device()
271              except Exception as e:
272                  RNS.log("Error while reconnecting port, the contained exception was: "+str(e), RNS.LOG_ERROR)
273  
274          RNS.log("Reconnected serial port for "+str(self))
275  
276      def should_ingress_limit(self):
277          return False
278  
279      def __str__(self):
280          return "SerialInterface["+self.name+"]"