I2PInterface.py
# Reticulum License
#
# Copyright (c) 2016-2025 Mark Qvist
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# - The Software shall not be used in any kind of system which includes amongst
#   its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
#   an artificial intelligence, machine learning or language model training
#   dataset, including but not limited to any use that contributes to the
#   training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
#   all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from RNS.Interfaces.Interface import Interface
import socketserver
import threading
import platform
import socket
import time
import sys
import os
import RNS
import asyncio

class HDLC():
    """Constants and byte-stuffing helper for HDLC-style framing."""

    FLAG              = 0x7E
    ESC               = 0x7D
    ESC_MASK          = 0x20

    @staticmethod
    def escape(data):
        """Return *data* with every ESC and FLAG byte replaced by its two-byte escape sequence."""
        plain_esc    = bytes([HDLC.ESC])
        plain_flag   = bytes([HDLC.FLAG])
        stuffed_esc  = bytes([HDLC.ESC, HDLC.ESC ^ HDLC.ESC_MASK])
        stuffed_flag = bytes([HDLC.ESC, HDLC.FLAG ^ HDLC.ESC_MASK])

        # ESC is handled first, so the ESC bytes introduced while
        # stuffing FLAG are not themselves stuffed a second time.
        return data.replace(plain_esc, stuffed_esc).replace(plain_flag, stuffed_flag)

class KISS():
    """Constants and byte-stuffing helper for KISS framing."""

    FEND              = 0xC0
    FESC              = 0xDB
    TFEND             = 0xDC
    TFESC             = 0xDD
    CMD_DATA          = 0x00
    CMD_UNKNOWN       = 0xFE

    @staticmethod
    def escape(data):
        """Return *data* with every FESC and FEND byte replaced by its two-byte escape sequence."""
        stuffed_fesc = bytes([KISS.FESC, KISS.TFESC])
        stuffed_fend = bytes([KISS.FESC, KISS.TFEND])

        # FESC is handled first, so the FESC bytes introduced while
        # stuffing FEND are not themselves stuffed a second time.
        without_fesc = data.replace(bytes([KISS.FESC]), stuffed_fesc)
        return without_fesc.replace(bytes([KISS.FEND]), stuffed_fend)

# TODO: Neater shutdown of the event loop and
# better error handling is needed. Sometimes
# errors occur in I2P that leave tunnel setup
# hanging indefinitely, and right now we have
# no way of catching it. Sometimes the server
# and client tasks are also not cancelled on
# shutdown, which leads to errors dumped to
# the console. This should also be remedied.

