# PipeInterface.py
#
# Reticulum License
#
# Copyright (c) 2016-2025 Mark Qvist
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# - The Software shall not be used in any kind of system which includes amongst
#   its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
#   an artificial intelligence, machine learning or language model training
#   dataset, including but not limited to any use that contributes to the
#   training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
#   all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from RNS.Interfaces.Interface import Interface
from time import sleep
import sys
import threading
import time
import RNS

import subprocess
import shlex


class HDLC():
    """Framing constants and byte-stuffing helper for the pipe transport."""

    # The Pipe Interface packetizes data using
    # simplified HDLC framing, similar to PPP
    FLAG = 0x7E
    ESC = 0x7D
    ESC_MASK = 0x20

    @staticmethod
    def escape(data):
        """Return ``data`` with every ESC and FLAG byte byte-stuffed.

        Each ESC or FLAG byte is replaced by ESC followed by the original
        byte XOR ``ESC_MASK``.

        :param data: payload as ``bytes``.
        :returns: the escaped payload as ``bytes``.
        """
        # ESC has to be handled first: escaping FLAG inserts new ESC bytes,
        # which must not be escaped a second time.
        data = data.replace(bytes([HDLC.ESC]), bytes([HDLC.ESC, HDLC.ESC ^ HDLC.ESC_MASK]))
        data = data.replace(bytes([HDLC.FLAG]), bytes([HDLC.ESC, HDLC.FLAG ^ HDLC.ESC_MASK]))
        return data


