/ RNS / Interfaces / PipeInterface.py
PipeInterface.py
  1  # Reticulum License
  2  #
  3  # Copyright (c) 2016-2025 Mark Qvist
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # - The Software shall not be used in any kind of system which includes amongst
 13  #   its functions the ability to purposefully do harm to human beings.
 14  #
 15  # - The Software shall not be used, directly or indirectly, in the creation of
 16  #   an artificial intelligence, machine learning or language model training
 17  #   dataset, including but not limited to any use that contributes to the
 18  #   training or development of such a model or algorithm.
 19  #
 20  # - The above copyright notice and this permission notice shall be included in
 21  #   all copies or substantial portions of the Software.
 22  #
 23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 24  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 25  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 26  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 27  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 28  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 29  # SOFTWARE.
 30  
 31  from RNS.Interfaces.Interface import Interface
 32  from time import sleep
 33  import sys
 34  import threading
 35  import time
 36  import RNS
 37  
 38  import subprocess
 39  import shlex
 40  
 41  class HDLC():
 42      # The Pipe Interface packetizes data using
 43      # simplified HDLC framing, similar to PPP
 44      FLAG              = 0x7E
 45      ESC               = 0x7D
 46      ESC_MASK          = 0x20
 47  
 48      @staticmethod
 49      def escape(data):
 50          data = data.replace(bytes([HDLC.ESC]), bytes([HDLC.ESC, HDLC.ESC^HDLC.ESC_MASK]))
 51          data = data.replace(bytes([HDLC.FLAG]), bytes([HDLC.ESC, HDLC.FLAG^HDLC.ESC_MASK]))
 52          return data
 53  
 54  class PipeInterface(Interface):
 55      MAX_CHUNK = 32768
 56      BITRATE_GUESS = 1*1000*1000
 57      DEFAULT_IFAC_SIZE = 8
 58  
 59      owner    = None
 60      command  = None
 61      
 62      def __init__(self, owner, configuration):
 63          super().__init__()
 64  
 65          c = Interface.get_config_obj(configuration)
 66          name = c["name"]
 67          command = c["command"] if "command" in c else None
 68          respawn_delay = c.as_float("respawn_delay") if "respawn_delay" in c else None
 69  
 70          if command == None:
 71              raise ValueError("No command specified for PipeInterface")
 72  
 73          if respawn_delay == None:
 74              respawn_delay = 5
 75  
 76          self.HW_MTU = 1064
 77          
 78          self.owner    = owner
 79          self.name     = name
 80          self.command  = command
 81          self.process  = None
 82          self.timeout  = 100
 83          self.online   = False
 84          self.pipe_is_open = False
 85          self.bitrate  = PipeInterface.BITRATE_GUESS
 86          self.respawn_delay = respawn_delay
 87  
 88          try:
 89              self.open_pipe()
 90  
 91          except Exception as e:
 92              RNS.log("Could connect pipe for interface "+str(self), RNS.LOG_ERROR)
 93              raise e
 94  
 95          if self.pipe_is_open:
 96              self.configure_pipe()
 97          else:
 98              raise IOError("Could not connect pipe")
 99  
100  
101      def open_pipe(self):
102          RNS.log("Connecting subprocess pipe for "+str(self)+"...", RNS.LOG_VERBOSE)
103          
104          try:
105              self.process = subprocess.Popen(shlex.split(self.command), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
106              self.pipe_is_open = True
107          except Exception as e:
108              raise e
109              self.pipe_is_open = False
110  
111  
112      def configure_pipe(self):
113          sleep(0.01)
114          thread = threading.Thread(target=self.readLoop)
115          thread.daemon = True
116          thread.start()
117          self.online = True
118          RNS.log("Subprocess pipe for "+str(self)+" is now connected", RNS.LOG_VERBOSE)
119  
120  
121      def process_incoming(self, data):
122          self.rxb += len(data)            
123          self.owner.inbound(data, self)
124  
125  
126      def process_outgoing(self,data):
127          if self.online:
128              data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG])
129              written = self.process.stdin.write(data)
130              self.process.stdin.flush()
131              self.txb += len(data)            
132              if written != len(data):
133                  raise IOError("Pipe interface only wrote "+str(written)+" bytes of "+str(len(data)))
134  
135  
136      def readLoop(self):
137          try:
138              in_frame = False
139              escape = False
140              data_buffer = b""
141              last_read_ms = int(time.time()*1000)
142  
143              while True:
144                  process_output = self.process.stdout.read(1)
145                  if len(process_output) == 0 and self.process.poll() is not None:
146                      break
147  
148                  else:
149                      byte = ord(process_output)
150                      last_read_ms = int(time.time()*1000)
151  
152                      if (in_frame and byte == HDLC.FLAG):
153                          in_frame = False
154                          self.process_incoming(data_buffer)
155                      elif (byte == HDLC.FLAG):
156                          in_frame = True
157                          data_buffer = b""
158                      elif (in_frame and len(data_buffer) < self.HW_MTU):
159                          if (byte == HDLC.ESC):
160                              escape = True
161                          else:
162                              if (escape):
163                                  if (byte == HDLC.FLAG ^ HDLC.ESC_MASK):
164                                      byte = HDLC.FLAG
165                                  if (byte == HDLC.ESC  ^ HDLC.ESC_MASK):
166                                      byte = HDLC.ESC
167                                  escape = False
168                              data_buffer = data_buffer+bytes([byte])
169  
170              RNS.log("Subprocess terminated on "+str(self))
171              self.process.kill()
172                      
173          except Exception as e:
174              self.online = False
175              try:
176                  self.process.kill()
177              except Exception as e:
178                  pass
179  
180              RNS.log("A pipe error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR)
181              RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is now offline.", RNS.LOG_ERROR)
182              
183              if RNS.Reticulum.panic_on_interface_error:
184                  RNS.panic()
185  
186              RNS.log("Reticulum will attempt to reconnect the interface periodically.", RNS.LOG_ERROR)
187  
188          self.online = False
189          self.reconnect_pipe()
190  
191      def reconnect_pipe(self):
192          while not self.online:
193              try:
194                  time.sleep(self.respawn_delay)
195                  RNS.log("Attempting to respawn subprocess for "+str(self)+"...", RNS.LOG_VERBOSE)
196                  self.open_pipe()
197                  if self.pipe_is_open:
198                      self.configure_pipe()
199              except Exception as e:
200                  RNS.log("Error while spawning subprocess, the contained exception was: "+str(e), RNS.LOG_ERROR)
201  
202          RNS.log("Reconnected pipe for "+str(self))
203  
204      def __str__(self):
205          return "PipeInterface["+self.name+"]"