class I2PController:
    """Owns the asyncio event loop used for I2P and sets up client/server tunnels.

    ``start()`` is meant to be run in its own thread; it creates the event
    loop and runs it until ``stop()`` is called. ``client_tunnel()`` and
    ``server_tunnel()`` are blocking calls made from other threads that
    submit coroutines to that loop.
    """

    def __init__(self, rns_storagepath):
        """Prepare controller state and make sure ``<rns_storagepath>/i2p`` exists."""
        import RNS.vendor.i2plib as i2plib
        import RNS.vendor.i2plib.utils

        # Per-destination flags: True once the tunnel has been set up
        self.client_tunnels = {}
        self.server_tunnels = {}
        # Per-destination i2plib tunnel objects (None until created)
        self.i2plib_tunnels = {}
        self.loop = None
        self.i2plib = i2plib
        self.utils = i2plib.utils
        self.sam_address = i2plib.get_sam_address()
        self.ready = False

        self.storagepath = rns_storagepath+"/i2p"
        if not os.path.isdir(self.storagepath):
            os.makedirs(self.storagepath)


    def start(self):
        """Create an event loop for the calling thread and run it until stopped.

        Sets ``self.ready`` to True just before the loop starts running and
        closes the loop when it exits.
        """
        asyncio.set_event_loop(asyncio.new_event_loop())
        self.loop = asyncio.get_event_loop()

        time.sleep(0.10)
        if self.loop == None:
            RNS.log("Could not get event loop for "+str(self)+", waiting for event loop to appear", RNS.LOG_VERBOSE)

        while self.loop == None:
            self.loop = asyncio.get_event_loop()
            # The original called a bare sleep(), which is not defined
            # in this module and raised NameError.
            time.sleep(0.25)

        try:
            self.ready = True
            self.loop.run_forever()
        except Exception as e:
            self.ready = False
            RNS.log("Exception on event loop for "+str(self)+": "+str(e), RNS.LOG_ERROR)
        finally:
            self.loop.close()


    def stop(self):
        """Stop all known tunnels, cancel the loop's tasks and stop the loop."""
        # The tunnel objects are the dict *values*; the keys are destination
        # strings. Iterate a snapshot since tunnel threads may mutate the dict.
        for i2ptunnel in list(self.i2plib_tunnels.values()):
            self.stop_tunnel(i2ptunnel)

        if self.loop is None or self.loop.is_closed():
            return

        # asyncio.Task.all_tasks() was removed in Python 3.9; prefer the
        # module-level function and fall back for old interpreters.
        if hasattr(asyncio, "all_tasks") and callable(asyncio.all_tasks):
            pending_tasks = asyncio.all_tasks(loop=self.loop)
        elif hasattr(asyncio.Task, "all_tasks") and callable(asyncio.Task.all_tasks):
            pending_tasks = asyncio.Task.all_tasks(loop=self.loop)
        else:
            pending_tasks = []

        try:
            # stop() runs outside the loop's thread, so cancellation and
            # loop.stop() must be handed over with call_soon_threadsafe.
            for task in pending_tasks:
                self.loop.call_soon_threadsafe(task.cancel)

            time.sleep(0.2)

            self.loop.call_soon_threadsafe(self.loop.stop)

        except RuntimeError as e:
            # Raised if the loop was closed in the meantime
            RNS.log("Could not stop event loop for "+str(self)+": "+str(e), RNS.LOG_DEBUG)


    def get_free_port(self):
        """Return a free local port as reported by ``i2plib.utils.get_free_port()``."""
        return self.i2plib.utils.get_free_port()


    def stop_tunnel(self, i2ptunnel):
        """Call ``i2ptunnel.stop()`` if the object provides a callable ``stop``."""
        if hasattr(i2ptunnel, "stop") and callable(i2ptunnel.stop):
            i2ptunnel.stop()


    def _log_tunnel_exception(self, i2p_exception, i2p_destination, tunnel_kind):
        """Log a description of an i2plib exception recorded during tunnel setup."""
        exceptions = RNS.vendor.i2plib.exceptions
        target = str(i2p_destination)

        if isinstance(i2p_exception, exceptions.CantReachPeer):
            RNS.log("The I2P daemon can't reach peer "+target, RNS.LOG_ERROR)

        elif isinstance(i2p_exception, exceptions.DuplicatedDest):
            RNS.log("The I2P daemon reported that the destination is already in use", RNS.LOG_ERROR)

        elif isinstance(i2p_exception, exceptions.DuplicatedId):
            RNS.log("The I2P daemon reported that the ID is arleady in use", RNS.LOG_ERROR)

        elif isinstance(i2p_exception, exceptions.InvalidId):
            RNS.log("The I2P daemon reported that the stream session ID doesn't exist", RNS.LOG_ERROR)

        elif isinstance(i2p_exception, exceptions.InvalidKey):
            RNS.log("The I2P daemon reported that the key for "+target+" is invalid", RNS.LOG_ERROR)

        elif isinstance(i2p_exception, exceptions.KeyNotFound):
            RNS.log("The I2P daemon could not find the key for "+target, RNS.LOG_ERROR)

        elif isinstance(i2p_exception, exceptions.PeerNotFound):
            RNS.log("The I2P daemon could not find the peer "+target, RNS.LOG_ERROR)

        elif isinstance(i2p_exception, exceptions.I2PError):
            RNS.log("The I2P daemon experienced an unspecified error", RNS.LOG_ERROR)

        elif isinstance(i2p_exception, exceptions.Timeout):
            RNS.log("I2P daemon timed out while setting up "+tunnel_kind+" tunnel to "+target, RNS.LOG_ERROR)


    def _tunnel_healthy(self, i2ptunnel, i2p_destination, tunnel_kind):
        """Inspect a tunnel's status dict; on any problem log it, stop the tunnel and return False."""
        if not hasattr(i2ptunnel, "status"):
            RNS.log(str(self)+" Got no status from SAM API, resetting I2P tunnel", RNS.LOG_ERROR)
            self.stop_tunnel(i2ptunnel)
            return False

        i2p_exception = i2ptunnel.status["exception"]

        if i2ptunnel.status["setup_ran"] == False:
            RNS.log(str(self)+" I2P tunnel setup did not complete", RNS.LOG_ERROR)
            self.stop_tunnel(i2ptunnel)
            return False

        if i2p_exception != None:
            if tunnel_kind == "client":
                RNS.log("An error ocurred while setting up I2P tunnel to "+str(i2p_destination), RNS.LOG_ERROR)
            else:
                RNS.log("An error ocurred while setting up I2P tunnel", RNS.LOG_ERROR)

            self._log_tunnel_exception(i2p_exception, i2p_destination, tunnel_kind)
            RNS.log("Resetting I2P tunnel and retrying later", RNS.LOG_ERROR)
            self.stop_tunnel(i2ptunnel)
            return False

        if i2ptunnel.status["setup_failed"] == True:
            RNS.log(str(self)+" Unspecified I2P tunnel setup error, resetting I2P tunnel", RNS.LOG_ERROR)
            self.stop_tunnel(i2ptunnel)
            return False

        return True


    def client_tunnel(self, owner, i2p_destination):
        """Bring up a client tunnel to *i2p_destination* and monitor it.

        Blocks for as long as the tunnel is healthy, checking its status
        every five seconds. Returns False once the tunnel reports a problem
        (after stopping it). Exceptions raised during setup are re-raised.

        *owner* must provide ``local_addr`` and ``socket``; its
        ``awaiting_i2p_tunnel`` attribute is set to False when setup succeeds.
        """
        self.client_tunnels[i2p_destination] = False
        self.i2plib_tunnels[i2p_destination] = None

        while True:
            if not self.client_tunnels[i2p_destination]:
                try:
                    async def tunnel_up():
                        RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO)
                        tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop)
                        self.i2plib_tunnels[i2p_destination] = tunnel
                        await tunnel.run()

                    self.loop.ext_owner = self
                    asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()

                    if i2p_destination not in self.i2plib_tunnels:
                        raise IOError("No tunnel control instance was created")

                    tn = self.i2plib_tunnels[i2p_destination]
                    if tn is None or not hasattr(tn, "status"):
                        raise IOError("Got no status response from SAM API")

                    RNS.log("Waiting for status from I2P control process", RNS.LOG_EXTREME)
                    while not tn.status["setup_ran"]:
                        time.sleep(0.1)
                    RNS.log("Got status from I2P control process", RNS.LOG_EXTREME)

                    if tn.status["setup_failed"]:
                        self.stop_tunnel(tn)
                        raise tn.status["exception"]

                    # Close any socket the owner still holds from a previous
                    # tunnel, so it connects afresh through the new one.
                    if owner.socket is not None and hasattr(owner.socket, "close") and callable(owner.socket.close):
                        try:
                            owner.socket.shutdown(socket.SHUT_RDWR)
                        except Exception as e:
                            RNS.log("Error while shutting down socket for "+str(owner)+": "+str(e))

                        try:
                            owner.socket.close()
                        except Exception as e:
                            RNS.log("Error while closing socket for "+str(owner)+": "+str(e))

                    self.client_tunnels[i2p_destination] = True
                    owner.awaiting_i2p_tunnel = False

                    RNS.log(str(owner)+" tunnel setup complete", RNS.LOG_VERBOSE)

                except (ConnectionRefusedError, ConnectionAbortedError):
                    raise

                except Exception as e:
                    RNS.log("Unexpected error type from I2P SAM: "+str(e), RNS.LOG_ERROR)
                    raise

            else:
                i2ptunnel = self.i2plib_tunnels[i2p_destination]
                if not self._tunnel_healthy(i2ptunnel, i2p_destination, "client"):
                    return False

            # Wait for status from I2P control process
            time.sleep(5)


    def server_tunnel(self, owner):
        """Bring up a server tunnel (I2P endpoint) for *owner* and monitor it.

        Waits until ``RNS.Transport.identity`` is available, loads or creates
        the destination key file in the storage path, then blocks while the
        tunnel is healthy, checking its status every five seconds. Returns
        False once the tunnel reports a problem (after stopping it).

        *owner* must provide ``name``, ``bind_ip`` and ``bind_port``; its
        ``b32`` and ``online`` attributes are set here.
        """
        while RNS.Transport.identity == None:
            time.sleep(1)

        name_hash = RNS.Identity.full_hash(owner.name.encode("utf-8"))

        # Old format
        i2p_dest_hash_of = RNS.Identity.full_hash(name_hash)
        i2p_keyfile_of   = self.storagepath+"/"+RNS.hexrep(i2p_dest_hash_of, delimit=False)+".i2p"

        # New format
        i2p_dest_hash_nf = RNS.Identity.full_hash(name_hash+RNS.Identity.full_hash(RNS.Transport.identity.hash))
        i2p_keyfile_nf   = self.storagepath+"/"+RNS.hexrep(i2p_dest_hash_nf, delimit=False)+".i2p"

        # Use old format if a key is already present
        if os.path.isfile(i2p_keyfile_of):
            i2p_keyfile = i2p_keyfile_of
        else:
            i2p_keyfile = i2p_keyfile_nf

        i2p_dest = None
        if not os.path.isfile(i2p_keyfile):
            coro = self.i2plib.new_destination(sam_address=self.sam_address, loop=self.loop)
            i2p_dest = asyncio.run_coroutine_threadsafe(coro, self.loop).result()
            with open(i2p_keyfile, "w") as key_file:
                key_file.write(i2p_dest.private_key.base64)
        else:
            with open(i2p_keyfile, "r") as key_file:
                prvd = key_file.read()
            i2p_dest = self.i2plib.Destination(data=prvd, has_private_key=True)

        i2p_b32 = i2p_dest.base32
        owner.b32 = i2p_b32

        self.server_tunnels[i2p_b32] = False
        self.i2plib_tunnels[i2p_b32] = None

        while True:
            if self.server_tunnels[i2p_b32] == False:
                async def tunnel_up():
                    RNS.log(str(owner)+" Bringing up I2P endpoint, this may take a while...", RNS.LOG_INFO)
                    tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address)
                    self.i2plib_tunnels[i2p_b32] = tunnel
                    await tunnel.run()
                    owner.online = True
                    RNS.log(str(owner)+ " endpoint setup complete. Now reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE)

                # Any exception from setup propagates to the caller
                asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
                self.server_tunnels[i2p_b32] = True

            else:
                i2ptunnel = self.i2plib_tunnels[i2p_b32]
                # The local endpoint's b32 address is used as the target in
                # log messages; the original referenced an undefined name here.
                if not self._tunnel_healthy(i2ptunnel, i2p_b32, "server"):
                    return False

            time.sleep(5)

    def get_loop(self):
        """Return ``asyncio.get_event_loop()`` for the calling thread.

        NOTE(review): this is not necessarily ``self.loop``; confirm whether
        callers expect the controller's own loop.
        """
        return asyncio.get_event_loop()


class ThreadingI2PServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each incoming connection in its own thread."""
    pass

class I2PInterfacePeer(Interface):
    """A single I2P peer connection, either outgoing (initiator) or accepted.

    With ``target_i2p_dest`` the peer requests a client tunnel from the parent
    interface's controller and connects through it. With ``connected_socket``
    it wraps an already accepted local socket.
    """

    RECONNECT_WAIT = 15
    RECONNECT_MAX_TRIES = None

    # TCP socket options
    I2P_USER_TIMEOUT = 45
    I2P_PROBE_AFTER = 10
    I2P_PROBE_INTERVAL = 9
    I2P_PROBES = 5
    I2P_READ_TIMEOUT = (I2P_PROBE_INTERVAL * I2P_PROBES + I2P_PROBE_AFTER)*2

    TUNNEL_STATE_INIT   = 0x00
    TUNNEL_STATE_ACTIVE = 0x01
    TUNNEL_STATE_STALE  = 0x02

    def __init__(self, parent_interface, owner, name, target_i2p_dest=None, connected_socket=None, max_reconnect_tries=None):
        """Set up peer state and, for initiators, start the tunnel and connect threads.

        parent_interface: interface providing ``i2p`` (the controller) and IFAC settings.
        owner: object whose ``inbound(data, interface)`` receives decoded frames.
        name: display name used in ``__str__`` and log messages.
        target_i2p_dest: I2P destination to connect to (initiator mode).
        connected_socket: already connected socket (accepted-connection mode).
        max_reconnect_tries: reconnect limit; None uses ``RECONNECT_MAX_TRIES``.
        """
        super().__init__()

        self.HW_MTU = 1064

        self.IN               = True
        self.OUT              = False
        self.socket           = None
        self.parent_interface = parent_interface
        self.parent_count     = True
        self.name             = name
        self.initiator        = False
        self.reconnecting     = False
        self.never_connected  = True
        self.owner            = owner
        self.writing          = False
        self.online           = False
        self.detached         = False
        self.kiss_framing     = False
        self.i2p_tunneled     = True
        self.i2p_dest         = None
        self.i2p_tunnel_ready = False
        self.mode             = RNS.Interfaces.Interface.Interface.MODE_FULL
        self.bitrate          = I2PInterface.BITRATE_GUESS
        self.last_read        = 0
        self.last_write       = 0
        self.wd_reset         = False
        self.i2p_tunnel_state = I2PInterfacePeer.TUNNEL_STATE_INIT

        self.ifac_size    = self.parent_interface.ifac_size
        self.ifac_netname = self.parent_interface.ifac_netname
        self.ifac_netkey  = self.parent_interface.ifac_netkey
        if self.ifac_netname != None or self.ifac_netkey != None:
            # Derive the IFAC key and identity from the configured
            # network name and/or network key.
            ifac_origin = b""
            if self.ifac_netname != None:
                ifac_origin += RNS.Identity.full_hash(self.ifac_netname.encode("utf-8"))
            if self.ifac_netkey != None:
                ifac_origin += RNS.Identity.full_hash(self.ifac_netkey.encode("utf-8"))

            ifac_origin_hash = RNS.Identity.full_hash(ifac_origin)
            self.ifac_key = RNS.Cryptography.hkdf(
                length=64,
                derive_from=ifac_origin_hash,
                salt=RNS.Reticulum.IFAC_SALT,
                context=None
            )
            self.ifac_identity = RNS.Identity.from_bytes(self.ifac_key)
            self.ifac_signature = self.ifac_identity.sign(RNS.Identity.full_hash(self.ifac_key))

        self.announce_rate_target  = None
        self.announce_rate_grace   = None
        self.announce_rate_penalty = None

        if max_reconnect_tries == None:
            self.max_reconnect_tries = I2PInterfacePeer.RECONNECT_MAX_TRIES
        else:
            self.max_reconnect_tries = max_reconnect_tries

        if connected_socket != None:
            self.receives    = True
            self.target_ip   = None
            self.target_port = None
            self.socket      = connected_socket

            if platform.system() == "Linux":
                self.set_timeouts_linux()
            elif platform.system() == "Darwin":
                self.set_timeouts_osx()

        elif target_i2p_dest != None:
            self.receives  = True
            self.initiator = True

            self.bind_ip = "127.0.0.1"

            self.awaiting_i2p_tunnel = True

            def tunnel_job():
                # Keeps requesting a client tunnel while none is available.
                # client_tunnel() blocks while the tunnel is healthy.
                while self.awaiting_i2p_tunnel:
                    try:
                        self.bind_port = self.parent_interface.i2p.get_free_port()
                        self.local_addr = (self.bind_ip, self.bind_port)
                        self.target_ip = self.bind_ip
                        self.target_port = self.bind_port

                        if not self.parent_interface.i2p.client_tunnel(self, target_i2p_dest):
                            RNS.log(str(self)+" I2P control process experienced an error, requesting new tunnel...", RNS.LOG_ERROR)
                            self.awaiting_i2p_tunnel = True

                    except Exception as e:
                        RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR)
                        RNS.log("Check that I2P is installed and running, and that SAM is enabled. Retrying tunnel setup later.", RNS.LOG_ERROR)

                    time.sleep(8)

            threading.Thread(target=tunnel_job, daemon=True).start()

            def wait_job():
                # Waits for the tunnel, then makes the initial connection
                while self.awaiting_i2p_tunnel:
                    time.sleep(0.25)
                time.sleep(2)

                if not self.kiss_framing:
                    self.wants_tunnel = True

                if not self.connect(initial=True):
                    threading.Thread(target=self.reconnect, daemon=True).start()
                else:
                    threading.Thread(target=self.read_loop, daemon=True).start()

            threading.Thread(target=wait_job, daemon=True).start()


    def set_timeouts_linux(self):
        """Apply the class's user-timeout and keepalive settings to the socket (Linux options)."""
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(I2PInterfacePeer.I2P_USER_TIMEOUT * 1000))
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(I2PInterfacePeer.I2P_PROBE_AFTER))
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(I2PInterfacePeer.I2P_PROBE_INTERVAL))
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(I2PInterfacePeer.I2P_PROBES))

    def set_timeouts_osx(self):
        """Enable keepalive and set the keepalive idle time on the socket (macOS options)."""
        if hasattr(socket, "TCP_KEEPALIVE"):
            TCP_KEEPIDLE = socket.TCP_KEEPALIVE
        else:
            # Fallback option number when the socket module lacks TCP_KEEPALIVE
            TCP_KEEPIDLE = 0x10

        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(I2PInterfacePeer.I2P_PROBE_AFTER))

    def shutdown_socket(self, target_socket):
        """Shut down and close *target_socket*, logging (not raising) any errors.

        Does nothing when *target_socket* is None.
        """
        # The original tested the `socket` module against None here, which
        # is always true; the intended check is on the argument.
        if target_socket is None:
            return

        if callable(target_socket.close):
            try:
                target_socket.shutdown(socket.SHUT_RDWR)
            except Exception as e:
                RNS.log("Error while shutting down socket for "+str(self)+": "+str(e))

            try:
                target_socket.close()
            except Exception as e:
                RNS.log("Error while closing socket for "+str(self)+": "+str(e))

    def detach(self):
        """Mark the peer as detached and close its socket, if it has one."""
        RNS.log("Detaching "+str(self), RNS.LOG_DEBUG)
        if self.socket != None:
            if hasattr(self.socket, "close"):
                if callable(self.socket.close):
                    self.detached = True

                    try:
                        self.socket.shutdown(socket.SHUT_RDWR)
                    except Exception as e:
                        RNS.log("Error while shutting down socket for "+str(self)+": "+str(e))

                    try:
                        self.socket.close()
                    except Exception as e:
                        RNS.log("Error while closing socket for "+str(self)+": "+str(e))

                    self.socket = None

    def connect(self, initial=False):
        """Connect a new TCP socket to ``(target_ip, target_port)``.

        Returns True on success. On failure returns False when *initial* is
        true, otherwise re-raises the connection error.
        """
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((self.target_ip, self.target_port))
            self.online  = True

        except Exception as e:
            if not initial:
                raise e

            if not self.awaiting_i2p_tunnel:
                RNS.log("Initial connection for "+str(self)+" could not be established: "+str(e), RNS.LOG_ERROR)
                RNS.log("Leaving unconnected and retrying connection in "+str(I2PInterfacePeer.RECONNECT_WAIT)+" seconds.", RNS.LOG_ERROR)

            return False

        if platform.system() == "Linux":
            self.set_timeouts_linux()
        elif platform.system() == "Darwin":
            self.set_timeouts_osx()

        self.online  = True
        self.writing = False
        self.never_connected = False

        if not self.kiss_framing and self.wants_tunnel:
            RNS.Transport.synthesize_tunnel(self)

        return True

    def reconnect(self):
        """Retry ``connect()`` every ``RECONNECT_WAIT`` seconds until online.

        Tears the interface down and returns if ``max_reconnect_tries`` is
        exceeded. On success a new read loop thread is started.

        Raises IOError when called on a non-initiator peer.
        """
        if not self.initiator:
            RNS.log("Attempt to reconnect on a non-initiator I2P interface. This should not happen.", RNS.LOG_ERROR)
            raise IOError("Attempt to reconnect on a non-initiator I2P interface")

        if self.reconnecting:
            return

        self.reconnecting = True
        attempts = 0
        while not self.online:
            time.sleep(I2PInterfacePeer.RECONNECT_WAIT)
            attempts += 1

            if self.max_reconnect_tries != None and attempts > self.max_reconnect_tries:
                RNS.log("Max reconnection attempts reached for "+str(self), RNS.LOG_ERROR)
                self.teardown()
                break

            try:
                self.connect()

            except Exception as e:
                if not self.awaiting_i2p_tunnel:
                    RNS.log("Connection attempt for "+str(self)+" failed: "+str(e), RNS.LOG_DEBUG)
                else:
                    RNS.log(str(self)+" still waiting for I2P tunnel to appear", RNS.LOG_VERBOSE)

        self.reconnecting = False

        if not self.online:
            # Gave up and tore down; there is no connection to read from,
            # so do not start a read loop or synthesize a tunnel.
            return

        if not self.never_connected:
            RNS.log(str(self)+" Re-established connection via I2P tunnel", RNS.LOG_INFO)

        threading.Thread(target=self.read_loop, daemon=True).start()
        if not self.kiss_framing:
            RNS.Transport.synthesize_tunnel(self)

    def process_incoming(self, data):
        """Account for a received frame and hand it to the owner."""
        # rxb counters are presumably initialised by the Interface base class
        self.rxb += len(data)
        if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count:
            self.parent_interface.rxb += len(data)

        self.owner.inbound(data, self)

    def process_outgoing(self, data):
        """Frame *data* (KISS or HDLC) and send it; tear down the peer on send errors."""
        if self.online:
            # Crude mutual exclusion between writers
            while self.writing:
                time.sleep(0.001)

            try:
                self.writing = True

                if self.kiss_framing:
                    data = bytes([KISS.FEND])+bytes([KISS.CMD_DATA])+KISS.escape(data)+bytes([KISS.FEND])
                else:
                    data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG])

                self.socket.sendall(data)
                self.writing = False
                self.txb += len(data)
                self.last_write = time.time()

                if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count:
                    self.parent_interface.txb += len(data)

            except Exception as e:
                # Release the write flag, otherwise any other thread waiting
                # in the loop above would spin forever.
                self.writing = False
                RNS.log("Exception occurred while transmitting via "+str(self)+", tearing down interface", RNS.LOG_ERROR)
                RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
                self.teardown()


    def read_watchdog(self):
        """Monitor read/write activity, send keepalives and close unresponsive sockets.

        Runs until ``wd_reset`` is set by the read loop, a keepalive fails, or
        nothing has been read for ``I2P_READ_TIMEOUT`` seconds.
        """
        # Wait out a reset that is still in progress from a previous read loop
        while self.wd_reset:
            time.sleep(0.25)

        should_run = True
        try:
            while should_run and not self.wd_reset:
                time.sleep(1)

                if (time.time()-self.last_read > I2PInterfacePeer.I2P_PROBE_AFTER*2):
                    if self.i2p_tunnel_state != I2PInterfacePeer.TUNNEL_STATE_STALE:
                        RNS.log("I2P tunnel became unresponsive", RNS.LOG_DEBUG)

                    self.i2p_tunnel_state = I2PInterfacePeer.TUNNEL_STATE_STALE
                else:
                    self.i2p_tunnel_state = I2PInterfacePeer.TUNNEL_STATE_ACTIVE

                if (time.time()-self.last_write > I2PInterfacePeer.I2P_PROBE_AFTER*1):
                    try:
                        if self.socket != None:
                            # Two bare HDLC flags serve as the keepalive
                            self.socket.sendall(bytes([HDLC.FLAG, HDLC.FLAG]))
                    except Exception as e:
                        RNS.log("An error ocurred while sending I2P keepalive. The contained exception was: "+str(e), RNS.LOG_ERROR)
                        self.shutdown_socket(self.socket)
                        should_run = False

                if (time.time()-self.last_read > I2PInterfacePeer.I2P_READ_TIMEOUT):
                    RNS.log("I2P socket is unresponsive, restarting...", RNS.LOG_WARNING)
                    # Closing the socket makes the blocking recv() in the
                    # read loop fail, which triggers its recovery path.
                    self.shutdown_socket(self.socket)
                    should_run = False

        finally:
            self.wd_reset = False

    def read_loop(self):
        """Receive from the socket, de-frame KISS or HDLC data and dispatch frames.

        Starts a watchdog thread first. When the socket is closed or an error
        occurs, initiators that have not been detached try to reconnect; all
        other peers are torn down.
        """
        try:
            self.last_read = time.time()
            self.last_write = time.time()

            threading.Thread(target=self.read_watchdog, daemon=True).start()

            in_frame = False
            escape = False
            data_buffer = b""
            command = KISS.CMD_UNKNOWN

            while True:
                data_in = self.socket.recv(4096)
                if len(data_in) > 0:
                    self.last_read = time.time()
                    for byte in data_in:
                        if self.kiss_framing:
                            # Read loop for KISS framing
                            if (in_frame and byte == KISS.FEND and command == KISS.CMD_DATA):
                                in_frame = False
                                self.process_incoming(data_buffer)
                            elif (byte == KISS.FEND):
                                in_frame = True
                                command = KISS.CMD_UNKNOWN
                                data_buffer = b""
                            elif (in_frame and len(data_buffer) < self.HW_MTU):
                                if (len(data_buffer) == 0 and command == KISS.CMD_UNKNOWN):
                                    # We only support one HDLC port for now, so
                                    # strip off the port nibble
                                    byte = byte & 0x0F
                                    command = byte
                                elif (command == KISS.CMD_DATA):
                                    if (byte == KISS.FESC):
                                        escape = True
                                    else:
                                        if (escape):
                                            if (byte == KISS.TFEND):
                                                byte = KISS.FEND
                                            if (byte == KISS.TFESC):
                                                byte = KISS.FESC
                                            escape = False
                                        data_buffer = data_buffer+bytes([byte])

                        else:
                            # Read loop for HDLC framing
                            if (in_frame and byte == HDLC.FLAG):
                                in_frame = False
                                self.process_incoming(data_buffer)
                            elif (byte == HDLC.FLAG):
                                in_frame = True
                                data_buffer = b""
                            elif (in_frame and len(data_buffer) < self.HW_MTU):
                                if (byte == HDLC.ESC):
                                    escape = True
                                else:
                                    if (escape):
                                        if (byte == HDLC.FLAG ^ HDLC.ESC_MASK):
                                            byte = HDLC.FLAG
                                        if (byte == HDLC.ESC ^ HDLC.ESC_MASK):
                                            byte = HDLC.ESC
                                        escape = False
                                    data_buffer = data_buffer+bytes([byte])
                else:
                    # recv() returned no data: the socket was closed
                    self.online = False

                    # Ask the watchdog to exit before recovering
                    self.wd_reset = True
                    time.sleep(2)
                    self.wd_reset = False

                    if self.initiator and not self.detached:
                        RNS.log("Socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING)
                        self.reconnect()
                    else:
                        RNS.log("Socket for remote client "+str(self)+" was closed.", RNS.LOG_VERBOSE)
                        self.teardown()

                    break


        except Exception as e:
            self.online = False
            RNS.log("An interface error occurred for "+str(self)+", the contained exception was: "+str(e), RNS.LOG_WARNING)

            # A detached peer must not reconnect; detach() closes the socket,
            # which is what lands us here in that case.
            if self.initiator and not self.detached:
                RNS.log("Attempting to reconnect...", RNS.LOG_WARNING)
                self.reconnect()
            else:
                self.teardown()

    def teardown(self):
        """Take the peer offline and remove it from its parent's spawned list.

        Non-initiator peers are also removed from ``RNS.Transport.interfaces``.
        For initiators that were not detached this is logged as an error and
        ``RNS.panic()`` is called if ``panic_on_interface_error`` is set.
        """
        if self.initiator and not self.detached:
            RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR)
            if RNS.Reticulum.panic_on_interface_error:
                RNS.panic()

        else:
            RNS.log("The interface "+str(self)+" is being torn down.", RNS.LOG_VERBOSE)

        self.online = False
        self.OUT = False
        self.IN = False

        if hasattr(self, "parent_interface") and self.parent_interface != None:
            while self in self.parent_interface.spawned_interfaces:
                self.parent_interface.spawned_interfaces.remove(self)

        if self in RNS.Transport.interfaces:
            if not self.initiator:
                RNS.Transport.interfaces.remove(self)


    def __str__(self):
        return "I2PInterfacePeer["+str(self.name)+"]"


