/ RNS / Interfaces / AX25KISSInterface.py
AX25KISSInterface.py
  1  # Reticulum License
  2  #
  3  # Copyright (c) 2016-2025 Mark Qvist
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # - The Software shall not be used in any kind of system which includes amongst
 13  #   its functions the ability to purposefully do harm to human beings.
 14  #
 15  # - The Software shall not be used, directly or indirectly, in the creation of
 16  #   an artificial intelligence, machine learning or language model training
 17  #   dataset, including but not limited to any use that contributes to the
 18  #   training or development of such a model or algorithm.
 19  #
 20  # - The above copyright notice and this permission notice shall be included in
 21  #   all copies or substantial portions of the Software.
 22  #
 23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 24  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 25  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 26  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 27  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 28  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 29  # SOFTWARE.
 30  
 31  from RNS.Interfaces.Interface import Interface
 32  from time import sleep
 33  import sys
 34  import threading
 35  import time
 36  import RNS
 37  
 38  class KISS():
 39      FEND              = 0xC0
 40      FESC              = 0xDB
 41      TFEND             = 0xDC
 42      TFESC             = 0xDD
 43      CMD_UNKNOWN       = 0xFE
 44      CMD_DATA          = 0x00
 45      CMD_TXDELAY       = 0x01
 46      CMD_P             = 0x02
 47      CMD_SLOTTIME      = 0x03
 48      CMD_TXTAIL        = 0x04
 49      CMD_FULLDUPLEX    = 0x05
 50      CMD_SETHARDWARE   = 0x06
 51      CMD_READY         = 0x0F
 52      CMD_RETURN        = 0xFF
 53  
 54      @staticmethod
 55      def escape(data):
 56          data = data.replace(bytes([0xdb]), bytes([0xdb, 0xdd]))
 57          data = data.replace(bytes([0xc0]), bytes([0xdb, 0xdc]))
 58          return data
 59  
 60  class AX25():
 61      PID_NOLAYER3    = 0xF0
 62      CTRL_UI         = 0x03
 63      CRC_CORRECT     = bytes([0xF0])+bytes([0xB8])
 64      HEADER_SIZE     = 16
 65  
 66  
class AX25KISSInterface(Interface):
    """Interface that exchanges AX.25 UI frames with a KISS device over a
    serial port."""
    MAX_CHUNK = 32768
    # Assigned to self.bitrate when an instance is created
    BITRATE_GUESS = 1200
    DEFAULT_IFAC_SIZE = 8

    # Class-level placeholders; __init__ assigns the per-instance values
    owner    = None
    port     = None
    speed    = None
    databits = None
    parity   = None
    stopbits = None
    serial   = None
 79  
 80      def __init__(self, owner, configuration):
 81          import importlib.util
 82          if importlib.util.find_spec('serial') != None:
 83              import serial
 84          else:
 85              RNS.log("Using the AX.25 KISS interface requires a serial communication module to be installed.", RNS.LOG_CRITICAL)
 86              RNS.log("You can install one with the command: python3 -m pip install pyserial", RNS.LOG_CRITICAL)
 87              RNS.panic()
 88  
 89          super().__init__()
 90  
 91          c = Interface.get_config_obj(configuration)
 92          name = c["name"]
 93          preamble = int(c["preamble"]) if "preamble" in c else None
 94          txtail = int(c["txtail"]) if "txtail" in c else None
 95          persistence = int(c["persistence"]) if "persistence" in c else None
 96          slottime = int(c["slottime"]) if "slottime" in c else None
 97          flow_control = c.as_bool("flow_control") if "flow_control" in c else False
 98          port = c["port"] if "port" in c else None
 99          speed = int(c["speed"]) if "speed" in c else 9600
