/ RNS / Interfaces / LocalInterface.py
LocalInterface.py
  1  # Reticulum License
  2  #
  3  # Copyright (c) 2016-2025 Mark Qvist
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # - The Software shall not be used in any kind of system which includes amongst
 13  #   its functions the ability to purposefully do harm to human beings.
 14  #
 15  # - The Software shall not be used, directly or indirectly, in the creation of
 16  #   an artificial intelligence, machine learning or language model training
 17  #   dataset, including but not limited to any use that contributes to the
 18  #   training or development of such a model or algorithm.
 19  #
 20  # - The above copyright notice and this permission notice shall be included in
 21  #   all copies or substantial portions of the Software.
 22  #
 23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 24  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 25  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 26  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 27  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 28  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 29  # SOFTWARE.
 30  
 31  from RNS.Interfaces.Interface import Interface
 32  from RNS.Interfaces.BackboneInterface import BackboneInterface
 33  import socketserver
 34  import threading
 35  import socket
 36  import time
 37  import sys
 38  import os
 39  import RNS
 40  from threading import Lock
 41  
 42  class HDLC():
 43      FLAG              = 0x7E
 44      ESC               = 0x7D
 45      ESC_MASK          = 0x20
 46  
 47      @staticmethod
 48      def escape(data):
 49          data = data.replace(bytes([HDLC.ESC]), bytes([HDLC.ESC, HDLC.ESC^HDLC.ESC_MASK]))
 50          data = data.replace(bytes([HDLC.FLAG]), bytes([HDLC.ESC, HDLC.FLAG^HDLC.ESC_MASK]))
 51          return data
 52  
 53  class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
 54      def server_bind(self):
 55          if RNS.vendor.platformutils.is_windows():
 56              self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
 57          else:
 58              self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 59          self.socket.bind(self.server_address)
 60          self.server_address = self.socket.getsockname()
 61  
 62  class LocalClientInterface(Interface):
 63      RECONNECT_WAIT = 8
 64      AUTOCONFIGURE_MTU = True
 65  
 66      def __init__(self, owner, name, target_port = None, connected_socket=None, socket_path=None):
 67          super().__init__()
 68  
 69          self.epoll_backend    = False
 70          self.HW_MTU           = 262144
 71          self.online           = False
 72          
 73          if socket_path != None and RNS.Reticulum.get_instance().use_af_unix: self.socket_path = f"\0rns/{socket_path}"
 74          else: self.socket_path = None
 75          
 76          self.IN               = True
 77          self.OUT              = False
 78          self.socket           = None
 79          self.parent_interface = None
 80          self.reconnecting     = False
 81          self.never_connected  = True
 82          self.detached         = False
 83          self.name             = name
 84          self.mode             = RNS.Interfaces.Interface.Interface.MODE_FULL
 85          self.frame_buffer     = b""
 86          self.transmit_buffer  = b""
 87  
 88          if RNS.vendor.platformutils.use_epoll():
 89              self.epoll_backend = True
 90  
 91          if connected_socket != None:
 92              self.receives    = True
 93              self.target_ip   = None
 94              self.target_port = None
 95              self.socket      = connected_socket
 96  
 97              if self.socket.family == socket.AF_INET:
 98                  self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 99  