class PipeInterface(Interface):
    """Interface that exchanges HDLC-framed packets with a subprocess.

    The configured command is spawned with its stdin and stdout connected to
    pipes. Outgoing packets are framed and written to the subprocess' stdin;
    a background thread reads its stdout, de-frames packets and hands them to
    the owner. When the subprocess goes away it is respawned after
    ``respawn_delay`` seconds.
    """

    MAX_CHUNK = 32768
    BITRATE_GUESS = 1*1000*1000
    DEFAULT_IFAC_SIZE = 8

    owner = None
    command = None

    def __init__(self, owner, configuration):
        """Read the configuration, spawn the subprocess and start reading.

        :param owner: object whose ``inbound(data, interface)`` is called for
            every received frame.
        :param configuration: interface configuration, passed through
            ``Interface.get_config_obj``. Must contain ``name`` and
            ``command``; ``respawn_delay`` (seconds) is optional and
            defaults to 5.
        :raises ValueError: if no ``command`` is configured.
        :raises IOError: if the pipe is not open after ``open_pipe``.
        :raises Exception: anything raised while spawning the subprocess is
            logged and re-raised.
        """
        super().__init__()

        c = Interface.get_config_obj(configuration)
        name = c["name"]
        command = c["command"] if "command" in c else None
        respawn_delay = c.as_float("respawn_delay") if "respawn_delay" in c else None

        if command is None:
            raise ValueError("No command specified for PipeInterface")

        if respawn_delay is None:
            respawn_delay = 5

        self.HW_MTU = 1064

        self.owner = owner
        self.name = name
        self.command = command
        self.process = None
        self.timeout = 100
        self.online = False
        self.pipe_is_open = False
        self.bitrate = PipeInterface.BITRATE_GUESS
        self.respawn_delay = respawn_delay

        try:
            self.open_pipe()

        except Exception:
            RNS.log("Could not connect pipe for interface "+str(self), RNS.LOG_ERROR)
            # Bare raise keeps the original traceback intact.
            raise

        if self.pipe_is_open:
            self.configure_pipe()
        else:
            raise IOError("Could not connect pipe")

    def open_pipe(self):
        """Spawn the configured command with piped stdin and stdout.

        The command string is split with ``shlex.split`` and executed
        without a shell. On success ``self.process`` holds the new process
        and ``self.pipe_is_open`` is ``True``.

        :raises Exception: whatever ``subprocess.Popen`` raised; in that
            case ``self.pipe_is_open`` is set to ``False`` first.
        """
        RNS.log("Connecting subprocess pipe for "+str(self)+"...", RNS.LOG_VERBOSE)

        try:
            self.process = subprocess.Popen(shlex.split(self.command), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        except Exception:
            # Record the failure before propagating, so the flag never
            # reports a stale "open" state from an earlier subprocess.
            self.pipe_is_open = False
            raise

        self.pipe_is_open = True

    def configure_pipe(self):
        """Start the daemon reader thread and mark the interface online."""
        # NOTE(review): the short pause presumably gives the freshly spawned
        # subprocess a moment to start - confirm whether it is still needed.
        sleep(0.01)
        thread = threading.Thread(target=self.readLoop)
        thread.daemon = True
        thread.start()
        self.online = True
        RNS.log("Subprocess pipe for "+str(self)+" is now connected", RNS.LOG_VERBOSE)

    def process_incoming(self, data):
        """Account for a received frame and pass it to the owner.

        :param data: the de-framed, unescaped payload.
        """
        # rxb is not initialised in this class; presumably the Interface
        # base class provides it - verify.
        self.rxb += len(data)
        self.owner.inbound(data, self)

    def process_outgoing(self, data):
        """Frame ``data`` and write it to the subprocess' stdin.

        Does nothing while the interface is offline.

        :param data: raw payload as ``bytes``.
        :raises IOError: if fewer bytes than the framed length were written.
        """
        if self.online:
            data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG])
            written = self.process.stdin.write(data)
            self.process.stdin.flush()
            # txb counts the framed length, including flags and escapes.
            self.txb += len(data)
            if written != len(data):
                raise IOError("Pipe interface only wrote "+str(written)+" bytes of "+str(len(data)))

    def readLoop(self):
        """Read the subprocess' stdout, de-frame packets and deliver them.

        Runs until stdout reaches end-of-file or an exception occurs. In
        both cases the subprocess is killed, the interface is marked
        offline and ``reconnect_pipe`` is called to respawn it.
        """
        try:
            in_frame = False
            escape = False
            data_buffer = bytearray()

            while True:
                process_output = self.process.stdout.read(1)
                if len(process_output) == 0:
                    # An empty read means end-of-file on stdout. Stop here
                    # even if the process has not been reaped yet, instead
                    # of trying to decode an empty chunk.
                    break

                byte = process_output[0]

                if in_frame and byte == HDLC.FLAG:
                    # Closing flag: deliver the completed frame.
                    in_frame = False
                    self.process_incoming(bytes(data_buffer))
                elif byte == HDLC.FLAG:
                    # Opening flag: start from a clean state so a dangling
                    # ESC from a previous frame cannot leak into this one.
                    in_frame = True
                    escape = False
                    data_buffer = bytearray()
                elif in_frame and len(data_buffer) < self.HW_MTU:
                    # Bytes beyond HW_MTU are dropped until the closing flag.
                    if byte == HDLC.ESC:
                        escape = True
                    else:
                        if escape:
                            if byte == HDLC.FLAG ^ HDLC.ESC_MASK:
                                byte = HDLC.FLAG
                            elif byte == HDLC.ESC ^ HDLC.ESC_MASK:
                                byte = HDLC.ESC
                            escape = False
                        data_buffer.append(byte)

            RNS.log("Subprocess terminated on "+str(self))
            self.process.kill()

        except Exception as e:
            self.online = False
            try:
                self.process.kill()
            except Exception:
                # Best effort only. The exception is deliberately not bound
                # to a name: rebinding "e" here would unbind the outer
                # exception that is logged below.
                pass

            RNS.log("A pipe error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR)
            RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is now offline.", RNS.LOG_ERROR)

            if RNS.Reticulum.panic_on_interface_error:
                RNS.panic()

            RNS.log("Reticulum will attempt to reconnect the interface periodically.", RNS.LOG_ERROR)

        self.online = False
        self.reconnect_pipe()

    def reconnect_pipe(self):
        """Respawn the subprocess, retrying until the interface is online.

        Waits ``respawn_delay`` seconds before every attempt. Errors while
        spawning are logged and the attempt is repeated.
        """
        while not self.online:
            try:
                time.sleep(self.respawn_delay)
                RNS.log("Attempting to respawn subprocess for "+str(self)+"...", RNS.LOG_VERBOSE)
                self.open_pipe()
                if self.pipe_is_open:
                    self.configure_pipe()
            except Exception as e:
                RNS.log("Error while spawning subprocess, the contained exception was: "+str(e), RNS.LOG_ERROR)

        RNS.log("Reconnected pipe for "+str(self))

    def __str__(self):
        """Return the display name used in log messages."""
        return "PipeInterface["+self.name+"]"