100          databits = int(c["databits"]) if "databits" in c else 8
101          parity = c["parity"] if "parity" in c else "N"
102          stopbits = int(c["stopbits"]) if "stopbits" in c else 1
103  
104          callsign = c["callsign"] if "callsign" in c else ""
105          ssid = int(c["ssid"]) if "ssid" in c else -1
106  
107          if port == None:
108              raise ValueError("No port specified for serial interface")
109  
110          self.HW_MTU = 564
111          
112          self.pyserial = serial
113          self.serial   = None
114          self.owner    = owner
115          self.name     = name
116          self.src_call = callsign.upper().encode("ascii")
117          self.src_ssid = ssid
118          self.dst_call = "APZRNS".encode("ascii")
119          self.dst_ssid = 0
120          self.port     = port
121          self.speed    = speed
122          self.databits = databits
123          self.parity   = serial.PARITY_NONE
124          self.stopbits = stopbits
125          self.timeout  = 100
126          self.online   = False
127          self.bitrate  = AX25KISSInterface.BITRATE_GUESS
128  
129          self.packet_queue    = []
130          self.flow_control    = flow_control
131          self.interface_ready = False
132          self.flow_control_timeout = 5
133          self.flow_control_locked  = time.time()
134  
135          if (len(self.src_call) < 3 or len(self.src_call) > 6):
136              raise ValueError("Invalid callsign for "+str(self))
137  
138          if (self.src_ssid < 0 or self.src_ssid > 15):
139              raise ValueError("Invalid SSID for "+str(self))
140  
141          self.preamble    = preamble if preamble != None else 350;
142          self.txtail      = txtail if txtail != None else 20;
143          self.persistence = persistence if persistence != None else 64;
144          self.slottime    = slottime if slottime != None else 20;
145  
146          if parity.lower() == "e" or parity.lower() == "even":
147              self.parity = serial.PARITY_EVEN
148  
149          if parity.lower() == "o" or parity.lower() == "odd":
150              self.parity = serial.PARITY_ODD
151  
152          try:
153              self.open_port()
154          except Exception as e:
155              RNS.log("Could not open serial port for interface "+str(self), RNS.LOG_ERROR)
156              raise e
157  
158          if self.serial.is_open:
159              self.configure_device()
160          else:
161              raise IOError("Could not open serial port")
162  
163      def open_port(self):
164          RNS.log("Opening serial port "+self.port+"...", RNS.LOG_VERBOSE)
165          self.serial = self.pyserial.Serial(
166              port = self.port,
167              baudrate = self.speed,
168              bytesize = self.databits,
169              parity = self.parity,
170              stopbits = self.stopbits,
171              xonxoff = False,
172              rtscts = False,
173              timeout = 0,
174              inter_byte_timeout = None,
175              write_timeout = None,
176              dsrdtr = False,
177          )
178  
179      def configure_device(self):
180          # Allow time for interface to initialise before config
181          sleep(2.0)
182          thread = threading.Thread(target=self.readLoop)
183          thread.daemon = True
184          thread.start()
185          self.online = True
186          RNS.log("Serial port "+self.port+" is now open")
187          RNS.log("Configuring AX.25 KISS interface parameters...")
188          self.setPreamble(self.preamble)
189          self.setTxTail(self.txtail)
190          self.setPersistence(self.persistence)
191          self.setSlotTime(self.slottime)
192          self.setFlowControl(self.flow_control)
193          self.interface_ready = True
194          RNS.log("AX.25 KISS interface configured")
195  
196      def setPreamble(self, preamble):
197          preamble_ms = preamble
198          preamble = int(preamble_ms / 10)
199          if preamble < 0:
200              preamble = 0
201          if preamble > 255:
202              preamble = 255
203  
204          kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_TXDELAY])+bytes([preamble])+bytes([KISS.FEND])
205          written = self.serial.write(kiss_command)
206          if written != len(kiss_command):
207              raise IOError("Could not configure AX.25 KISS interface preamble to "+str(preamble_ms)+" (command value "+str(preamble)+")")
208  
209      def setTxTail(self, txtail):
210          txtail_ms = txtail
211          txtail = int(txtail_ms / 10)
212          if txtail < 0:
213              txtail = 0
214          if txtail > 255:
215              txtail = 255
216  
217          kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_TXTAIL])+bytes([txtail])+bytes([KISS.FEND])
218          written = self.serial.write(kiss_command)
219          if written != len(kiss_command):
220              raise IOError("Could not configure AX.25 KISS interface TX tail to "+str(txtail_ms)+" (command value "+str(txtail)+")")
221  
222      def setPersistence(self, persistence):
223          if persistence < 0:
224              persistence = 0
225          if persistence > 255:
226              persistence = 255
227  
228          kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_P])+bytes([persistence])+bytes([KISS.FEND])
229          written = self.serial.write(kiss_command)
230          if written != len(kiss_command):
231              raise IOError("Could not configure AX.25 KISS interface persistence to "+str(persistence))
232  
233      def setSlotTime(self, slottime):
234          slottime_ms = slottime
235          slottime = int(slottime_ms / 10)
236          if slottime < 0:
237              slottime = 0
238          if slottime > 255:
239              slottime = 255
240  
241          kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_SLOTTIME])+bytes([slottime])+bytes([KISS.FEND])
242          written = self.serial.write(kiss_command)
243          if written != len(kiss_command):
244              raise IOError("Could not configure AX.25 KISS interface slot time to "+str(slottime_ms)+" (command value "+str(slottime)+")")
245  
246      def setFlowControl(self, flow_control):
247          kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_READY])+bytes([0x01])+bytes([KISS.FEND])
248          written = self.serial.write(kiss_command)
249          if written != len(kiss_command):
250              if (flow_control):
251                  raise IOError("Could not enable AX.25 KISS interface flow control")
252              else:
253                  raise IOError("Could not enable AX.25 KISS interface flow control")
254  
255  
256      def process_incoming(self, data):
257          if (len(data) > AX25.HEADER_SIZE):
258              self.rxb += len(data)
259              self.owner.inbound(data[AX25.HEADER_SIZE:], self)
260  
261  
262      def process_outgoing(self,data):
263          datalen = len(data)
264          if self.online:
265              if self.interface_ready:
266                  if self.flow_control:
267                      self.interface_ready = False
268                      self.flow_control_locked = time.time()
269  
270                  encoded_dst_ssid = bytes([0x60 | (self.dst_ssid << 1)])
271                  encoded_src_ssid = bytes([0x60 | (self.src_ssid << 1) | 0x01])
272  
273                  addr = b""
274  
275                  for i in range(0,6):
276                      if (i < len(self.dst_call)):
277                          addr += bytes([self.dst_call[i]<<1])
278                      else:
279                          addr += bytes([0x20])
280                  addr += encoded_dst_ssid
281  
282                  for i in range(0,6):
283                      if (i < len(self.src_call)):
284                          addr += bytes([self.src_call[i]<<1])
285                      else:
286                          addr += bytes([0x20])
287                  addr += encoded_src_ssid
288  
289                  data = addr+bytes([AX25.CTRL_UI])+bytes([AX25.PID_NOLAYER3])+data
290  
291                  data = data.replace(bytes([0xdb]), bytes([0xdb])+bytes([0xdd]))
292                  data = data.replace(bytes([0xc0]), bytes([0xdb])+bytes([0xdc]))
293                  kiss_frame = bytes([KISS.FEND])+bytes([0x00])+data+bytes([KISS.FEND])
294  
295                  written = self.serial.write(kiss_frame)
296                  self.txb += datalen
297  
298                  if written != len(kiss_frame):
299                      if self.flow_control:
300                          self.interface_ready = True
301                      raise IOError("AX.25 interface only wrote "+str(written)+" bytes of "+str(len(kiss_frame)))
302              else:
303                  self.queue(data)
304  
305      def queue(self, data):
306          self.packet_queue.append(data)
307  
308      def process_queue(self):
309          if len(self.packet_queue) > 0:
310              data = self.packet_queue.pop(0)
311              self.interface_ready = True
312              self.process_outgoing(data)
313          elif len(self.packet_queue) == 0:
314              self.interface_ready = True
315  
316      def readLoop(self):
317          try:
318              in_frame = False
319              escape = False
320              command = KISS.CMD_UNKNOWN
321              data_buffer = b""
322              last_read_ms = int(time.time()*1000)
323  
324              while self.serial.is_open:
325                  if self.serial.in_waiting:
326                      byte = ord(self.serial.read(1))
327                      last_read_ms = int(time.time()*1000)
328  
329                      if (in_frame and byte == KISS.FEND and command == KISS.CMD_DATA):
330                          in_frame = False
331                          self.process_incoming(data_buffer)
332                      elif (byte == KISS.FEND):
333                          in_frame = True
334                          command = KISS.CMD_UNKNOWN
335                          data_buffer = b""
336                      elif (in_frame and len(data_buffer) < self.HW_MTU+AX25.HEADER_SIZE):
337                          if (len(data_buffer) == 0 and command == KISS.CMD_UNKNOWN):
338                              # We only support one HDLC port for now, so
339                              # strip off the port nibble
340                              byte = byte & 0x0F
341                              command = byte
342                          elif (command == KISS.CMD_DATA):
343                              if (byte == KISS.FESC):
344                                  escape = True
345                              else:
346                                  if (escape):
347                                      if (byte == KISS.TFEND):
348                                          byte = KISS.FEND
349                                      if (byte == KISS.TFESC):
350                                          byte = KISS.FESC
351                                      escape = False
352                                  data_buffer = data_buffer+bytes([byte])
353                          elif (command == KISS.CMD_READY):
354                              self.process_queue()
355                  else:
356                      time_since_last = int(time.time()*1000) - last_read_ms
357                      if len(data_buffer) > 0 and time_since_last > self.timeout:
358                          data_buffer = b""
359                          in_frame = False
360                          command = KISS.CMD_UNKNOWN
361                          escape = False
362                      sleep(0.05)
363  
364                      if self.flow_control:
365                          if not self.interface_ready:
366                              if time.time() > self.flow_control_locked + self.flow_control_timeout:
367                                  RNS.log("Interface "+str(self)+" is unlocking flow control due to time-out. This should not happen. Your hardware might have missed a flow-control READY command, or maybe it does not support flow-control.", RNS.LOG_WARNING)
368                                  self.process_queue()
369  
370          except Exception as e:
371              self.online = False
372              RNS.log("A serial port error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR)
373              RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is now offline.", RNS.LOG_ERROR)
374              
375              if RNS.Reticulum.panic_on_interface_error:
376                  RNS.panic()
377  
378              RNS.log("Reticulum will attempt to reconnect the interface periodically.", RNS.LOG_ERROR)
379  
380          self.online = False
381          self.serial.close()
382          self.reconnect_port()
383  
384      def reconnect_port(self):
385          while not self.online:
386              try:
387                  time.sleep(5)
388                  RNS.log("Attempting to reconnect serial port "+str(self.port)+" for "+str(self)+"...", RNS.LOG_VERBOSE)
389                  self.open_port()
390                  if self.serial.is_open:
391                      self.configure_device()
392              except Exception as e:
393                  RNS.log("Error while reconnecting port, the contained exception was: "+str(e), RNS.LOG_ERROR)
394  
395          RNS.log("Reconnected serial port for "+str(self))
396  
397      def should_ingress_limit(self):
398          return False
399  
400      def __str__(self):
401          return "AX25KISSInterface["+self.name+"]"