100              self.is_connected_to_shared_instance = False
101  
102          elif self.socket_path != None:
103              self.receives    = True
104              self.target_ip   = None
105              self.target_port = None
106              self.connect()
107  
108          elif target_port != None:
109              self.receives    = True
110              self.target_ip   = "127.0.0.1"
111              self.target_port = target_port
112              self.connect()
113  
114          self.owner   = owner
115          self.bitrate = 1_000_000_000
116          self.online  = True
117          self.writing = False
118  
119          self._force_bitrate = False
120  
121          self.announce_rate_target  = None
122          self.announce_rate_grace   = None
123          self.announce_rate_penalty = None
124  
125          if connected_socket == None:
126              if not self.epoll_backend:
127                  thread = threading.Thread(target=self.read_loop)
128                  thread.daemon = True
129                  thread.start()
130  
131      def should_ingress_limit(self):
132          return False
133  
134      def connect(self):
135          if self.socket_path != None:
136              self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
137              self.socket.connect(self.socket_path)
138          
139          else:
140              self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
141              self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
142              self.socket.connect((self.target_ip, self.target_port))
143  
144          self.online = True
145          self.is_connected_to_shared_instance = True
146          self.never_connected = False
147  
148          if self.epoll_backend: BackboneInterface.add_client_socket(self.socket, self)
149  
150          return True
151  
152  
153      def reconnect(self):
154          if self.is_connected_to_shared_instance:
155              if not self.reconnecting:
156                  self.reconnecting = True
157                  attempts = 0
158  
159                  while not self.online:
160                      time.sleep(LocalClientInterface.RECONNECT_WAIT)
161                      attempts += 1
162  
163                      try:
164                          self.connect()
165  
166                      except Exception as e:
167                          RNS.log("Connection attempt for "+str(self)+" failed: "+str(e), RNS.LOG_DEBUG)
168  
169                  if not self.never_connected:
170                      RNS.log("Reconnected socket for "+str(self)+".", RNS.LOG_INFO)
171  
172                  self.reconnecting = False
173                  if not self.epoll_backend:
174                      thread = threading.Thread(target=self.read_loop)
175                      thread.daemon = True
176                      thread.start()
177  
178                  def job():
179                      time.sleep(LocalClientInterface.RECONNECT_WAIT+2)
180                      RNS.Transport.shared_connection_reappeared()
181                  threading.Thread(target=job, daemon=True).start()
182          
183          else:
184              RNS.log("Attempt to reconnect on a non-initiator shared local interface. This should not happen.", RNS.LOG_ERROR)
185              raise IOError("Attempt to reconnect on a non-initiator local interface")
186  
187  
188      def process_incoming(self, data):
189          self.rxb += len(data)
190          if self.parent_interface != None: self.parent_interface.rxb += len(data)
191          
192          try:
193              self.owner.inbound(data, self)
194          except Exception as e:
195              RNS.log(f"An error in the processing of an incoming frame for {self}: {e}", RNS.LOG_ERROR)
196              RNS.trace_exception(e)
197  
198      def process_outgoing(self, data):
199          if self.online:
200              try:
201                  if self.epoll_backend:
202                      self.transmit_buffer += bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG])
203                      BackboneInterface.tx_ready(self)
204  
205                  else:
206                      self.writing = True
207  
208                      if self._force_bitrate:
209                          if not hasattr(self, "send_lock"):
210                              self.send_lock = Lock()
211  
212                          with self.send_lock:
213                              # RNS.log(f"Simulating latency of {RNS.prettytime(s)} for {len(data)} bytes", RNS.LOG_EXTREME)
214                              s = len(data) / self.bitrate * 8
215                              time.sleep(s)
216  
217                      data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG])
218                      self.socket.sendall(data)
219                      self.writing = False
220                      self.txb += len(data)
221                      if hasattr(self, "parent_interface") and self.parent_interface != None:
222                          self.parent_interface.txb += len(data)
223  
224              except Exception as e:
225                  RNS.log("Exception occurred while transmitting via "+str(self)+", tearing down interface", RNS.LOG_ERROR)
226                  RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
227                  RNS.trace_exception(e)
228                  self.teardown()
229  
230      def handle_hdlc(self, data_in):
231          self.frame_buffer += data_in
232          flags_remaining = True
233          while flags_remaining:
234              frame_start = self.frame_buffer.find(HDLC.FLAG)
235              if frame_start != -1:
236                  frame_end = self.frame_buffer.find(HDLC.FLAG, frame_start+1)
237                  if frame_end != -1:
238                      frame = self.frame_buffer[frame_start+1:frame_end]
239                      frame = frame.replace(bytes([HDLC.ESC, HDLC.FLAG ^ HDLC.ESC_MASK]), bytes([HDLC.FLAG]))
240                      frame = frame.replace(bytes([HDLC.ESC, HDLC.ESC  ^ HDLC.ESC_MASK]), bytes([HDLC.ESC]))
241                      if len(frame) > RNS.Reticulum.HEADER_MINSIZE:
242                          self.process_incoming(frame)
243                      self.frame_buffer = self.frame_buffer[frame_end:]
244                  else:
245                      flags_remaining = False
246              else:
247                  flags_remaining = False
248  
249      def receive(self, data_in):
250          try:
251              if len(data_in) > 0: self.handle_hdlc(data_in)
252              else:
253                  self.online = False
254                  if self.is_connected_to_shared_instance and not self.detached:
255                      RNS.log("Socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING)
256                      RNS.Transport.shared_connection_disappeared()
257                      # TODO: Potentially run this in a thread, but since if we get here,
258                      # there's no other connectivity left to block anyway, it might be
259                      # unnecessary.
260                      self.reconnect()
261                  else:
262                      self.teardown(nowarning=True)
263                  
264          except Exception as e:
265              self.online = False
266              RNS.log("An interface error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR)
267              RNS.log("Tearing down "+str(self), RNS.LOG_ERROR)
268              self.teardown()
269  
270      def read_loop(self):
271          try:
272              self.frame_buffer = b""
273              data_in = b""
274              while True:
275                  data_in = self.socket.recv(4096)
276                  if len(data_in) > 0: self.handle_hdlc(data_in)
277                  else:
278                      self.online = False
279                      if self.is_connected_to_shared_instance and not self.detached:
280                          RNS.log("Socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING)
281                          RNS.Transport.shared_connection_disappeared()
282                          # TODO: Potentially run this in a thread, but since if we get here,
283                          # there's no other connectivity left to block anyway, it might be
284                          # unnecessary.
285                          self.reconnect()
286                      else:
287                          self.teardown(nowarning=True)
288  
289                      break
290  
291          except Exception as e:
292              self.online = False
293              RNS.log("An interface error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR)
294              RNS.log("Tearing down "+str(self), RNS.LOG_ERROR)
295              self.teardown()
296  
297      def detach(self):
298          if self.socket != None:
299              if hasattr(self.socket, "close"):
300                  if callable(self.socket.close):
301                      RNS.log("Detaching "+str(self), RNS.LOG_DEBUG)
302                      self.detached = True
303                      
304                      try:
305                          if self.socket != None:
306                              self.socket.shutdown(socket.SHUT_RDWR)
307                      except Exception as e:
308                          RNS.log("Error while shutting down socket for "+str(self)+": "+str(e))
309  
310                      try:
311                          if self.socket != None:
312                              self.socket.close()
313                      except Exception as e:
314                          RNS.log("Error while closing socket for "+str(self)+": "+str(e))
315  
316                      self.socket = None
317  
318      def teardown(self, nowarning=False):
319          self.online = False
320          self.OUT = False
321          self.IN = False
322  
323          if self in RNS.Transport.interfaces:
324              RNS.Transport.interfaces.remove(self)
325  
326          if self in RNS.Transport.local_client_interfaces:
327              RNS.Transport.local_client_interfaces.remove(self)
328              if hasattr(self, "parent_interface") and self.parent_interface != None:
329                  self.parent_interface.clients -= 1
330                  if hasattr(RNS.Transport, "owner") and RNS.Transport.owner != None:
331                      RNS.Transport.owner._should_persist_data()
332  
333          if nowarning == False:
334              RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR)
335              if RNS.Reticulum.panic_on_interface_error:
336                  RNS.panic()
337  
338          if self.is_connected_to_shared_instance:
339              if nowarning == False:
340                  RNS.log("Permanently lost connection to local shared RNS instance. Exiting now.", RNS.LOG_CRITICAL)
341      
342              RNS.exit()
343  
344  
345      def __str__(self):
346          if self.socket_path: return "LocalInterface["+str(self.socket_path.replace("\0", ""))+"]"
347          else: return "LocalInterface["+str(self.target_port)+"]"
348  
349  
class LocalServerInterface(Interface):
    """Listener that spawns a ``LocalClientInterface`` for every accepted connection.

    Listens on an AF_UNIX address when a usable ``socket_path`` is given and
    the epoll backend is active, otherwise on TCP port ``bindport`` of
    127.0.0.1.
    """

    AUTOCONFIGURE_MTU = True

    def __init__(self, owner, bindport=None, socket_path=None):
        """Configure the interface and start listening.

        Args:
            owner: Passed to every spawned ``LocalClientInterface`` as its
                owner.
            bindport: TCP port on 127.0.0.1 to listen on when no AF_UNIX
                listener is set up.
            socket_path: Name used to build the AF_UNIX address
                ``"\\0rns/<socket_path>"``. Ignored unless the Reticulum
                instance reports ``use_af_unix``.
        """
        super().__init__()
        self.epoll_backend = False
        self.online = False
        self.clients = 0

        if socket_path is not None and RNS.Reticulum.get_instance().use_af_unix: self.socket_path = f"\0rns/{socket_path}"
        else: self.socket_path = None

        self.IN  = True
        self.OUT = False
        self.name = "Reticulum"
        self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL

        # Assigned before any listener starts, because incoming_connection
        # reads owner and bitrate and may run as soon as a listener exists.
        self.owner = owner
        self.bitrate = 1000*1000*1000

        self.announce_rate_target  = None
        self.announce_rate_grace   = None
        self.announce_rate_penalty = None

        # Defaults so __str__ works even when no listener is configured.
        self.bind_ip = None
        self.bind_port = None

        if RNS.vendor.platformutils.use_epoll():
            self.epoll_backend = True

        # The resolved self.socket_path is tested rather than the raw
        # argument: it is None when AF_UNIX is disabled, in which case the TCP
        # listener below is used instead.
        if self.socket_path is not None and self.epoll_backend:
            self.receives = True
            self.is_local_shared_instance = True
            BackboneInterface.add_listener(self, self.socket_path, socket_type=socket.AF_UNIX)

        elif bindport is not None:
            self.receives = True
            self.bind_ip = "127.0.0.1"
            self.bind_port = bindport
            self.is_local_shared_instance = True

            address = (self.bind_ip, self.bind_port)
            if self.epoll_backend: BackboneInterface.add_listener(self, address)
            else:
                def create_handler(*args, **keys):
                    # Binds incoming_connection as the handler's callback.
                    return LocalInterfaceHandler(self.incoming_connection, *args, **keys)

                self.server = ThreadingTCPServer(address, create_handler)
                self.server.daemon_threads = True
                threading.Thread(target=self.server.serve_forever, daemon=True).start()

        self.online = True

    def _spawn_client(self, client_socket, interface_name, target_ip, target_port, socket_path=None):
        """Create, configure and register the interface for one accepted connection.

        The new interface copies this interface's direction flags, bitrate and
        (when present) ``_force_bitrate``, and is appended to
        ``RNS.Transport.interfaces`` and
        ``RNS.Transport.local_client_interfaces``.

        Returns:
            The spawned ``LocalClientInterface``.
        """
        spawned_interface = LocalClientInterface(self.owner, name=interface_name, connected_socket=client_socket)
        spawned_interface.OUT = self.OUT
        spawned_interface.IN  = self.IN
        spawned_interface.parent_interface = self
        spawned_interface.bitrate = self.bitrate
        spawned_interface.target_ip = target_ip
        spawned_interface.target_port = target_port
        spawned_interface.socket_path = socket_path

        # _force_bitrate is not assigned in __init__; copy it only when it
        # has been set on this interface from elsewhere.
        if hasattr(self, "_force_bitrate"): spawned_interface._force_bitrate = self._force_bitrate

        RNS.Transport.interfaces.append(spawned_interface)
        RNS.Transport.local_client_interfaces.append(spawned_interface)
        return spawned_interface

    def incoming_connection(self, handler):
        """Accept a new connection and spawn its client interface.

        Args:
            handler: With the epoll backend, the accepted socket itself.
                Otherwise a ``LocalInterfaceHandler`` whose ``request`` and
                ``client_address`` describe the connection.

        Returns:
            True with the epoll backend. With the threaded server the call
            blocks in the spawned interface's ``read_loop`` and returns None
            once that loop ends.

        Raises:
            ValueError: With the epoll backend, when the socket family is
                neither AF_INET nor AF_UNIX.
        """
        if self.epoll_backend:
            client_socket = handler

            if client_socket.family == socket.AF_INET:
                peer_address = client_socket.getpeername()
                peer_port = str(peer_address[1])
                spawned_interface = self._spawn_client(client_socket, peer_port, peer_address[0], peer_port)

            elif client_socket.family == socket.AF_UNIX:
                interface_name = f"{self.clients}@{self.socket_path}"
                spawned_interface = self._spawn_client(client_socket, interface_name, None, interface_name, socket_path=self.socket_path)

            else:
                raise ValueError(f"Unsupported socket family {client_socket.family} for incoming local connection")

            BackboneInterface.add_client_socket(client_socket, spawned_interface)
            self.clients += 1
            return True

        else:
            peer_port = str(handler.client_address[1])
            spawned_interface = self._spawn_client(handler.request, peer_port, handler.client_address[0], peer_port)
            self.clients += 1
            # Runs in the handler's thread until the connection closes.
            spawned_interface.read_loop()

    def process_outgoing(self, data):
        """Do nothing; this method deliberately ignores ``data``."""
        pass

    def received_announce(self, from_spawned=False):
        """Append the current time to ``ia_freq_deque`` when ``from_spawned`` is true."""
        # NOTE(review): ia_freq_deque is presumably created by the Interface
        # base class; it is not assigned anywhere in this class.
        if from_spawned: self.ia_freq_deque.append(time.time())

    def sent_announce(self, from_spawned=False):
        """Append the current time to ``oa_freq_deque`` when ``from_spawned`` is true."""
        # NOTE(review): oa_freq_deque is presumably created by the Interface
        # base class; it is not assigned anywhere in this class.
        if from_spawned: self.oa_freq_deque.append(time.time())

    def __str__(self):
        """Return ``Shared Instance[...]`` with the socket path (NUL removed) or the bind port."""
        if self.socket_path: return "Shared Instance["+str(self.socket_path.replace("\0", ""))+"]"
        else: return "Shared Instance["+str(self.bind_port)+"]"
466  
class LocalInterfaceHandler(socketserver.BaseRequestHandler):
    """Request handler that hands every accepted connection to a callback."""

    def __init__(self, callback, *args, **keys):
        """Store ``callback`` and pass the remaining arguments to the base class.

        The callback is stored before the base initialiser runs, because the
        base initialiser itself calls ``handle``.
        """
        self.callback = callback
        super().__init__(*args, **keys)

    def handle(self):
        """Call the callback with this handler as the ``handler`` keyword argument."""
        notify = self.callback
        notify(handler=self)