LocalInterface.py
1 # Reticulum License 2 # 3 # Copyright (c) 2016-2025 Mark Qvist 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # - The Software shall not be used in any kind of system which includes amongst 13 # its functions the ability to purposefully do harm to human beings. 14 # 15 # - The Software shall not be used, directly or indirectly, in the creation of 16 # an artificial intelligence, machine learning or language model training 17 # dataset, including but not limited to any use that contributes to the 18 # training or development of such a model or algorithm. 19 # 20 # - The above copyright notice and this permission notice shall be included in 21 # all copies or substantial portions of the Software. 22 # 23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 24 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 25 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 26 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 27 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 28 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 29 # SOFTWARE. 30 31 from RNS.Interfaces.Interface import Interface 32 from RNS.Interfaces.BackboneInterface import BackboneInterface 33 import socketserver 34 import threading 35 import socket 36 import time 37 import sys 38 import os 39 import RNS 40 from threading import Lock 41 42 class HDLC(): 43 FLAG = 0x7E 44 ESC = 0x7D 45 ESC_MASK = 0x20 46 47 @staticmethod 48 def escape(data): 49 data = data.replace(bytes([HDLC.ESC]), bytes([HDLC.ESC, HDLC.ESC^HDLC.ESC_MASK])) 50 data = data.replace(bytes([HDLC.FLAG]), bytes([HDLC.ESC, HDLC.FLAG^HDLC.ESC_MASK])) 51 return data 52 53 class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): 54 def server_bind(self): 55 if RNS.vendor.platformutils.is_windows(): 56 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) 57 else: 58 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 59 self.socket.bind(self.server_address) 60 self.server_address = self.socket.getsockname() 61 62 class LocalClientInterface(Interface): 63 RECONNECT_WAIT = 8 64 AUTOCONFIGURE_MTU = True 65 66 def __init__(self, owner, name, target_port = None, connected_socket=None, socket_path=None): 67 super().__init__() 68 69 self.epoll_backend = False 70 self.HW_MTU = 262144 71 self.online = False 72 73 if socket_path != None and RNS.Reticulum.get_instance().use_af_unix: self.socket_path = f"\0rns/{socket_path}" 74 else: self.socket_path = None 75 76 self.IN = True 77 self.OUT = False 78 self.socket = None 79 self.parent_interface = None 80 self.reconnecting = False 81 self.never_connected = True 82 self.detached = False 83 self.name = name 84 self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL 85 self.frame_buffer = b"" 86 self.transmit_buffer = b"" 87 88 if RNS.vendor.platformutils.use_epoll(): 89 self.epoll_backend = True 90 91 if connected_socket != None: 92 self.receives = True 93 self.target_ip = None 94 self.target_port = None 95 self.socket = connected_socket 96 97 if self.socket.family == socket.AF_INET: 98 self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 99 100 self.is_connected_to_shared_instance = False 101 102 elif self.socket_path != None: 103 self.receives = True 104 self.target_ip = None 105 self.target_port = None 106 self.connect() 107 108 elif target_port != None: 109 self.receives = True 110 self.target_ip = "127.0.0.1" 111 self.target_port = target_port 112 self.connect() 113 114 self.owner = owner 115 self.bitrate = 1_000_000_000 116 self.online = True 117 self.writing = False 118 119 self._force_bitrate = False 120 121 self.announce_rate_target = None 122 self.announce_rate_grace = None 123 self.announce_rate_penalty = None 124 125 if connected_socket == None: 126 if not self.epoll_backend: 127 thread = threading.Thread(target=self.read_loop) 128 thread.daemon = True 129 thread.start() 130 131 def should_ingress_limit(self): 132 return False 133 134 def connect(self): 135 if self.socket_path != None: 136 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 137 self.socket.connect(self.socket_path) 138 139 else: 140 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 141 self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 142 self.socket.connect((self.target_ip, self.target_port)) 143 144 self.online = True 145 self.is_connected_to_shared_instance = True 146 self.never_connected = False 147 148 if self.epoll_backend: BackboneInterface.add_client_socket(self.socket, self) 149 150 return True 151 152 153 def reconnect(self): 154 if self.is_connected_to_shared_instance: 155 if not self.reconnecting: 156 self.reconnecting = True 157 attempts = 0 158 159 while not self.online: 160 time.sleep(LocalClientInterface.RECONNECT_WAIT) 161 attempts += 1 162 163 try: 164 self.connect() 165 166 except Exception as e: 167 RNS.log("Connection attempt for "+str(self)+" failed: "+str(e), RNS.LOG_DEBUG) 168 169 if not self.never_connected: 170 RNS.log("Reconnected socket for "+str(self)+".", RNS.LOG_INFO) 171 172 self.reconnecting = False 173 if not self.epoll_backend: 174 thread = threading.Thread(target=self.read_loop) 175 thread.daemon = True 176 thread.start() 177 178 def job(): 179 time.sleep(LocalClientInterface.RECONNECT_WAIT+2) 180 RNS.Transport.shared_connection_reappeared() 181 threading.Thread(target=job, daemon=True).start() 182 183 else: 184 RNS.log("Attempt to reconnect on a non-initiator shared local interface. This should not happen.", RNS.LOG_ERROR) 185 raise IOError("Attempt to reconnect on a non-initiator local interface") 186 187 188 def process_incoming(self, data): 189 self.rxb += len(data) 190 if self.parent_interface != None: self.parent_interface.rxb += len(data) 191 192 try: 193 self.owner.inbound(data, self) 194 except Exception as e: 195 RNS.log(f"An error in the processing of an incoming frame for {self}: {e}", RNS.LOG_ERROR) 196 RNS.trace_exception(e) 197 198 def process_outgoing(self, data): 199 if self.online: 200 try: 201 if self.epoll_backend: 202 self.transmit_buffer += bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) 203 BackboneInterface.tx_ready(self) 204 205 else: 206 self.writing = True 207 208 if self._force_bitrate: 209 if not hasattr(self, "send_lock"): 210 self.send_lock = Lock() 211 212 with self.send_lock: 213 # RNS.log(f"Simulating latency of {RNS.prettytime(s)} for {len(data)} bytes", RNS.LOG_EXTREME) 214 s = len(data) / self.bitrate * 8 215 time.sleep(s) 216 217 data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) 218 self.socket.sendall(data) 219 self.writing = False 220 self.txb += len(data) 221 if hasattr(self, "parent_interface") and self.parent_interface != None: 222 self.parent_interface.txb += len(data) 223 224 except Exception as e: 225 RNS.log("Exception occurred while transmitting via "+str(self)+", tearing down interface", RNS.LOG_ERROR) 226 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) 227 RNS.trace_exception(e) 228 self.teardown() 229 230 def handle_hdlc(self, data_in): 231 self.frame_buffer += data_in 232 flags_remaining = True 233 while flags_remaining: 234 frame_start = self.frame_buffer.find(HDLC.FLAG) 235 if frame_start != -1: 236 frame_end = self.frame_buffer.find(HDLC.FLAG, frame_start+1) 237 if frame_end != -1: 238 frame = self.frame_buffer[frame_start+1:frame_end] 239 frame = frame.replace(bytes([HDLC.ESC, HDLC.FLAG ^ HDLC.ESC_MASK]), bytes([HDLC.FLAG])) 240 frame = frame.replace(bytes([HDLC.ESC, HDLC.ESC ^ HDLC.ESC_MASK]), bytes([HDLC.ESC])) 241 if len(frame) > RNS.Reticulum.HEADER_MINSIZE: 242 self.process_incoming(frame) 243 self.frame_buffer = self.frame_buffer[frame_end:] 244 else: 245 flags_remaining = False 246 else: 247 flags_remaining = False 248 249 def receive(self, data_in): 250 try: 251 if len(data_in) > 0: self.handle_hdlc(data_in) 252 else: 253 self.online = False 254 if self.is_connected_to_shared_instance and not self.detached: 255 RNS.log("Socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING) 256 RNS.Transport.shared_connection_disappeared() 257 # TODO: Potentially run this in a thread, but since if we get here, 258 # there's no other connectivity left to block anyway, it might be 259 # unnecessary. 260 self.reconnect() 261 else: 262 self.teardown(nowarning=True) 263 264 except Exception as e: 265 self.online = False 266 RNS.log("An interface error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR) 267 RNS.log("Tearing down "+str(self), RNS.LOG_ERROR) 268 self.teardown() 269 270 def read_loop(self): 271 try: 272 self.frame_buffer = b"" 273 data_in = b"" 274 while True: 275 data_in = self.socket.recv(4096) 276 if len(data_in) > 0: self.handle_hdlc(data_in) 277 else: 278 self.online = False 279 if self.is_connected_to_shared_instance and not self.detached: 280 RNS.log("Socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING) 281 RNS.Transport.shared_connection_disappeared() 282 # TODO: Potentially run this in a thread, but since if we get here, 283 # there's no other connectivity left to block anyway, it might be 284 # unnecessary. 285 self.reconnect() 286 else: 287 self.teardown(nowarning=True) 288 289 break 290 291 except Exception as e: 292 self.online = False 293 RNS.log("An interface error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR) 294 RNS.log("Tearing down "+str(self), RNS.LOG_ERROR) 295 self.teardown() 296 297 def detach(self): 298 if self.socket != None: 299 if hasattr(self.socket, "close"): 300 if callable(self.socket.close): 301 RNS.log("Detaching "+str(self), RNS.LOG_DEBUG) 302 self.detached = True 303 304 try: 305 if self.socket != None: 306 self.socket.shutdown(socket.SHUT_RDWR) 307 except Exception as e: 308 RNS.log("Error while shutting down socket for "+str(self)+": "+str(e)) 309 310 try: 311 if self.socket != None: 312 self.socket.close() 313 except Exception as e: 314 RNS.log("Error while closing socket for "+str(self)+": "+str(e)) 315 316 self.socket = None 317 318 def teardown(self, nowarning=False): 319 self.online = False 320 self.OUT = False 321 self.IN = False 322 323 if self in RNS.Transport.interfaces: 324 RNS.Transport.interfaces.remove(self) 325 326 if self in RNS.Transport.local_client_interfaces: 327 RNS.Transport.local_client_interfaces.remove(self) 328 if hasattr(self, "parent_interface") and self.parent_interface != None: 329 self.parent_interface.clients -= 1 330 if hasattr(RNS.Transport, "owner") and RNS.Transport.owner != None: 331 RNS.Transport.owner._should_persist_data() 332 333 if nowarning == False: 334 RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR) 335 if RNS.Reticulum.panic_on_interface_error: 336 RNS.panic() 337 338 if self.is_connected_to_shared_instance: 339 if nowarning == False: 340 RNS.log("Permanently lost connection to local shared RNS instance. Exiting now.", RNS.LOG_CRITICAL) 341 342 RNS.exit() 343 344 345 def __str__(self): 346 if self.socket_path: return "LocalInterface["+str(self.socket_path.replace("\0", ""))+"]" 347 else: return "LocalInterface["+str(self.target_port)+"]" 348 349 350 class LocalServerInterface(Interface): 351 AUTOCONFIGURE_MTU = True 352 353 def __init__(self, owner, bindport=None, socket_path=None): 354 super().__init__() 355 self.epoll_backend = False 356 self.online = False 357 self.clients = 0 358 359 if socket_path != None and RNS.Reticulum.get_instance().use_af_unix: self.socket_path = f"\0rns/{socket_path}" 360 else: self.socket_path = None 361 362 self.IN = True 363 self.OUT = False 364 self.name = "Reticulum" 365 self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL 366 367 if RNS.vendor.platformutils.use_epoll(): 368 self.epoll_backend = True 369 370 if socket_path != None and self.epoll_backend: 371 self.receives = True 372 self.bind_ip = None 373 self.bind_port = None 374 375 self.owner = owner 376 self.is_local_shared_instance = True 377 BackboneInterface.add_listener(self, self.socket_path, socket_type=socket.AF_UNIX) 378 379 elif bindport != None: 380 self.receives = True 381 self.bind_ip = "127.0.0.1" 382 self.bind_port = bindport 383 384 self.owner = owner 385 self.is_local_shared_instance = True 386 387 address = (self.bind_ip, self.bind_port) 388 if self.epoll_backend: BackboneInterface.add_listener(self, address) 389 else: 390 def handlerFactory(callback): 391 def createHandler(*args, **keys): 392 return LocalInterfaceHandler(callback, *args, **keys) 393 return createHandler 394 395 self.server = ThreadingTCPServer(address, handlerFactory(self.incoming_connection)) 396 self.server.daemon_threads = True 397 thread = threading.Thread(target=self.server.serve_forever) 398 thread.daemon = True 399 thread.start() 400 401 self.announce_rate_target = None 402 self.announce_rate_grace = None 403 self.announce_rate_penalty = None 404 405 self.bitrate = 1000*1000*1000 406 self.online = True 407 408 def incoming_connection(self, handler): 409 if self.epoll_backend: 410 client_socket = handler 411 if client_socket.family == socket.AF_INET: 412 interface_name = str(str(client_socket.getpeername()[1])) 413 elif client_socket.family == socket.AF_UNIX: 414 interface_name = f"{self.clients}@{self.socket_path}" 415 416 spawned_interface = LocalClientInterface(self.owner, name=interface_name, connected_socket=client_socket) 417 spawned_interface.OUT = self.OUT 418 spawned_interface.IN = self.IN 419 spawned_interface.socket = client_socket 420 spawned_interface.parent_interface = self 421 spawned_interface.bitrate = self.bitrate 422 423 if client_socket.family == socket.AF_INET: 424 spawned_interface.target_ip = client_socket.getpeername()[0] 425 spawned_interface.target_port = str(client_socket.getpeername()[1]) 426 427 elif client_socket.family == socket.AF_UNIX: 428 spawned_interface.target_ip = None 429 spawned_interface.target_port = interface_name 430 spawned_interface.socket_path = self.socket_path 431 432 if hasattr(self, "_force_bitrate"): spawned_interface._force_bitrate = self._force_bitrate 433 RNS.Transport.interfaces.append(spawned_interface) 434 RNS.Transport.local_client_interfaces.append(spawned_interface) 435 BackboneInterface.add_client_socket(client_socket, spawned_interface) 436 self.clients += 1 437 return True 438 439 else: 440 interface_name = str(str(handler.client_address[1])) 441 spawned_interface = LocalClientInterface(self.owner, name=interface_name, connected_socket=handler.request) 442 spawned_interface.OUT = self.OUT 443 spawned_interface.IN = self.IN 444 spawned_interface.target_ip = handler.client_address[0] 445 spawned_interface.target_port = str(handler.client_address[1]) 446 spawned_interface.parent_interface = self 447 spawned_interface.bitrate = self.bitrate 448 if hasattr(self, "_force_bitrate"): spawned_interface._force_bitrate = self._force_bitrate 449 RNS.Transport.interfaces.append(spawned_interface) 450 RNS.Transport.local_client_interfaces.append(spawned_interface) 451 self.clients += 1 452 spawned_interface.read_loop() 453 454 def process_outgoing(self, data): 455 pass 456 457 def received_announce(self, from_spawned=False): 458 if from_spawned: self.ia_freq_deque.append(time.time()) 459 460 def sent_announce(self, from_spawned=False): 461 if from_spawned: self.oa_freq_deque.append(time.time()) 462 463 def __str__(self): 464 if self.socket_path: return "Shared Instance["+str(self.socket_path.replace("\0", ""))+"]" 465 else: return "Shared Instance["+str(self.bind_port)+"]" 466 467 class LocalInterfaceHandler(socketserver.BaseRequestHandler): 468 def __init__(self, callback, *args, **keys): 469 self.callback = callback 470 socketserver.BaseRequestHandler.__init__(self, *args, **keys) 471 472 def handle(self): 473 self.callback(handler=self)