class I2PInterface(Interface):
    """Parent I2P interface.

    Starts the I2P controller, runs a local TCP server that accepts
    connections, optionally exposes a server tunnel (when ``connectable``),
    and spawns an ``I2PInterfacePeer`` for every configured peer and every
    accepted connection.
    """

    BITRATE_GUESS      = 256*1000
    DEFAULT_IFAC_SIZE  = 16

    @property
    def clients(self):
        """Number of currently spawned peer interfaces."""
        return len(self.spawned_interfaces)

    def __init__(self, owner, configuration):
        """Configure the interface and start its controller, server and tunnel threads.

        Reads ``name`` and ``storagepath`` (required) and ``peers``,
        ``connectable``, ``ifac_size``, ``ifac_netname``, ``ifac_netkey``
        (optional) from *configuration*. Blocks until the I2P controller
        reports that it is ready.
        """
        super().__init__()

        c = Interface.get_config_obj(configuration)
        name = c["name"]
        rns_storagepath = c["storagepath"]
        peers = c.as_list("peers") if "peers" in c else None
        connectable = c.as_bool("connectable") if "connectable" in c else False
        ifac_size = c["ifac_size"] if "ifac_size" in c else None
        ifac_netname = c["ifac_netname"] if "ifac_netname" in c else None
        ifac_netkey = c["ifac_netkey"] if "ifac_netkey" in c else None

        self.HW_MTU = 1064

        self.online = False
        self.spawned_interfaces = []
        self.owner = owner
        self.connectable = connectable
        self.i2p_tunneled = True
        self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL

        self.b32 = None
        self.i2p = I2PController(rns_storagepath)

        self.IN = True
        self.OUT = False
        self.name = name


        self.receives = True
        self.bind_ip = "127.0.0.1"
        self.bind_port = self.i2p.get_free_port()
        self.address = (self.bind_ip, self.bind_port)
        self.bitrate = I2PInterface.BITRATE_GUESS
        self.ifac_size = ifac_size
        self.ifac_netname = ifac_netname
        self.ifac_netkey = ifac_netkey
        self.supports_discovery = True

        self.online = False

        i2p_thread = threading.Thread(target=self.i2p.start)
        i2p_thread.daemon = True
        i2p_thread.start()

        i2p_notready_warning = False
        time.sleep(0.25)

        if not self.i2p.ready:
            RNS.log("I2P controller did not become available in time, waiting for controller", RNS.LOG_VERBOSE)
            i2p_notready_warning = True

        while not self.i2p.ready:
            time.sleep(0.25)

        if i2p_notready_warning == True:
            RNS.log("I2P controller ready, continuing setup", RNS.LOG_VERBOSE)

        def handlerFactory(callback):
            # Binds the callback into the handler constructor that
            # socketserver calls for each accepted connection.
            def createHandler(*args, **keys):
                return I2PInterfaceHandler(callback, *args, **keys)
            return createHandler

        ThreadingI2PServer.allow_reuse_address = True
        self.server = ThreadingI2PServer(self.address, handlerFactory(self.incoming_connection))

        thread = threading.Thread(target=self.server.serve_forever)
        thread.daemon = True
        thread.start()

        if self.connectable:
            def tunnel_job():
                # server_tunnel() blocks while the tunnel is healthy and
                # returns False when it needs to be set up again.
                while True:
                    try:
                        if not self.i2p.server_tunnel(self):
                            RNS.log(str(self)+" I2P control process experienced an error, requesting new tunnel...", RNS.LOG_ERROR)
                            self.online = False

                    except Exception as e:
                        RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR)
                        RNS.log("Check that I2P is installed and running, and that SAM is enabled. Retrying tunnel setup later.", RNS.LOG_ERROR)

                    time.sleep(15)


            thread = threading.Thread(target=tunnel_job)
            thread.daemon = True
            thread.start()

        if peers != None:
            for peer_addr in peers:
                interface_name = self.name+" to "+peer_addr
                peer_interface = I2PInterfacePeer(self, self.owner, interface_name, peer_addr)
                peer_interface.OUT = True
                peer_interface.IN = True
                peer_interface.parent_interface = self
                peer_interface.parent_count = False
                RNS.Transport.interfaces.append(peer_interface)

    def incoming_connection(self, handler):
        """Wrap an accepted connection in a peer interface and run its read loop.

        Called from the request handler, so it blocks the handler's thread
        for the lifetime of the connection.
        """
        RNS.log("Accepting incoming I2P connection", RNS.LOG_VERBOSE)
        interface_name = "Connected peer on "+self.name
        spawned_interface = I2PInterfacePeer(self, self.owner, interface_name, connected_socket=handler.request)
        spawned_interface.OUT = True
        spawned_interface.IN = True
        spawned_interface.parent_interface = self
        spawned_interface.online = True
        spawned_interface.bitrate = self.bitrate

        spawned_interface.ifac_size = self.ifac_size
        spawned_interface.ifac_netname = self.ifac_netname
        spawned_interface.ifac_netkey = self.ifac_netkey
        if spawned_interface.ifac_netname != None or spawned_interface.ifac_netkey != None:
            ifac_origin = b""
            if spawned_interface.ifac_netname != None:
                ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netname.encode("utf-8"))
            if spawned_interface.ifac_netkey != None:
                ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netkey.encode("utf-8"))

            ifac_origin_hash = RNS.Identity.full_hash(ifac_origin)
            spawned_interface.ifac_key = RNS.Cryptography.hkdf(
                length=64,
                derive_from=ifac_origin_hash,
                salt=RNS.Reticulum.IFAC_SALT,
                context=None
            )
            spawned_interface.ifac_identity = RNS.Identity.from_bytes(spawned_interface.ifac_key)
            spawned_interface.ifac_signature = spawned_interface.ifac_identity.sign(RNS.Identity.full_hash(spawned_interface.ifac_key))

        spawned_interface.announce_rate_target = self.announce_rate_target
        spawned_interface.announce_rate_grace = self.announce_rate_grace
        spawned_interface.announce_rate_penalty = self.announce_rate_penalty
        spawned_interface.mode = self.mode
        spawned_interface.HW_MTU = self.HW_MTU
        RNS.log("Spawned new I2PInterface Peer: "+str(spawned_interface), RNS.LOG_VERBOSE)
        RNS.Transport.interfaces.append(spawned_interface)
        # Make sure the peer is listed exactly once
        while spawned_interface in self.spawned_interfaces:
            self.spawned_interfaces.remove(spawned_interface)
        self.spawned_interfaces.append(spawned_interface)
        spawned_interface.read_loop()

    def process_outgoing(self, data):
        """No-op: the parent interface does not transmit; its peers do."""
        pass

    def received_announce(self, from_spawned=False):
        """Record the time of an incoming announce reported by a spawned peer."""
        if from_spawned: self.ia_freq_deque.append(time.time())

    def sent_announce(self, from_spawned=False):
        """Record the time of an outgoing announce reported by a spawned peer."""
        if from_spawned: self.oa_freq_deque.append(time.time())

    def detach(self):
        """Stop the I2P controller."""
        RNS.log("Detaching "+str(self), RNS.LOG_DEBUG)
        self.i2p.stop()

    def __str__(self):
        return "I2PInterface["+self.name+"]"

class I2PInterfaceHandler(socketserver.BaseRequestHandler):
    """Request handler that forwards each accepted connection to a callback."""

    def __init__(self, callback, *args, **keys):
        # The callback must be stored before the base constructor runs,
        # since BaseRequestHandler.__init__ invokes handle().
        self.callback = callback
        socketserver.BaseRequestHandler.__init__(self, *args, **keys)

    def handle(self):
        """Invoke the callback with this handler as ``handler``."""
        self.callback(handler=self)