/ RNS / Interfaces / I2PInterface.py
I2PInterface.py
   1  # Reticulum License
   2  #
   3  # Copyright (c) 2016-2025 Mark Qvist
   4  #
   5  # Permission is hereby granted, free of charge, to any person obtaining a copy
   6  # of this software and associated documentation files (the "Software"), to deal
   7  # in the Software without restriction, including without limitation the rights
   8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
   9  # copies of the Software, and to permit persons to whom the Software is
  10  # furnished to do so, subject to the following conditions:
  11  #
  12  # - The Software shall not be used in any kind of system which includes amongst
  13  #   its functions the ability to purposefully do harm to human beings.
  14  #
  15  # - The Software shall not be used, directly or indirectly, in the creation of
  16  #   an artificial intelligence, machine learning or language model training
  17  #   dataset, including but not limited to any use that contributes to the
  18  #   training or development of such a model or algorithm.
  19  #
  20  # - The above copyright notice and this permission notice shall be included in
  21  #   all copies or substantial portions of the Software.
  22  #
  23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  24  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  25  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  26  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  27  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  28  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  29  # SOFTWARE.
  30  
  31  from RNS.Interfaces.Interface import Interface
  32  import socketserver
  33  import threading
  34  import platform
  35  import socket
  36  import time
  37  import sys
  38  import os
  39  import RNS
  40  import asyncio
  41  
  42  class HDLC():
  43      FLAG              = 0x7E
  44      ESC               = 0x7D
  45      ESC_MASK          = 0x20
  46  
  47      @staticmethod
  48      def escape(data):
  49          data = data.replace(bytes([HDLC.ESC]), bytes([HDLC.ESC, HDLC.ESC^HDLC.ESC_MASK]))
  50          data = data.replace(bytes([HDLC.FLAG]), bytes([HDLC.ESC, HDLC.FLAG^HDLC.ESC_MASK]))
  51          return data
  52  
  53  class KISS():
  54      FEND              = 0xC0
  55      FESC              = 0xDB
  56      TFEND             = 0xDC
  57      TFESC             = 0xDD
  58      CMD_DATA          = 0x00
  59      CMD_UNKNOWN       = 0xFE
  60  
  61      @staticmethod
  62      def escape(data):
  63          data = data.replace(bytes([0xdb]), bytes([0xdb, 0xdd]))
  64          data = data.replace(bytes([0xc0]), bytes([0xdb, 0xdc]))
  65          return data
  66  
  67  # TODO: Neater shutdown of the event loop and
  68  # better error handling is needed. Sometimes
  69  # errors occur in I2P that leave tunnel setup
  70  # hanging indefinitely, and right now we have
  71  # no way of catching it. Sometimes the server
  72  # and client tasks are also not cancelled on
  73  # shutdown, which leads to errors dumped to
  74  # the console. This should also be remedied.
  75  
  76  class I2PController:
  77      def __init__(self, rns_storagepath):
  78          import RNS.vendor.i2plib as i2plib
  79          import RNS.vendor.i2plib.utils
  80  
  81          self.client_tunnels = {}
  82          self.server_tunnels = {}
  83          self.i2plib_tunnels = {}
  84          self.loop = None
  85          self.i2plib = i2plib
  86          self.utils = i2plib.utils
  87          self.sam_address = i2plib.get_sam_address()
  88          self.ready = False
  89  
  90          self.storagepath = rns_storagepath+"/i2p"
  91          if not os.path.isdir(self.storagepath):
  92              os.makedirs(self.storagepath)
  93  
  94  
  95      def start(self):
  96          asyncio.set_event_loop(asyncio.new_event_loop())
  97          self.loop = asyncio.get_event_loop()
  98  
  99          time.sleep(0.10)
 100          if self.loop == None:
 101              RNS.log("Could not get event loop for "+str(self)+", waiting for event loop to appear", RNS.LOG_VERBOSE)
 102  
 103          while self.loop == None:
 104              self.loop = asyncio.get_event_loop()
 105              sleep(0.25)
 106  
 107          try:
 108              self.ready = True
 109              self.loop.run_forever()
 110          except Exception as e:
 111              self.ready = False
 112              RNS.log("Exception on event loop for "+str(self)+": "+str(e), RNS.LOG_ERROR)
 113          finally:
 114              self.loop.close()
 115  
 116  
 117      def stop(self):
 118          for i2ptunnel in self.i2plib_tunnels:
 119              if hasattr(i2ptunnel, "stop") and callable(i2ptunnel.stop):
 120                  i2ptunnel.stop()
 121  
 122          if hasattr(asyncio.Task, "all_tasks") and callable(asyncio.Task.all_tasks):
 123              for task in asyncio.Task.all_tasks(loop=self.loop):
 124                  task.cancel()
 125  
 126          time.sleep(0.2)
 127  
 128          self.loop.stop()
 129  
 130  
 131      def get_free_port(self):
 132          return self.i2plib.utils.get_free_port()
 133  
 134  
 135      def stop_tunnel(self, i2ptunnel):
 136          if hasattr(i2ptunnel, "stop") and callable(i2ptunnel.stop):
 137              i2ptunnel.stop()
 138  
 139      def client_tunnel(self, owner, i2p_destination):
 140          self.client_tunnels[i2p_destination] = False
 141          self.i2plib_tunnels[i2p_destination] = None
 142  
 143          while True:
 144              if not self.client_tunnels[i2p_destination]:
 145                  try:
 146                      async def tunnel_up():
 147                          RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO)
 148                          tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop)
 149                          self.i2plib_tunnels[i2p_destination] = tunnel
 150                          await tunnel.run()
 151  
 152                      self.loop.ext_owner = self
 153                      result = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
 154                      
 155                      if not i2p_destination in self.i2plib_tunnels:
 156                          raise IOError("No tunnel control instance was created")
 157  
 158                      else: 
 159                          tn = self.i2plib_tunnels[i2p_destination]
 160                          if tn != None and hasattr(tn, "status"):
 161  
 162                              RNS.log("Waiting for status from I2P control process", RNS.LOG_EXTREME)
 163                              while not tn.status["setup_ran"]:
 164                                  time.sleep(0.1)
 165                              RNS.log("Got status from I2P control process", RNS.LOG_EXTREME)
 166  
 167                              if tn.status["setup_failed"]:
 168                                  self.stop_tunnel(tn)
 169                                  raise tn.status["exception"]
 170  
 171                              else:
 172                                  if owner.socket != None:
 173                                      if hasattr(owner.socket, "close"):
 174                                          if callable(owner.socket.close):
 175                                              try:
 176                                                  owner.socket.shutdown(socket.SHUT_RDWR)
 177                                              except Exception as e:
 178                                                  RNS.log("Error while shutting down socket for "+str(owner)+": "+str(e))
 179  
 180                                              try:
 181                                                  owner.socket.close()
 182                                              except Exception as e:
 183                                                  RNS.log("Error while closing socket for "+str(owner)+": "+str(e))
 184                                  self.client_tunnels[i2p_destination] = True
 185                                  owner.awaiting_i2p_tunnel = False
 186  
 187                                  RNS.log(str(owner)+" tunnel setup complete", RNS.LOG_VERBOSE)
 188  
 189                          else:
 190                              raise IOError("Got no status response from SAM API")
 191  
 192                  except ConnectionRefusedError as e:
 193                      raise e
 194                      
 195                  except ConnectionAbortedError as e:
 196                      raise e
 197  
 198                  except Exception as e:
 199                      RNS.log("Unexpected error type from I2P SAM: "+str(e), RNS.LOG_ERROR)
 200                      raise e
 201  
 202              else:
 203                  i2ptunnel = self.i2plib_tunnels[i2p_destination]
 204                  if hasattr(i2ptunnel, "status"):
 205                      i2p_exception = i2ptunnel.status["exception"]
 206  
 207                      if i2ptunnel.status["setup_ran"] == False:
 208                          RNS.log(str(self)+" I2P tunnel setup did not complete", RNS.LOG_ERROR)
 209  
 210                          self.stop_tunnel(i2ptunnel)
 211                          return False
 212  
 213                      elif i2p_exception != None:
 214                          RNS.log("An error ocurred while setting up I2P tunnel to "+str(i2p_destination), RNS.LOG_ERROR)
 215  
 216                          if isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.CantReachPeer):
 217                              RNS.log("The I2P daemon can't reach peer "+str(i2p_destination), RNS.LOG_ERROR)
 218  
 219                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.DuplicatedDest):
 220                              RNS.log("The I2P daemon reported that the destination is already in use", RNS.LOG_ERROR)
 221  
 222                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.DuplicatedId):
 223                              RNS.log("The I2P daemon reported that the ID is arleady in use", RNS.LOG_ERROR)
 224  
 225                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.InvalidId):
 226                              RNS.log("The I2P daemon reported that the stream session ID doesn't exist", RNS.LOG_ERROR)
 227  
 228                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.InvalidKey):
 229                              RNS.log("The I2P daemon reported that the key for "+str(i2p_destination)+" is invalid", RNS.LOG_ERROR)
 230  
 231                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.KeyNotFound):
 232                              RNS.log("The I2P daemon could not find the key for "+str(i2p_destination), RNS.LOG_ERROR)
 233                          
 234                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.PeerNotFound):
 235                              RNS.log("The I2P daemon mould not find the peer "+str(i2p_destination), RNS.LOG_ERROR)
 236                          
 237                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.I2PError):
 238                              RNS.log("The I2P daemon experienced an unspecified error", RNS.LOG_ERROR)
 239                          
 240                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.Timeout):
 241                              RNS.log("I2P daemon timed out while setting up client tunnel to "+str(i2p_destination), RNS.LOG_ERROR)
 242  
 243                          RNS.log("Resetting I2P tunnel and retrying later", RNS.LOG_ERROR)
 244  
 245                          self.stop_tunnel(i2ptunnel)
 246                          return False
 247  
 248                      elif i2ptunnel.status["setup_failed"] == True:
 249                          RNS.log(str(self)+" Unspecified I2P tunnel setup error, resetting I2P tunnel", RNS.LOG_ERROR)
 250  
 251                          self.stop_tunnel(i2ptunnel)
 252                          return False
 253  
 254                  else:
 255                      RNS.log(str(self)+" Got no status from SAM API, resetting I2P tunnel", RNS.LOG_ERROR)
 256  
 257                      self.stop_tunnel(i2ptunnel)
 258                      return False
 259  
 260              # Wait for status from I2P control process
 261              time.sleep(5)
 262  
 263  
 264      def server_tunnel(self, owner):
 265          while RNS.Transport.identity == None:
 266              time.sleep(1)
 267  
 268          # Old format
 269          i2p_dest_hash_of = RNS.Identity.full_hash(RNS.Identity.full_hash(owner.name.encode("utf-8")))
 270          i2p_keyfile_of   = self.storagepath+"/"+RNS.hexrep(i2p_dest_hash_of, delimit=False)+".i2p"
 271  
 272          # New format
 273          i2p_dest_hash_nf = RNS.Identity.full_hash(RNS.Identity.full_hash(owner.name.encode("utf-8"))+RNS.Identity.full_hash(RNS.Transport.identity.hash))
 274          i2p_keyfile_nf   = self.storagepath+"/"+RNS.hexrep(i2p_dest_hash_nf, delimit=False)+".i2p"
 275  
 276          # Use old format if a key is already present
 277          if os.path.isfile(i2p_keyfile_of):
 278              i2p_keyfile = i2p_keyfile_of
 279          else:
 280              i2p_keyfile = i2p_keyfile_nf
 281  
 282          i2p_dest = None
 283          if not os.path.isfile(i2p_keyfile):
 284              coro = self.i2plib.new_destination(sam_address=self.sam_address, loop=self.loop)
 285              i2p_dest = asyncio.run_coroutine_threadsafe(coro, self.loop).result()
 286              key_file = open(i2p_keyfile, "w")
 287              key_file.write(i2p_dest.private_key.base64)
 288              key_file.close()
 289          else:
 290              key_file = open(i2p_keyfile, "r")
 291              prvd = key_file.read()
 292              key_file.close()
 293              i2p_dest = self.i2plib.Destination(data=prvd, has_private_key=True)
 294  
 295          i2p_b32 = i2p_dest.base32
 296          owner.b32 = i2p_b32
 297  
 298          self.server_tunnels[i2p_b32] = False
 299          self.i2plib_tunnels[i2p_b32] = None
 300  
 301          while True:
 302              if self.server_tunnels[i2p_b32] == False:
 303                  try:
 304                      async def tunnel_up():
 305                          RNS.log(str(owner)+" Bringing up I2P endpoint, this may take a while...", RNS.LOG_INFO)
 306                          tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address)
 307                          self.i2plib_tunnels[i2p_b32] = tunnel
 308                          await tunnel.run()
 309                          owner.online = True
 310                          RNS.log(str(owner)+ " endpoint setup complete. Now reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE)
 311  
 312                      asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
 313                      self.server_tunnels[i2p_b32] = True
 314  
 315                  except Exception as e:
 316                      raise e
 317  
 318              else:
 319                  i2ptunnel = self.i2plib_tunnels[i2p_b32]
 320                  if hasattr(i2ptunnel, "status"):
 321                      i2p_exception = i2ptunnel.status["exception"]
 322  
 323                      if i2ptunnel.status["setup_ran"] == False:
 324                          RNS.log(str(self)+" I2P tunnel setup did not complete", RNS.LOG_ERROR)
 325  
 326                          self.stop_tunnel(i2ptunnel)
 327                          return False
 328  
 329                      elif i2p_exception != None:
 330                          RNS.log("An error ocurred while setting up I2P tunnel", RNS.LOG_ERROR)
 331                          
 332                          if isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.CantReachPeer):
 333                              RNS.log("The I2P daemon can't reach peer "+str(i2p_destination), RNS.LOG_ERROR)
 334  
 335                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.DuplicatedDest):
 336                              RNS.log("The I2P daemon reported that the destination is already in use", RNS.LOG_ERROR)
 337  
 338                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.DuplicatedId):
 339                              RNS.log("The I2P daemon reported that the ID is arleady in use", RNS.LOG_ERROR)
 340  
 341                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.InvalidId):
 342                              RNS.log("The I2P daemon reported that the stream session ID doesn't exist", RNS.LOG_ERROR)
 343  
 344                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.InvalidKey):
 345                              RNS.log("The I2P daemon reported that the key for "+str(i2p_destination)+" is invalid", RNS.LOG_ERROR)
 346  
 347                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.KeyNotFound):
 348                              RNS.log("The I2P daemon could not find the key for "+str(i2p_destination), RNS.LOG_ERROR)
 349                          
 350                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.PeerNotFound):
 351                              RNS.log("The I2P daemon mould not find the peer "+str(i2p_destination), RNS.LOG_ERROR)
 352                          
 353                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.I2PError):
 354                              RNS.log("The I2P daemon experienced an unspecified error", RNS.LOG_ERROR)
 355                          
 356                          elif isinstance(i2p_exception, RNS.vendor.i2plib.exceptions.Timeout):
 357                              RNS.log("I2P daemon timed out while setting up client tunnel to "+str(i2p_destination), RNS.LOG_ERROR)
 358  
 359                          RNS.log("Resetting I2P tunnel and retrying later", RNS.LOG_ERROR)
 360  
 361                          self.stop_tunnel(i2ptunnel)
 362                          return False
 363  
 364                      elif i2ptunnel.status["setup_failed"] == True:
 365                          RNS.log(str(self)+" Unspecified I2P tunnel setup error, resetting I2P tunnel", RNS.LOG_ERROR)
 366  
 367                          self.stop_tunnel(i2ptunnel)
 368                          return False
 369  
 370                  else:
 371                      RNS.log(str(self)+" Got no status from SAM API, resetting I2P tunnel", RNS.LOG_ERROR)
 372  
 373                      self.stop_tunnel(i2ptunnel)
 374                      return False
 375  
 376              time.sleep(5)
 377  
 378      def get_loop(self):
 379          return asyncio.get_event_loop()
 380  
 381  
 382  class ThreadingI2PServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
 383      pass
 384  
 385  class I2PInterfacePeer(Interface):
 386      RECONNECT_WAIT = 15
 387      RECONNECT_MAX_TRIES = None
 388  
 389      # TCP socket options
 390      I2P_USER_TIMEOUT = 45
 391      I2P_PROBE_AFTER = 10
 392      I2P_PROBE_INTERVAL = 9
 393      I2P_PROBES = 5
 394      I2P_READ_TIMEOUT = (I2P_PROBE_INTERVAL * I2P_PROBES + I2P_PROBE_AFTER)*2
 395  
 396      TUNNEL_STATE_INIT    = 0x00
 397      TUNNEL_STATE_ACTIVE  = 0x01
 398      TUNNEL_STATE_STALE   = 0x02
 399  
 400      def __init__(self, parent_interface, owner, name, target_i2p_dest=None, connected_socket=None, max_reconnect_tries=None):
 401          super().__init__()
 402  
 403          self.HW_MTU = 1064
 404          
 405          self.IN               = True
 406          self.OUT              = False
 407          self.socket           = None
 408          self.parent_interface = parent_interface
 409          self.parent_count     = True
 410          self.name             = name
 411          self.initiator        = False
 412          self.reconnecting     = False
 413          self.never_connected  = True
 414          self.owner            = owner
 415          self.writing          = False
 416          self.online           = False
 417          self.detached         = False
 418          self.kiss_framing     = False
 419          self.i2p_tunneled     = True
 420          self.i2p_dest         = None
 421          self.i2p_tunnel_ready = False
 422          self.mode             = RNS.Interfaces.Interface.Interface.MODE_FULL
 423          self.bitrate          = I2PInterface.BITRATE_GUESS
 424          self.last_read        = 0
 425          self.last_write       = 0
 426          self.wd_reset         = False
 427          self.i2p_tunnel_state = I2PInterfacePeer.TUNNEL_STATE_INIT
 428  
 429          self.ifac_size = self.parent_interface.ifac_size
 430          self.ifac_netname = self.parent_interface.ifac_netname
 431          self.ifac_netkey = self.parent_interface.ifac_netkey
 432          if self.ifac_netname != None or self.ifac_netkey != None:
 433              ifac_origin = b""
 434              if self.ifac_netname != None:
 435                  ifac_origin += RNS.Identity.full_hash(self.ifac_netname.encode("utf-8"))
 436              if self.ifac_netkey != None:
 437                  ifac_origin += RNS.Identity.full_hash(self.ifac_netkey.encode("utf-8"))
 438  
 439              ifac_origin_hash = RNS.Identity.full_hash(ifac_origin)
 440              self.ifac_key = RNS.Cryptography.hkdf(
 441                  length=64,
 442                  derive_from=ifac_origin_hash,
 443                  salt=RNS.Reticulum.IFAC_SALT,
 444                  context=None
 445              )
 446              self.ifac_identity = RNS.Identity.from_bytes(self.ifac_key)
 447              self.ifac_signature = self.ifac_identity.sign(RNS.Identity.full_hash(self.ifac_key))
 448  
 449          self.announce_rate_target  = None
 450          self.announce_rate_grace   = None
 451          self.announce_rate_penalty = None
 452  
 453          if max_reconnect_tries == None:
 454              self.max_reconnect_tries = I2PInterfacePeer.RECONNECT_MAX_TRIES
 455          else:
 456              self.max_reconnect_tries = max_reconnect_tries
 457  
 458          if connected_socket != None:
 459              self.receives    = True
 460              self.target_ip   = None
 461              self.target_port = None
 462              self.socket      = connected_socket
 463  
 464              if platform.system() == "Linux":
 465                  self.set_timeouts_linux()
 466              elif platform.system() == "Darwin":
 467                  self.set_timeouts_osx()
 468  
 469          elif target_i2p_dest != None:
 470              self.receives    = True
 471              self.initiator   = True
 472  
 473              self.bind_ip     = "127.0.0.1"
 474  
 475              self.awaiting_i2p_tunnel = True
 476  
 477              def tunnel_job():
 478                  while self.awaiting_i2p_tunnel:
 479                      try:
 480                          self.bind_port   = self.parent_interface.i2p.get_free_port()
 481                          self.local_addr  = (self.bind_ip, self.bind_port)
 482                          self.target_ip = self.bind_ip
 483                          self.target_port = self.bind_port
 484  
 485                          if not self.parent_interface.i2p.client_tunnel(self, target_i2p_dest):
 486                              RNS.log(str(self)+" I2P control process experienced an error, requesting new tunnel...", RNS.LOG_ERROR)
 487                              self.awaiting_i2p_tunnel = True
 488  
 489                      except Exception as e:
 490                          RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR)
 491                          RNS.log("Check that I2P is installed and running, and that SAM is enabled. Retrying tunnel setup later.", RNS.LOG_ERROR)
 492  
 493                      time.sleep(8)
 494  
 495              thread = threading.Thread(target=tunnel_job)
 496              thread.daemon = True
 497              thread.start()
 498  
 499              def wait_job():
 500                  while self.awaiting_i2p_tunnel:
 501                      time.sleep(0.25)
 502                  time.sleep(2)
 503                  
 504                  if not self.kiss_framing:
 505                      self.wants_tunnel = True
 506  
 507                  if not self.connect(initial=True):
 508                      thread = threading.Thread(target=self.reconnect)
 509                      thread.daemon = True
 510                      thread.start()
 511                  else:
 512                      thread = threading.Thread(target=self.read_loop)
 513                      thread.daemon = True
 514                      thread.start()
 515  
 516              thread = threading.Thread(target=wait_job)
 517              thread.daemon = True
 518              thread.start()
 519  
 520  
 521      def set_timeouts_linux(self):
 522          self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(I2PInterfacePeer.I2P_USER_TIMEOUT * 1000))
 523          self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
 524          self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(I2PInterfacePeer.I2P_PROBE_AFTER))
 525          self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(I2PInterfacePeer.I2P_PROBE_INTERVAL))
 526          self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(I2PInterfacePeer.I2P_PROBES))
 527  
 528      def set_timeouts_osx(self):
 529          if hasattr(socket, "TCP_KEEPALIVE"):
 530              TCP_KEEPIDLE = socket.TCP_KEEPALIVE
 531          else:
 532              TCP_KEEPIDLE = 0x10
 533  
 534          self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
 535          self.socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(I2PInterfacePeer.I2P_PROBE_AFTER))
 536      
 537      def shutdown_socket(self, target_socket):
 538          if callable(target_socket.close):
 539              try:
 540                  if socket != None:
 541                      target_socket.shutdown(socket.SHUT_RDWR)
 542              except Exception as e:
 543                  RNS.log("Error while shutting down socket for "+str(self)+": "+str(e))
 544  
 545              try:
 546                  if socket != None:
 547                      target_socket.close()
 548              except Exception as e:
 549                  RNS.log("Error while closing socket for "+str(self)+": "+str(e))    
 550      
 551      def detach(self):
 552          RNS.log("Detaching "+str(self), RNS.LOG_DEBUG)
 553          if self.socket != None:
 554              if hasattr(self.socket, "close"):
 555                  if callable(self.socket.close):
 556                      self.detached = True
 557                      
 558                      try:
 559                          self.socket.shutdown(socket.SHUT_RDWR)
 560                      except Exception as e:
 561                          RNS.log("Error while shutting down socket for "+str(self)+": "+str(e))
 562  
 563                      try:
 564                          self.socket.close()
 565                      except Exception as e:
 566                          RNS.log("Error while closing socket for "+str(self)+": "+str(e))
 567  
 568                      self.socket = None
 569  
 570      def connect(self, initial=False):
 571          try:
 572              self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 573              self.socket.connect((self.target_ip, self.target_port))
 574              self.online  = True
 575          
 576          except Exception as e:
 577              if initial:
 578                  if not self.awaiting_i2p_tunnel:
 579                      RNS.log("Initial connection for "+str(self)+" could not be established: "+str(e), RNS.LOG_ERROR)
 580                      RNS.log("Leaving unconnected and retrying connection in "+str(I2PInterfacePeer.RECONNECT_WAIT)+" seconds.", RNS.LOG_ERROR)
 581  
 582                  return False
 583              
 584              else:
 585                  raise e
 586  
 587          if platform.system() == "Linux":
 588              self.set_timeouts_linux()
 589          elif platform.system() == "Darwin":
 590              self.set_timeouts_osx()
 591          
 592          self.online  = True
 593          self.writing = False
 594          self.never_connected = False
 595  
 596          if not self.kiss_framing and self.wants_tunnel:
 597              RNS.Transport.synthesize_tunnel(self)
 598  
 599          return True
 600  
 601      def reconnect(self):
 602          if self.initiator:
 603              if not self.reconnecting:
 604                  self.reconnecting = True
 605                  attempts = 0
 606                  while not self.online:
 607                      time.sleep(I2PInterfacePeer.RECONNECT_WAIT)
 608                      attempts += 1
 609  
 610                      if self.max_reconnect_tries != None and attempts > self.max_reconnect_tries:
 611                          RNS.log("Max reconnection attempts reached for "+str(self), RNS.LOG_ERROR)
 612                          self.teardown()
 613                          break
 614  
 615                      try:
 616                          self.connect()
 617  
 618                      except Exception as e:
 619                          if not self.awaiting_i2p_tunnel:
 620                              RNS.log("Connection attempt for "+str(self)+" failed: "+str(e), RNS.LOG_DEBUG)
 621                          else:
 622                              RNS.log(str(self)+" still waiting for I2P tunnel to appear", RNS.LOG_VERBOSE)
 623  
 624                  if not self.never_connected:
 625                      RNS.log(str(self)+" Re-established connection via I2P tunnel", RNS.LOG_INFO)
 626  
 627                  self.reconnecting = False
 628                  thread = threading.Thread(target=self.read_loop)
 629                  thread.daemon = True
 630                  thread.start()
 631                  if not self.kiss_framing:
 632                      RNS.Transport.synthesize_tunnel(self)
 633  
 634          else:
 635              RNS.log("Attempt to reconnect on a non-initiator I2P interface. This should not happen.", RNS.LOG_ERROR)
 636              raise IOError("Attempt to reconnect on a non-initiator I2P interface")
 637  
 638      def process_incoming(self, data):
 639          self.rxb += len(data)
 640          if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count:
 641              self.parent_interface.rxb += len(data)
 642                      
 643          self.owner.inbound(data, self)
 644  
 645      def process_outgoing(self, data):
 646          if self.online:
 647              while self.writing:
 648                  time.sleep(0.001)
 649  
 650              try:
 651                  self.writing = True
 652  
 653                  if self.kiss_framing:
 654                      data = bytes([KISS.FEND])+bytes([KISS.CMD_DATA])+KISS.escape(data)+bytes([KISS.FEND])
 655                  else:
 656                      data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG])
 657  
 658                  self.socket.sendall(data)
 659                  self.writing = False
 660                  self.txb += len(data)
 661                  self.last_write = time.time()
 662                  
 663                  if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count:
 664                      self.parent_interface.txb += len(data)
 665  
 666              except Exception as e:
 667                  RNS.log("Exception occurred while transmitting via "+str(self)+", tearing down interface", RNS.LOG_ERROR)
 668                  RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
 669                  self.teardown()
 670  
 671  
 672      def read_watchdog(self):
 673          while self.wd_reset:
 674              time.sleep(0.25)
 675  
 676          should_run = True
 677          try:
 678              while should_run and not self.wd_reset:
 679                  time.sleep(1)
 680  
 681                  if (time.time()-self.last_read > I2PInterfacePeer.I2P_PROBE_AFTER*2):
 682                      if self.i2p_tunnel_state != I2PInterfacePeer.TUNNEL_STATE_STALE:
 683                          RNS.log("I2P tunnel became unresponsive", RNS.LOG_DEBUG)
 684  
 685                      self.i2p_tunnel_state = I2PInterfacePeer.TUNNEL_STATE_STALE
 686                  else:
 687                      self.i2p_tunnel_state = I2PInterfacePeer.TUNNEL_STATE_ACTIVE
 688  
 689                  if (time.time()-self.last_write > I2PInterfacePeer.I2P_PROBE_AFTER*1):
 690                      try:
 691                          if self.socket != None:
 692                              self.socket.sendall(bytes([HDLC.FLAG, HDLC.FLAG]))
 693                      except Exception as e:
 694                          RNS.log("An error ocurred while sending I2P keepalive. The contained exception was: "+str(e), RNS.LOG_ERROR)
 695                          self.shutdown_socket(self.socket)
 696                          should_run = False
 697                  
 698                  if (time.time()-self.last_read > I2PInterfacePeer.I2P_READ_TIMEOUT):
 699                      RNS.log("I2P socket is unresponsive, restarting...", RNS.LOG_WARNING)
 700                      if self.socket != None:
 701                          try:
 702                              self.socket.shutdown(socket.SHUT_RDWR)
 703                          except Exception as e:
 704                              RNS.log("Error while shutting down socket for "+str(self)+": "+str(e))
 705  
 706                          try:
 707                              self.socket.close()
 708                          except Exception as e:
 709                              RNS.log("Error while closing socket for "+str(self)+": "+str(e))
 710  
 711                      should_run = False
 712  
 713                  self.wd_reset = False
 714  
 715          finally:
 716              self.wd_reset = False
 717  
 718      def read_loop(self):
 719          try:
 720              self.last_read  = time.time()
 721              self.last_write = time.time()
 722  
 723              wd_thread = threading.Thread(target=self.read_watchdog, daemon=True).start()
 724  
 725              in_frame = False
 726              escape = False
 727              data_buffer = b""
 728              command = KISS.CMD_UNKNOWN
 729  
 730              while True:
 731                  data_in = self.socket.recv(4096)
 732                  if len(data_in) > 0:
 733                      pointer = 0
 734                      self.last_read = time.time()
 735                      while pointer < len(data_in):
 736                          byte = data_in[pointer]
 737                          pointer += 1
 738  
 739                          if self.kiss_framing:
 740                              # Read loop for KISS framing
 741                              if (in_frame and byte == KISS.FEND and command == KISS.CMD_DATA):
 742                                  in_frame = False
 743                                  self.process_incoming(data_buffer)
 744                              elif (byte == KISS.FEND):
 745                                  in_frame = True
 746                                  command = KISS.CMD_UNKNOWN
 747                                  data_buffer = b""
 748                              elif (in_frame and len(data_buffer) < self.HW_MTU):
 749                                  if (len(data_buffer) == 0 and command == KISS.CMD_UNKNOWN):
 750                                      # We only support one HDLC port for now, so
 751                                      # strip off the port nibble
 752                                      byte = byte & 0x0F
 753                                      command = byte
 754                                  elif (command == KISS.CMD_DATA):
 755                                      if (byte == KISS.FESC):
 756                                          escape = True
 757                                      else:
 758                                          if (escape):
 759                                              if (byte == KISS.TFEND):
 760                                                  byte = KISS.FEND
 761                                              if (byte == KISS.TFESC):
 762                                                  byte = KISS.FESC
 763                                              escape = False
 764                                          data_buffer = data_buffer+bytes([byte])
 765  
 766                          else:
 767                              # Read loop for HDLC framing
 768                              if (in_frame and byte == HDLC.FLAG):
 769                                  in_frame = False
 770                                  self.process_incoming(data_buffer)
 771                              elif (byte == HDLC.FLAG):
 772                                  in_frame = True
 773                                  data_buffer = b""
 774                              elif (in_frame and len(data_buffer) < self.HW_MTU):
 775                                  if (byte == HDLC.ESC):
 776                                      escape = True
 777                                  else:
 778                                      if (escape):
 779                                          if (byte == HDLC.FLAG ^ HDLC.ESC_MASK):
 780                                              byte = HDLC.FLAG
 781                                          if (byte == HDLC.ESC  ^ HDLC.ESC_MASK):
 782                                              byte = HDLC.ESC
 783                                          escape = False
 784                                      data_buffer = data_buffer+bytes([byte])
 785                  else:
 786                      self.online = False
 787  
 788                      self.wd_reset = True
 789                      time.sleep(2)
 790                      self.wd_reset = False
 791  
 792                      if self.initiator and not self.detached:
 793                          RNS.log("Socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING)
 794                          self.reconnect()
 795                      else:
 796                          RNS.log("Socket for remote client "+str(self)+" was closed.", RNS.LOG_VERBOSE)
 797                          self.teardown()
 798  
 799                      break
 800  
 801                  
 802          except Exception as e:
 803              self.online = False
 804              RNS.log("An interface error occurred for "+str(self)+", the contained exception was: "+str(e), RNS.LOG_WARNING)
 805  
 806              if self.initiator:
 807                  RNS.log("Attempting to reconnect...", RNS.LOG_WARNING)
 808                  self.reconnect()
 809              else:
 810                  self.teardown()
 811  
 812      def teardown(self):
 813          if self.initiator and not self.detached:
 814              RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR)
 815              if RNS.Reticulum.panic_on_interface_error:
 816                  RNS.panic()
 817  
 818          else:
 819              RNS.log("The interface "+str(self)+" is being torn down.", RNS.LOG_VERBOSE)
 820  
 821          self.online = False
 822          self.OUT = False
 823          self.IN = False
 824  
 825          if hasattr(self, "parent_interface") and self.parent_interface != None:
 826              while self in self.parent_interface.spawned_interfaces:
 827                  self.parent_interface.spawned_interfaces.remove(self)
 828  
 829          if self in RNS.Transport.interfaces:
 830              if not self.initiator:
 831                  RNS.Transport.interfaces.remove(self)
 832  
 833  
 834      def __str__(self):
 835          return "I2PInterfacePeer["+str(self.name)+"]"
 836  
 837  
 838  class I2PInterface(Interface):
 839      BITRATE_GUESS      = 256*1000
 840      DEFAULT_IFAC_SIZE  = 16
 841  
 842      @property
 843      def clients(self):
 844          return len(self.spawned_interfaces)
 845  
 846      def __init__(self, owner, configuration):
 847          super().__init__()
 848  
 849          c = Interface.get_config_obj(configuration)
 850          name = c["name"]
 851          rns_storagepath = c["storagepath"]
 852          peers = c.as_list("peers") if "peers" in c else None
 853          connectable = c.as_bool("connectable") if "connectable" in c else False
 854          ifac_size = c["ifac_size"] if "ifac_size" in c else None
 855          ifac_netname = c["ifac_netname"] if "ifac_netname" in c else None
 856          ifac_netkey = c["ifac_netkey"] if "ifac_netkey" in c else None
 857          
 858          self.HW_MTU = 1064
 859  
 860          self.online = False
 861          self.spawned_interfaces = []
 862          self.owner = owner
 863          self.connectable = connectable
 864          self.i2p_tunneled = True
 865          self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL
 866  
 867          self.b32 = None
 868          self.i2p = I2PController(rns_storagepath)
 869  
 870          self.IN  = True
 871          self.OUT = False
 872          self.name = name
 873  
 874  
 875          self.receives = True
 876          self.bind_ip     = "127.0.0.1"
 877          self.bind_port   = self.i2p.get_free_port()
 878          self.address = (self.bind_ip, self.bind_port)
 879          self.bitrate = I2PInterface.BITRATE_GUESS
 880          self.ifac_size = ifac_size
 881          self.ifac_netname = ifac_netname
 882          self.ifac_netkey = ifac_netkey
 883          self.supports_discovery = True
 884  
 885          self.online = False
 886  
 887          i2p_thread = threading.Thread(target=self.i2p.start)
 888          i2p_thread.daemon = True
 889          i2p_thread.start()
 890  
 891          i2p_notready_warning = False
 892          time.sleep(0.25)
 893  
 894          if not self.i2p.ready:
 895              RNS.log("I2P controller did not become available in time, waiting for controller", RNS.LOG_VERBOSE)
 896              i2p_notready_warning = True
 897  
 898          while not self.i2p.ready:
 899              time.sleep(0.25)
 900  
 901          if i2p_notready_warning == True:
 902              RNS.log("I2P controller ready, continuing setup", RNS.LOG_VERBOSE)
 903  
 904          def handlerFactory(callback):
 905              def createHandler(*args, **keys):
 906                  return I2PInterfaceHandler(callback, *args, **keys)
 907              return createHandler
 908          
 909          ThreadingI2PServer.allow_reuse_address = True
 910          self.server = ThreadingI2PServer(self.address, handlerFactory(self.incoming_connection))
 911  
 912          thread = threading.Thread(target=self.server.serve_forever)
 913          thread.daemon = True
 914          thread.start()
 915  
 916          if self.connectable:
 917              def tunnel_job():
 918                  while True:
 919                      try:
 920                          if not self.i2p.server_tunnel(self):
 921                              RNS.log(str(self)+" I2P control process experienced an error, requesting new tunnel...", RNS.LOG_ERROR)
 922                              self.online = False
 923  
 924                      except Exception as e:
 925                          RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR)
 926                          RNS.log("Check that I2P is installed and running, and that SAM is enabled. Retrying tunnel setup later.", RNS.LOG_ERROR)
 927  
 928                      time.sleep(15)
 929  
 930  
 931              thread = threading.Thread(target=tunnel_job)
 932              thread.daemon = True
 933              thread.start()
 934  
 935          if peers != None:
 936              for peer_addr in peers:
 937                  interface_name = self.name+" to "+peer_addr
 938                  peer_interface = I2PInterfacePeer(self, self.owner, interface_name, peer_addr)
 939                  peer_interface.OUT = True
 940                  peer_interface.IN  = True
 941                  peer_interface.parent_interface = self
 942                  peer_interface.parent_count = False
 943                  RNS.Transport.interfaces.append(peer_interface)
 944  
 945      def incoming_connection(self, handler):
 946          RNS.log("Accepting incoming I2P connection", RNS.LOG_VERBOSE)
 947          interface_name = "Connected peer on "+self.name
 948          spawned_interface = I2PInterfacePeer(self, self.owner, interface_name, connected_socket=handler.request)
 949          spawned_interface.OUT = True
 950          spawned_interface.IN  = True
 951          spawned_interface.parent_interface = self
 952          spawned_interface.online = True
 953          spawned_interface.bitrate = self.bitrate
 954  
 955          spawned_interface.ifac_size = self.ifac_size
 956          spawned_interface.ifac_netname = self.ifac_netname
 957          spawned_interface.ifac_netkey = self.ifac_netkey
 958          if spawned_interface.ifac_netname != None or spawned_interface.ifac_netkey != None:
 959              ifac_origin = b""
 960              if spawned_interface.ifac_netname != None:
 961                  ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netname.encode("utf-8"))
 962              if spawned_interface.ifac_netkey != None:
 963                  ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netkey.encode("utf-8"))
 964  
 965              ifac_origin_hash = RNS.Identity.full_hash(ifac_origin)
 966              spawned_interface.ifac_key = RNS.Cryptography.hkdf(
 967                  length=64,
 968                  derive_from=ifac_origin_hash,
 969                  salt=RNS.Reticulum.IFAC_SALT,
 970                  context=None
 971              )
 972              spawned_interface.ifac_identity = RNS.Identity.from_bytes(spawned_interface.ifac_key)
 973              spawned_interface.ifac_signature = spawned_interface.ifac_identity.sign(RNS.Identity.full_hash(spawned_interface.ifac_key))
 974  
 975          spawned_interface.announce_rate_target = self.announce_rate_target
 976          spawned_interface.announce_rate_grace = self.announce_rate_grace
 977          spawned_interface.announce_rate_penalty = self.announce_rate_penalty
 978          spawned_interface.mode = self.mode
 979          spawned_interface.HW_MTU = self.HW_MTU
 980          RNS.log("Spawned new I2PInterface Peer: "+str(spawned_interface), RNS.LOG_VERBOSE)
 981          RNS.Transport.interfaces.append(spawned_interface)
 982          while spawned_interface in self.spawned_interfaces:
 983              self.spawned_interfaces.remove(spawned_interface)
 984          self.spawned_interfaces.append(spawned_interface)
 985          spawned_interface.read_loop()
 986  
 987      def process_outgoing(self, data):
 988          pass
 989  
 990      def received_announce(self, from_spawned=False):
 991          if from_spawned: self.ia_freq_deque.append(time.time())
 992  
 993      def sent_announce(self, from_spawned=False):
 994          if from_spawned: self.oa_freq_deque.append(time.time())
 995  
 996      def detach(self):
 997          RNS.log("Detaching "+str(self), RNS.LOG_DEBUG)
 998          self.i2p.stop()
 999  
1000      def __str__(self):
1001          return "I2PInterface["+self.name+"]"
1002  
1003  class I2PInterfaceHandler(socketserver.BaseRequestHandler):
1004      def __init__(self, callback, *args, **keys):
1005          self.callback = callback
1006          socketserver.BaseRequestHandler.__init__(self, *args, **keys)
1007  
1008      def handle(self):
1009          self.callback(handler=self)