/ RNS / Link.py
Link.py
   1  # Reticulum License
   2  #
   3  # Copyright (c) 2016-2025 Mark Qvist
   4  #
   5  # Permission is hereby granted, free of charge, to any person obtaining a copy
   6  # of this software and associated documentation files (the "Software"), to deal
   7  # in the Software without restriction, including without limitation the rights
   8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
   9  # copies of the Software, and to permit persons to whom the Software is
  10  # furnished to do so, subject to the following conditions:
  11  #
  12  # - The Software shall not be used in any kind of system which includes amongst
  13  #   its functions the ability to purposefully do harm to human beings.
  14  #
  15  # - The Software shall not be used, directly or indirectly, in the creation of
  16  #   an artificial intelligence, machine learning or language model training
  17  #   dataset, including but not limited to any use that contributes to the
  18  #   training or development of such a model or algorithm.
  19  #
  20  # - The above copyright notice and this permission notice shall be included in
  21  #   all copies or substantial portions of the Software.
  22  #
  23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  24  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  25  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  26  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  27  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  28  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  29  # SOFTWARE.
  30  
  31  from RNS.Cryptography import X25519PrivateKey, X25519PublicKey, Ed25519PrivateKey, Ed25519PublicKey
  32  from RNS.Cryptography import Token
  33  from RNS.Channel import Channel, LinkChannelOutlet
  34  
  35  from time import sleep
  36  from .vendor import umsgpack as umsgpack
  37  import threading
  38  import inspect
  39  import struct
  40  import math
  41  import time
  42  import RNS
  43  import io
  44  
  45  class LinkCallbacks:
  46      def __init__(self):
  47          self.link_established = None
  48          self.link_closed = None
  49          self.packet = None
  50          self.resource = None
  51          self.resource_started = None
  52          self.resource_concluded = None
  53          self.remote_identified = None
  54  
class Link:
    """
    This class is used to establish and manage links to other peers. When a
    link instance is created, Reticulum will attempt to establish verified
    and encrypted connectivity with the specified destination.

    :param destination: A :ref:`RNS.Destination<api-destination>` instance which to establish a link to.
    :param established_callback: An optional function or method with the signature *callback(link)* to be called when the link has been established.
    :param closed_callback: An optional function or method with the signature *callback(link)* to be called when the link is closed.
    """
    CURVE = RNS.Identity.CURVE
    """
    The curve used for Elliptic Curve DH key exchanges
    """

    # Combined length in bytes of the two public keys at the start of a link
    # request payload. validate_request() splits it in half: the first half
    # is loaded as the peer's X25519 key, the second as its Ed25519 key.
    ECPUBSIZE         = 32+32
    KEYSIZE           = 32

    # Link MDU at the default Reticulum MTU. update_mdu() applies the same
    # formula to the MTU of an individual link.
    MDU = math.floor((RNS.Reticulum.MTU-RNS.Reticulum.IFAC_MIN_SIZE-RNS.Reticulum.HEADER_MINSIZE-RNS.Identity.TOKEN_OVERHEAD)/RNS.Identity.AES128_BLOCKSIZE)*RNS.Identity.AES128_BLOCKSIZE - 1

    ESTABLISHMENT_TIMEOUT_PER_HOP = RNS.Reticulum.DEFAULT_PER_HOP_TIMEOUT
    """
    Timeout for link establishment in seconds per hop to destination.
    """

    # Number of signalling bytes (MTU and mode) that may follow the key
    # material in link requests and link proofs.
    LINK_MTU_SIZE            = 3
    TRAFFIC_TIMEOUT_MIN_MS   = 5
    # Multiplied by the link RTT when request() derives a default timeout.
    TRAFFIC_TIMEOUT_FACTOR   = 6
    KEEPALIVE_MAX_RTT        = 1.75
    KEEPALIVE_TIMEOUT_FACTOR = 4
    """
    RTT timeout factor used in link timeout calculation.
    """
    STALE_GRACE = 5
    """
    Grace period in seconds used in link timeout calculation.
    """
    KEEPALIVE_MAX = 360
    KEEPALIVE_MIN = 5
    KEEPALIVE     = KEEPALIVE_MAX
    """
    Default interval for sending keep-alive packets on established links in seconds.
    """
    STALE_FACTOR = 2
    STALE_TIME = STALE_FACTOR*KEEPALIVE
    """
    If no traffic or keep-alive packets are received within this period, the
    link will be marked as stale, and a final keep-alive packet will be sent.
    If after this no traffic or keep-alive packets are received within ``RTT`` *
    ``KEEPALIVE_TIMEOUT_FACTOR`` + ``STALE_GRACE``, the link is considered timed out,
    and will be torn down.
    """

    WATCHDOG_MAX_SLEEP  = 5

    # Link status values, held in self.status. A new link starts as PENDING,
    # handshake() moves it to HANDSHAKE, and a validated proof or a received
    # RTT packet moves it to ACTIVE.
    PENDING             = 0x00
    HANDSHAKE           = 0x01
    ACTIVE              = 0x02
    STALE               = 0x03
    CLOSED              = 0x04

    # NOTE(review): presumably the reasons a link was torn down; they are
    # not referenced in this part of the file - confirm against teardown code.
    TIMEOUT             = 0x01
    INITIATOR_CLOSED    = 0x02
    DESTINATION_CLOSED  = 0x03

    # Resource strategies. New links start out with ACCEPT_NONE.
    ACCEPT_NONE         = 0x00
    ACCEPT_APP          = 0x01
    ACCEPT_ALL          = 0x02
    resource_strategies = [ACCEPT_NONE, ACCEPT_APP, ACCEPT_ALL]

    # Link mode identifiers. The mode is signalled in three bits, so the
    # full value space 0x00-0x07 is enumerated here.
    MODE_AES128_CBC     = 0x00
    MODE_AES256_CBC     = 0x01
    MODE_AES256_GCM     = 0x02
    MODE_OTP_RESERVED   = 0x03
    MODE_PQ_RESERVED_1  = 0x04
    MODE_PQ_RESERVED_2  = 0x05
    MODE_PQ_RESERVED_3  = 0x06
    MODE_PQ_RESERVED_4  = 0x07
    # Only modes listed in ENABLED_MODES are accepted by signalling_bytes()
    # and mode_byte(); any other mode makes them raise TypeError.
    ENABLED_MODES       = [MODE_AES256_CBC]
    MODE_DEFAULT        =  MODE_AES256_CBC
    # Names used when writing link modes to the log
    MODE_DESCRIPTIONS   = {MODE_AES128_CBC: "AES_128_CBC",
                           MODE_AES256_CBC: "AES_256_CBC",
                           MODE_AES256_GCM: "MODE_AES256_GCM",
                           MODE_OTP_RESERVED: "MODE_OTP_RESERVED",
                           MODE_PQ_RESERVED_1: "MODE_PQ_RESERVED_1",
                           MODE_PQ_RESERVED_2: "MODE_PQ_RESERVED_2",
                           MODE_PQ_RESERVED_3: "MODE_PQ_RESERVED_3",
                           MODE_PQ_RESERVED_4: "MODE_PQ_RESERVED_4"}

    # Layout of the three signalling bytes: the lower 21 bits carry the MTU,
    # the upper 3 bits of the first byte carry the link mode.
    MTU_BYTEMASK        = 0x1FFFFF
    MODE_BYTEMASK       = 0xE0
 146  
 147      @staticmethod
 148      def signalling_bytes(mtu, mode):
 149          if not mode in Link.ENABLED_MODES: raise TypeError(f"Requested link mode {Link.MODE_DESCRIPTIONS[mode]} not enabled")
 150          signalling_value = (mtu & Link.MTU_BYTEMASK)+(((mode<<5) & Link.MODE_BYTEMASK)<<16)
 151          return struct.pack(">I", signalling_value)[1:]
 152  
 153      @staticmethod
 154      def mtu_from_lr_packet(packet):
 155          if len(packet.data) == Link.ECPUBSIZE+Link.LINK_MTU_SIZE:
 156              return (packet.data[Link.ECPUBSIZE] << 16) + (packet.data[Link.ECPUBSIZE+1] << 8) + (packet.data[Link.ECPUBSIZE+2]) & Link.MTU_BYTEMASK
 157          else: return None
 158  
 159      @staticmethod
 160      def mtu_from_lp_packet(packet):
 161          if len(packet.data) == RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2+Link.LINK_MTU_SIZE:
 162              mtu_bytes = packet.data[RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2:RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2+Link.LINK_MTU_SIZE]
 163              return (mtu_bytes[0] << 16) + (mtu_bytes[1] << 8) + (mtu_bytes[2]) & Link.MTU_BYTEMASK
 164          else: return None
 165  
 166      @staticmethod
 167      def mode_byte(mode):
 168          if mode in Link.ENABLED_MODES: return (mode << 5) & Link.MODE_BYTEMASK
 169          else: raise TypeError(f"Requested link mode {mode} not enabled")
 170  
 171      @staticmethod
 172      def mode_from_lr_packet(packet):
 173          if len(packet.data) > Link.ECPUBSIZE:
 174              mode = (packet.data[Link.ECPUBSIZE] & Link.MODE_BYTEMASK) >> 5
 175              return mode
 176          else: return Link.MODE_DEFAULT
 177  
 178      @staticmethod
 179      def mode_from_lp_packet(packet):
 180          if len(packet.data) > RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2:
 181              mode = packet.data[RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2] >> 5
 182              return mode
 183          else: return Link.MODE_DEFAULT
 184  
 185      @staticmethod
 186      def validate_request(owner, data, packet):
 187          if len(data) == Link.ECPUBSIZE or len(data) == Link.ECPUBSIZE+Link.LINK_MTU_SIZE:
 188              try:
 189                  link = Link(owner = owner, peer_pub_bytes=data[:Link.ECPUBSIZE//2], peer_sig_pub_bytes=data[Link.ECPUBSIZE//2:Link.ECPUBSIZE])
 190                  link.set_link_id(packet)
 191  
 192                  if len(data) == Link.ECPUBSIZE+Link.LINK_MTU_SIZE:
 193                      RNS.log("Link request includes MTU signalling", RNS.LOG_DEBUG) # TODO: Remove debug
 194                      try:
 195                          link.mtu = Link.mtu_from_lr_packet(packet) or Reticulum.MTU
 196                      except Exception as e:
 197                          RNS.trace_exception(e)
 198                          link.mtu = RNS.Reticulum.MTU
 199  
 200                  link.mode = Link.mode_from_lr_packet(packet)
 201                  
 202                  # TODO: Remove debug
 203                  RNS.log(f"Incoming link request with mode {Link.MODE_DESCRIPTIONS[link.mode]}", RNS.LOG_DEBUG)
 204  
 205                  link.update_mdu()
 206                  link.destination = packet.destination
 207                  link.establishment_timeout = Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, packet.hops) + Link.KEEPALIVE
 208                  link.establishment_cost += len(packet.raw)
 209                  RNS.log(f"Validating link request {RNS.prettyhexrep(link.link_id)}", RNS.LOG_DEBUG)
 210                  RNS.log(f"Link MTU configured to {RNS.prettysize(link.mtu)}", RNS.LOG_EXTREME)
 211                  RNS.log(f"Establishment timeout is {RNS.prettytime(link.establishment_timeout)} for incoming link request "+RNS.prettyhexrep(link.link_id), RNS.LOG_EXTREME)
 212                  link.handshake()
 213                  link.attached_interface = packet.receiving_interface
 214                  link.prove()
 215                  link.request_time = time.time()
 216                  RNS.Transport.register_link(link)
 217                  link.last_inbound = time.time()
 218                  link.__update_phy_stats(packet, force_update=True)
 219                  link.start_watchdog()
 220  
 221                  RNS.log("Incoming link request "+str(link)+" accepted on "+str(link.attached_interface), RNS.LOG_DEBUG)
 222                  return link
 223  
 224              except Exception as e:
 225                  RNS.log(f"Validating link request failed: {e}", RNS.LOG_VERBOSE)
 226                  return None
 227  
 228          else:
 229              RNS.log(f"Invalid link request payload size of {len(data)} bytes, dropping request", RNS.LOG_DEBUG)
 230              return None
 231  
 232  
    def __init__(self, destination=None, established_callback=None, closed_callback=None, owner=None, peer_pub_bytes=None, peer_sig_pub_bytes=None, mode=MODE_DEFAULT):
        """
        Creates a new link. If a destination is supplied, the link is created
        as the initiator, and a link request is packed and sent immediately.
        If no destination is supplied, the link is created as the receiving
        side of a link request, and signs with the identity of ``owner``.

        :param destination: The destination to establish a link to, or ``None`` for an incoming link.
        :param established_callback: An optional callback, registered with ``set_link_established_callback``.
        :param closed_callback: An optional callback, registered with ``set_link_closed_callback``.
        :param owner: The owner of an incoming link. Must provide an ``identity`` when no destination is given.
        :param peer_pub_bytes: The peer's public key bytes for an incoming link, if already known.
        :param peer_sig_pub_bytes: The peer's signing public key bytes for an incoming link, if already known.
        :param mode: The link mode to use.
        :raises TypeError: If the destination is not of the "single" type, or if an initiated link requests a mode that is not enabled.
        """
        if destination != None and destination.type != RNS.Destination.SINGLE: raise TypeError("Links can only be established to the \"single\" destination type")
        self.mode = mode
        self.rtt = None
        self.mtu = RNS.Reticulum.MTU
        self.establishment_cost = 0
        self.establishment_rate = None
        self.expected_rate = None
        self.callbacks = LinkCallbacks()
        self.resource_strategy = Link.ACCEPT_NONE
        self.last_resource_window = None
        self.last_resource_eifr = None
        self.outgoing_resources = []
        self.incoming_resources = []
        self.pending_requests   = []
        self.last_inbound = 0
        self.last_outbound = 0
        self.last_keepalive = 0
        self.last_proof = 0
        self.last_data = 0
        self.tx = 0
        self.rx = 0
        self.txbytes = 0
        self.rxbytes = 0
        self.rssi = None
        self.snr = None
        self.q = None
        self.traffic_timeout_factor = Link.TRAFFIC_TIMEOUT_FACTOR
        self.keepalive_timeout_factor = Link.KEEPALIVE_TIMEOUT_FACTOR
        self.keepalive = Link.KEEPALIVE
        self.stale_time = Link.STALE_TIME
        self.watchdog_lock = False
        self.status = Link.PENDING
        self.activated_at = None
        self.type = RNS.Destination.LINK
        self.owner = owner
        self.destination = destination
        self.expected_hops = None
        self.attached_interface = None
        self.__remote_identity = None
        self.__track_phy_stats = False
        self._channel = None

        if self.destination == None:
            # Incoming link: sign with the owner's identity key. The
            # establishment timeout is set later by validate_request().
            self.initiator = False
            self.prv     = X25519PrivateKey.generate()
            self.sig_prv = self.owner.identity.sig_prv
        else:
            # Outgoing link: use freshly generated keys for both key
            # exchange and signing
            self.initiator = True
            self.expected_hops = RNS.Transport.hops_to(self.destination.hash)
            self.establishment_timeout  = RNS.Reticulum.get_instance().get_first_hop_timeout(destination.hash)
            self.establishment_timeout += Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, RNS.Transport.hops_to(destination.hash))
            self.prv     = X25519PrivateKey.generate()
            self.sig_prv = Ed25519PrivateKey.generate()

        self.token  = None

        self.pub = self.prv.public_key()
        self.pub_bytes = self.pub.public_bytes()

        self.sig_pub = self.sig_prv.public_key()
        self.sig_pub_bytes = self.sig_pub.public_bytes()

        # NOTE(review): peer_sig_pub and peer_sig_pub_bytes are not
        # initialised here when no peer key is supplied; they only exist
        # after load_peer() has been called.
        if peer_pub_bytes == None:
            self.peer_pub = None
            self.peer_pub_bytes = None
        else:
            self.load_peer(peer_pub_bytes, peer_sig_pub_bytes)

        if established_callback != None:
            self.set_link_established_callback(established_callback)

        if closed_callback != None:
            self.set_link_closed_callback(closed_callback)

        if self.initiator:
            signalling_bytes = b""
            nh_hw_mtu = RNS.Transport.next_hop_interface_hw_mtu(destination.hash)
            # Signal the next-hop hardware MTU when MTU discovery is enabled
            # and the MTU is known, otherwise signal the default MTU
            if RNS.Reticulum.link_mtu_discovery() and nh_hw_mtu:
                signalling_bytes = Link.signalling_bytes(nh_hw_mtu, self.mode)
                RNS.log(f"Signalling link MTU of {RNS.prettysize(nh_hw_mtu)} for link", RNS.LOG_DEBUG) # TODO: Remove debug
            else: signalling_bytes = Link.signalling_bytes(RNS.Reticulum.MTU, self.mode)
            RNS.log(f"Establishing link with mode {Link.MODE_DESCRIPTIONS[self.mode]}", RNS.LOG_DEBUG) # TODO: Remove debug
            # Request payload: both public keys followed by the signalling bytes
            self.request_data = self.pub_bytes+self.sig_pub_bytes+signalling_bytes
            self.packet = RNS.Packet(destination, self.request_data, packet_type=RNS.Packet.LINKREQUEST)
            # The packet is packed before sending, since the link ID is
            # derived from the packed request
            self.packet.pack()
            self.establishment_cost += len(self.packet.raw)
            self.set_link_id(self.packet)
            # The link is registered and the watchdog started before the
            # request is sent
            RNS.Transport.register_link(self)
            self.request_time = time.time()
            self.start_watchdog()
            self.packet.send()
            self.had_outbound()
            RNS.log("Link request "+RNS.prettyhexrep(self.link_id)+" sent to "+str(self.destination), RNS.LOG_DEBUG)
            RNS.log(f"Establishment timeout is {RNS.prettytime(self.establishment_timeout)} for link request "+RNS.prettyhexrep(self.link_id), RNS.LOG_EXTREME)
 328  
 329  
 330      def load_peer(self, peer_pub_bytes, peer_sig_pub_bytes):
 331          self.peer_pub_bytes = peer_pub_bytes
 332          self.peer_pub = X25519PublicKey.from_public_bytes(self.peer_pub_bytes)
 333  
 334          self.peer_sig_pub_bytes = peer_sig_pub_bytes
 335          self.peer_sig_pub = Ed25519PublicKey.from_public_bytes(self.peer_sig_pub_bytes)
 336  
 337          if not hasattr(self.peer_pub, "curve"):
 338              self.peer_pub.curve = Link.CURVE
 339  
 340      @staticmethod
 341      def link_id_from_lr_packet(packet):
 342          hashable_part = packet.get_hashable_part()
 343          if len(packet.data) > Link.ECPUBSIZE:
 344              diff = len(packet.data) - Link.ECPUBSIZE
 345              hashable_part = hashable_part[:-diff]
 346  
 347          return RNS.Identity.truncated_hash(hashable_part)
 348  
 349      def set_link_id(self, packet):
 350          self.link_id = Link.link_id_from_lr_packet(packet)
 351          self.hash = self.link_id
 352  
 353      def handshake(self):
 354          if self.status == Link.PENDING and self.prv != None:
 355              self.status = Link.HANDSHAKE
 356              self.shared_key = self.prv.exchange(self.peer_pub)
 357  
 358              if   self.mode == Link.MODE_AES128_CBC: derived_key_length = 32
 359              elif self.mode == Link.MODE_AES256_CBC: derived_key_length = 64
 360              else: raise TypeError(f"Invalid link mode {self.mode} on {self}")
 361  
 362              self.derived_key = RNS.Cryptography.hkdf(
 363                  length=derived_key_length,
 364                  derive_from=self.shared_key,
 365                  salt=self.get_salt(),
 366                  context=self.get_context())
 367  
 368          else: RNS.log("Handshake attempt on "+str(self)+" with invalid state "+str(self.status), RNS.LOG_ERROR)
 369  
 370  
 371      def prove(self):
 372          signalling_bytes = Link.signalling_bytes(self.mtu, self.mode)
 373          signed_data = self.link_id+self.pub_bytes+self.sig_pub_bytes+signalling_bytes
 374          signature = self.owner.identity.sign(signed_data)
 375  
 376          proof_data = signature+self.pub_bytes+signalling_bytes
 377          proof = RNS.Packet(self, proof_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.LRPROOF)
 378          proof.send()
 379          self.establishment_cost += len(proof.raw)
 380          self.had_outbound()
 381  
 382  
 383      def prove_packet(self, packet):
 384          signature = self.sign(packet.packet_hash)
 385          # TODO: Hardcoded as explicit proof for now
 386          # if RNS.Reticulum.should_use_implicit_proof():
 387          #   proof_data = signature
 388          # else:
 389          #   proof_data = packet.packet_hash + signature
 390          proof_data = packet.packet_hash + signature
 391  
 392          proof = RNS.Packet(self, proof_data, RNS.Packet.PROOF)
 393          proof.send()
 394          self.had_outbound()
 395  
    def validate_proof(self, packet):
        """
        Validates a link request proof received for a pending link. If the
        proof is valid, the link is activated, the measured RTT is sent to
        the peer, and the link established callback is called in a new
        thread. Proofs received in any other state than pending are ignored.

        Any exception raised during validation is logged, and the link
        status is set to closed.

        :param packet: The received link request proof packet.
        """
        try:
            if self.status == Link.PENDING:
                signalling_bytes = b""
                confirmed_mtu = None
                mode = Link.mode_from_lp_packet(packet)
                RNS.log(f"Validating link request proof with mode {Link.MODE_DESCRIPTIONS[mode]}", RNS.LOG_DEBUG) # TODO: Remove debug
                # The peer must confirm the same mode that was requested
                if mode != self.mode: raise TypeError(f"Invalid link mode {mode} in link request proof")
                if len(packet.data) == RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2+Link.LINK_MTU_SIZE:
                    confirmed_mtu = Link.mtu_from_lp_packet(packet)
                    # Rebuild the signalling bytes, since they are part of
                    # the signed data
                    signalling_bytes = Link.signalling_bytes(confirmed_mtu, mode)
                    # The signalling bytes are stripped from the packet data
                    # in place, leaving only signature and public key
                    packet.data = packet.data[:RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2]
                    RNS.log(f"Destination confirmed link MTU of {RNS.prettysize(confirmed_mtu)}", RNS.LOG_DEBUG) # TODO: Remove debug

                if self.initiator and len(packet.data) == RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2:
                    # The key exchange key comes from the proof, while the
                    # signing key is taken from the destination identity
                    peer_pub_bytes = packet.data[RNS.Identity.SIGLENGTH//8:RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2]
                    peer_sig_pub_bytes = self.destination.identity.get_public_key()[Link.ECPUBSIZE//2:Link.ECPUBSIZE]
                    self.load_peer(peer_pub_bytes, peer_sig_pub_bytes)
                    # NOTE(review): the handshake runs before the signature
                    # is checked and moves the link to HANDSHAKE. After an
                    # invalid signature the status is not reset, so any
                    # later proof is skipped by the PENDING check above -
                    # confirm this is intended.
                    self.handshake()

                    self.establishment_cost += len(packet.raw)
                    signed_data = self.link_id+self.peer_pub_bytes+self.peer_sig_pub_bytes+signalling_bytes
                    signature = packet.data[:RNS.Identity.SIGLENGTH//8]

                    if self.destination.identity.validate(signature, signed_data):
                        if self.status != Link.HANDSHAKE:
                            raise IOError("Invalid link state for proof validation: "+str(self.status))

                        self.rtt = time.time() - self.request_time
                        self.attached_interface = packet.receiving_interface
                        self.__remote_identity = self.destination.identity
                        # Without a confirmed MTU, the default MTU is used
                        self.mtu = confirmed_mtu or RNS.Reticulum.MTU
                        self.update_mdu()
                        self.status = Link.ACTIVE
                        self.activated_at = time.time()
                        self.last_proof = self.activated_at
                        RNS.Transport.activate_link(self)
                        RNS.log("Link "+str(self)+" established with "+str(self.destination)+", RTT is "+RNS.prettyshorttime(self.rtt), RNS.LOG_DEBUG)

                        # Establishment rate in bytes per second
                        if self.rtt != None and self.establishment_cost != None and self.rtt > 0 and self.establishment_cost > 0:
                            self.establishment_rate = self.establishment_cost/self.rtt

                        self.__update_keepalive()

                        # Send the measured RTT to the peer
                        rtt_data = umsgpack.packb(self.rtt)
                        rtt_packet = RNS.Packet(self, rtt_data, context=RNS.Packet.LRRTT)
                        rtt_packet.send()
                        self.had_outbound()
                        self.__update_phy_stats(packet)

                        # The callback runs in its own daemon thread
                        if self.callbacks.link_established != None:
                            thread = threading.Thread(target=self.callbacks.link_established, args=(self,))
                            thread.daemon = True
                            thread.start()
                    else:
                        RNS.log("Invalid link proof signature received by "+str(self)+". Ignoring.", RNS.LOG_DEBUG)

        except Exception as e:
            # Only the status is changed here, teardown() is not called
            self.status = Link.CLOSED
            RNS.log("An error ocurred while validating link request proof on "+str(self)+".", RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
 457  
 458  
 459      def identify(self, identity):
 460          """
 461          Identifies the initiator of the link to the remote peer. This can only happen
 462          once the link has been established, and is carried out over the encrypted link.
 463          The identity is only revealed to the remote peer, and initiator anonymity is
 464          thus preserved. This method can be used for authentication.
 465  
 466          :param identity: An RNS.Identity instance to identify as.
 467          """
 468          if self.initiator and self.status == Link.ACTIVE:
 469              signed_data = self.link_id + identity.get_public_key()
 470              signature = identity.sign(signed_data)
 471              proof_data = identity.get_public_key() + signature
 472  
 473              proof = RNS.Packet(self, proof_data, RNS.Packet.DATA, context = RNS.Packet.LINKIDENTIFY)
 474              proof.send()
 475              self.had_outbound()
 476  
 477  
 478      def request(self, path, data = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None):
 479          """
 480          Sends a request to the remote peer.
 481  
 482          :param path: The request path.
 483          :param response_callback: An optional function or method with the signature *response_callback(request_receipt)* to be called when a response is received. See the :ref:`Request Example<example-request>` for more info.
 484          :param failed_callback: An optional function or method with the signature *failed_callback(request_receipt)* to be called when a request fails. See the :ref:`Request Example<example-request>` for more info.
 485          :param progress_callback: An optional function or method with the signature *progress_callback(request_receipt)* to be called when progress is made receiving the response. Progress can be accessed as a float between 0.0 and 1.0 by the *request_receipt.progress* property.
 486          :param timeout: An optional timeout in seconds for the request. If *None* is supplied it will be calculated based on link RTT.
 487          :returns: A :ref:`RNS.RequestReceipt<api-requestreceipt>` instance if the request was sent, or *False* if it was not.
 488          """
 489          request_path_hash = RNS.Identity.truncated_hash(path.encode("utf-8"))
 490          unpacked_request  = [time.time(), request_path_hash, data]
 491          packed_request    = umsgpack.packb(unpacked_request)
 492  
 493          if timeout == None:
 494              timeout = self.rtt * self.traffic_timeout_factor + RNS.Resource.RESPONSE_MAX_GRACE_TIME*1.125
 495  
 496          if len(packed_request) <= self.mdu:
 497              request_packet   = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST)
 498              packet_receipt   = request_packet.send()
 499  
 500              if packet_receipt == False:
 501                  return False
 502              else:
 503                  packet_receipt.set_timeout(timeout)
 504                  return RequestReceipt(
 505                      self,
 506                      packet_receipt = packet_receipt,
 507                      response_callback = response_callback,
 508                      failed_callback = failed_callback,
 509                      progress_callback = progress_callback,
 510                      timeout = timeout,
 511                      request_size = len(packed_request),
 512                  )
 513              
 514          else:
 515              request_id = RNS.Identity.truncated_hash(packed_request)
 516              RNS.log("Sending request "+RNS.prettyhexrep(request_id)+" as resource.", RNS.LOG_DEBUG)
 517              request_resource = RNS.Resource(packed_request, self, request_id = request_id, is_response = False, timeout = timeout)
 518  
 519              return RequestReceipt(
 520                  self,
 521                  resource = request_resource,
 522                  response_callback = response_callback,
 523                  failed_callback = failed_callback,
 524                  progress_callback = progress_callback,
 525                  timeout = timeout,
 526                  request_size = len(packed_request),
 527              )
 528  
 529  
 530      def update_mdu(self):
 531          self.mdu = self.mtu - RNS.Reticulum.HEADER_MAXSIZE - RNS.Reticulum.IFAC_MIN_SIZE
 532          self.mdu = math.floor((self.mtu-RNS.Reticulum.IFAC_MIN_SIZE-RNS.Reticulum.HEADER_MINSIZE-RNS.Identity.TOKEN_OVERHEAD)/RNS.Identity.AES128_BLOCKSIZE)*RNS.Identity.AES128_BLOCKSIZE - 1
 533  
 534      def rtt_packet(self, packet):
 535          try:
 536              measured_rtt = time.time() - self.request_time
 537              plaintext = self.decrypt(packet.data)
 538              if plaintext != None:
 539                  rtt = umsgpack.unpackb(plaintext)
 540                  self.rtt = max(measured_rtt, rtt)
 541                  self.status = Link.ACTIVE
 542                  self.activated_at = time.time()
 543  
 544                  if self.rtt != None and self.establishment_cost != None and self.rtt > 0 and self.establishment_cost > 0:
 545                      self.establishment_rate = self.establishment_cost/self.rtt
 546  
 547                  self.__update_keepalive()
 548  
 549                  try:
 550                      if self.owner.callbacks.link_established != None:
 551                              self.owner.callbacks.link_established(self)
 552                  except Exception as e:
 553                      RNS.log("Error occurred in external link establishment callback. The contained exception was: "+str(e), RNS.LOG_ERROR)
 554  
 555          except Exception as e:
 556              RNS.log("Error occurred while processing RTT packet, tearing down link. The contained exception was: "+str(e), RNS.LOG_ERROR)
 557              self.teardown()
 558  
 559      def track_phy_stats(self, track):
 560          """
 561          You can enable physical layer statistics on a per-link basis. If this is enabled,
 562          and the link is running over an interface that supports reporting physical layer
 563          statistics, you will be able to retrieve stats such as *RSSI*, *SNR* and physical
 564          *Link Quality* for the link.
 565  
 566          :param track: Whether or not to keep track of physical layer statistics. Value must be ``True`` or ``False``.
 567          """
 568          if track:
 569              self.__track_phy_stats = True
 570          else:
 571              self.__track_phy_stats = False
 572  
 573      def get_rssi(self):
 574          """
 575          :returns: The physical layer *Received Signal Strength Indication* if available, otherwise ``None``. Physical layer statistics must be enabled on the link for this method to return a value.
 576          """
 577          if self.__track_phy_stats:
 578              return self.rssi
 579          else:
 580              return None
 581  
 582      def get_snr(self):
 583          """
 584          :returns: The physical layer *Signal-to-Noise Ratio* if available, otherwise ``None``. Physical layer statistics must be enabled on the link for this method to return a value.
 585          """
 586          if self.__track_phy_stats:
 587              return self.snr
 588          else:
 589              return None
 590  
 591      def get_q(self):
 592          """
 593          :returns: The physical layer *Link Quality* if available, otherwise ``None``. Physical layer statistics must be enabled on the link for this method to return a value.
 594          """
 595          if self.__track_phy_stats:
 596              return self.q
 597          else:
 598              return None
 599  
 600      def get_establishment_rate(self):
 601          """
 602          :returns: The data transfer rate at which the link establishment procedure ocurred, in bits per second.
 603          """
 604          if self.establishment_rate != None:
 605              return self.establishment_rate*8
 606          else:
 607              return None
 608  
 609      def get_mtu(self):
 610          """
 611          :returns: The MTU of an established link.
 612          """
 613          if self.status == Link.ACTIVE:
 614              return self.mtu
 615          else:
 616              return None
 617  
 618      def get_mdu(self):
 619          """
 620          :returns: The packet MDU of an established link.
 621          """
 622          if self.status == Link.ACTIVE:
 623              return self.mdu
 624          else:
 625              return None
 626  
 627      def get_expected_rate(self):
 628          """
 629          :returns: The packet expected in-flight data rate of an established link.
 630          """
 631          if self.status == Link.ACTIVE:
 632              return self.expected_rate
 633          else:
 634              return None
 635  
 636      def get_mode(self):
 637          """
 638          :returns: The mode of an established link.
 639          """
 640          return self.mode
 641  
 642      def get_salt(self):
 643          return self.link_id
 644  
 645      def get_context(self):
 646          return None
 647  
 648      def get_age(self):
 649          """
 650          :returns: The time in seconds since this link was established.
 651          """
 652          if self.activated_at:
 653              return time.time() - self.activated_at
 654          else:
 655              return None
 656  
 657      def no_inbound_for(self):
 658          """
 659          :returns: The time in seconds since last inbound packet on the link. This includes keepalive packets.
 660          """
 661          activated_at = self.activated_at if self.activated_at != None else 0
 662          last_inbound = max(self.last_inbound, activated_at)
 663          return time.time() - last_inbound
 664  
 665      def no_outbound_for(self):
 666          """
 667          :returns: The time in seconds since last outbound packet on the link. This includes keepalive packets.
 668          """
 669          return time.time() - self.last_outbound
 670  
 671      def no_data_for(self):
 672          """
 673          :returns: The time in seconds since payload data traversed the link. This excludes keepalive packets.
 674          """
 675          return time.time() - self.last_data
 676  
 677      def inactive_for(self):
 678          """
 679          :returns: The time in seconds since activity on the link. This includes keepalive packets.
 680          """
 681          return min(self.no_inbound_for(), self.no_outbound_for())
 682  
 683      def get_remote_identity(self):
 684          """
 685          :returns: The identity of the remote peer, if it is known. Calling this method will not query the remote initiator to reveal its identity. Returns ``None`` if the link initiator has not already independently called the ``identify(identity)`` method.
 686          """
 687          return self.__remote_identity
 688  
 689      def had_outbound(self, is_keepalive=False):
 690          self.last_outbound = time.time()
 691          if not is_keepalive: self.last_data = self.last_outbound
 692          else:                self.last_keepalive = self.last_outbound
 693  
 694      def __teardown_packet(self):
 695          teardown_packet = RNS.Packet(self, self.link_id, context=RNS.Packet.LINKCLOSE)
 696          teardown_packet.send()
 697          self.had_outbound()
 698  
 699      def teardown(self):
 700          """
 701          Closes the link and purges encryption keys. New keys will
 702          be used if a new link to the same destination is established.
 703          """
 704          if self.status != Link.PENDING and self.status != Link.CLOSED: self.__teardown_packet()
 705          self.status = Link.CLOSED
 706          if self.initiator: self.teardown_reason = Link.INITIATOR_CLOSED
 707          else: self.teardown_reason = Link.DESTINATION_CLOSED
 708          self.link_closed()
 709  
 710      def teardown_packet(self, packet):
 711          try:
 712              plaintext = self.decrypt(packet.data)
 713              if plaintext == self.link_id:
 714                  self.status = Link.CLOSED
 715                  if self.initiator:
 716                      self.teardown_reason = Link.DESTINATION_CLOSED
 717                  else:
 718                      self.teardown_reason = Link.INITIATOR_CLOSED
 719                  self.__update_phy_stats(packet)
 720                  self.link_closed()
 721          except Exception as e:
 722              pass
 723  
 724      def link_closed(self):
 725          for resource in self.incoming_resources:
 726              resource.cancel()
 727          for resource in self.outgoing_resources:
 728              resource.cancel()
 729          if self._channel:
 730              self._channel._shutdown()
 731              
 732          self.prv = None
 733          self.pub = None
 734          self.pub_bytes = None
 735          self.shared_key = None
 736          self.derived_key = None
 737  
 738          if self.destination != None:
 739              if self.destination.direction == RNS.Destination.IN:
 740                  if self in self.destination.links:
 741                      self.destination.links.remove(self)
 742  
 743          if self.callbacks.link_closed != None:
 744              try:
 745                  self.callbacks.link_closed(self)
 746              except Exception as e:
 747                  RNS.log("Error while executing link closed callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
 748  
 749  
 750      def start_watchdog(self):
 751          thread = threading.Thread(target=self.__watchdog_job)
 752          thread.daemon = True
 753          thread.start()
 754  
 755      def __watchdog_job(self):
 756          while not self.status == Link.CLOSED:
 757              while (self.watchdog_lock):
 758                  rtt_wait = 0.025
 759                  if hasattr(self, "rtt") and self.rtt:
 760                      rtt_wait = self.rtt
 761  
 762                  sleep(max(rtt_wait, 0.025))
 763  
 764              if not self.status == Link.CLOSED:
 765                  # Link was initiated, but no response
 766                  # from destination yet
 767                  if self.status == Link.PENDING:
 768                      next_check = self.request_time + self.establishment_timeout
 769                      sleep_time = next_check - time.time()
 770                      if time.time() >= self.request_time + self.establishment_timeout:
 771                          RNS.log("Link establishment timed out", RNS.LOG_VERBOSE)
 772                          self.status = Link.CLOSED
 773                          self.teardown_reason = Link.TIMEOUT
 774                          self.link_closed()
 775                          sleep_time = 0.001
 776  
 777                  elif self.status == Link.HANDSHAKE:
 778                      next_check = self.request_time + self.establishment_timeout
 779                      sleep_time = next_check - time.time()
 780                      if time.time() >= self.request_time + self.establishment_timeout:
 781                          self.status = Link.CLOSED
 782                          self.teardown_reason = Link.TIMEOUT
 783                          self.link_closed()
 784                          sleep_time = 0.001
 785  
 786                          if self.initiator:
 787                              RNS.log("Timeout waiting for link request proof", RNS.LOG_DEBUG)
 788                          else:
 789                              RNS.log("Timeout waiting for RTT packet from link initiator", RNS.LOG_DEBUG)
 790  
 791                  elif self.status == Link.ACTIVE:
 792                      activated_at = self.activated_at if self.activated_at != None else 0
 793                      last_inbound = max(max(self.last_inbound, self.last_proof), activated_at)
 794                      now = time.time()
 795  
 796                      if now >= last_inbound + self.keepalive:
 797                          if self.initiator and now >= self.last_keepalive + self.keepalive:
 798                              self.send_keepalive()
 799  
 800                          if time.time() >= last_inbound + self.stale_time:
 801                              sleep_time = self.rtt * self.keepalive_timeout_factor + Link.STALE_GRACE
 802                              self.status = Link.STALE
 803                          else:
 804                              sleep_time = self.keepalive
 805                      
 806                      else:
 807                          sleep_time = (last_inbound + self.keepalive) - time.time()
 808  
 809                  elif self.status == Link.STALE:
 810                      sleep_time = 0.001
 811                      self.__teardown_packet()
 812                      self.status = Link.CLOSED
 813                      self.teardown_reason = Link.TIMEOUT
 814                      self.link_closed()
 815  
 816  
 817                  if sleep_time == 0:
 818                      RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_ERROR)
 819                  if sleep_time == None or sleep_time < 0:
 820                      RNS.log("Timing error! Tearing down link "+str(self)+" now.", RNS.LOG_ERROR)
 821                      self.teardown()
 822                      sleep_time = 0.1
 823  
 824                  sleep_time = min(sleep_time, Link.WATCHDOG_MAX_SLEEP)
 825                  sleep(sleep_time)
 826  
 827                  if not self.__track_phy_stats:
 828                      self.rssi = None
 829                      self.snr  = None
 830                      self.q    = None
 831  
 832  
 833      def __update_phy_stats(self, packet, query_shared = True, force_update = False):
 834          if self.__track_phy_stats or force_update:
 835              if query_shared:
 836                  reticulum = RNS.Reticulum.get_instance()
 837                  if packet.rssi == None: packet.rssi = reticulum.get_packet_rssi(packet.packet_hash)
 838                  if packet.snr  == None: packet.snr  = reticulum.get_packet_snr(packet.packet_hash)
 839                  if packet.q    == None: packet.q    = reticulum.get_packet_q(packet.packet_hash)
 840  
 841              if packet.rssi != None:
 842                  self.rssi = packet.rssi
 843              if packet.snr != None:
 844                  self.snr = packet.snr
 845              if packet.q != None:
 846                  self.q = packet.q
 847  
 848      def __update_keepalive(self):
 849          self.keepalive = max(min(self.rtt*(Link.KEEPALIVE_MAX/Link.KEEPALIVE_MAX_RTT), Link.KEEPALIVE_MAX), Link.KEEPALIVE_MIN)
 850          self.stale_time = self.keepalive * Link.STALE_FACTOR
 851      
 852      def send_keepalive(self):
 853          keepalive_packet = RNS.Packet(self, bytes([0xFF]), context=RNS.Packet.KEEPALIVE)
 854          keepalive_packet.send()
 855          self.had_outbound(is_keepalive = True)
 856  
 857      def handle_request(self, request_id, unpacked_request):
 858          if self.status == Link.ACTIVE:
 859              requested_at = unpacked_request[0]
 860              path_hash    = unpacked_request[1]
 861              request_data = unpacked_request[2]
 862  
 863              if path_hash in self.destination.request_handlers:
 864                  request_handler = self.destination.request_handlers[path_hash]
 865                  path               = request_handler[0]
 866                  response_generator = request_handler[1]
 867                  allow              = request_handler[2]
 868                  allowed_list       = request_handler[3]
 869                  auto_compress      = request_handler[4]
 870  
 871                  allowed = False
 872                  if not allow == RNS.Destination.ALLOW_NONE:
 873                      if allow == RNS.Destination.ALLOW_LIST:
 874                          if self.__remote_identity != None and self.__remote_identity.hash in allowed_list:
 875                              allowed = True
 876                      elif allow == RNS.Destination.ALLOW_ALL:
 877                          allowed = True
 878  
 879                  if allowed:
 880                      RNS.log("Handling request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_DEBUG)
 881                      if len(inspect.signature(response_generator).parameters) == 5:
 882                          response = response_generator(path, request_data, request_id, self.__remote_identity, requested_at)
 883                      elif len(inspect.signature(response_generator).parameters) == 6:
 884                          response = response_generator(path, request_data, request_id, self.link_id, self.__remote_identity, requested_at)
 885                      else:
 886                          raise TypeError("Invalid signature for response generator callback")
 887  
 888                      file_response = False
 889                      file_handle   = None
 890                      if type(response) == list or type(response) == tuple:
 891                          metadata = None
 892                          if len(response) > 0 and type(response[0]) == io.BufferedReader:
 893                              if len(response) > 1: metadata = response[1]
 894                              file_handle = response[0]
 895                              file_response = True
 896  
 897                      if response != None:
 898                          if file_response:
 899                              response_resource = RNS.Resource(file_handle, self, metadata=metadata, request_id = request_id, is_response = True, auto_compress=auto_compress)
 900                          else:
 901                              packed_response = umsgpack.packb([request_id, response])
 902                              if len(packed_response) <= self.mdu:
 903                                  RNS.Packet(self, packed_response, RNS.Packet.DATA, context = RNS.Packet.RESPONSE).send()
 904                              else:
 905                                  response_resource = RNS.Resource(packed_response, self, request_id = request_id, is_response = True, auto_compress=auto_compress)
 906                  else:
 907                      identity_string = str(self.get_remote_identity()) if self.get_remote_identity() != None else "<Unknown>"
 908                      RNS.log("Request "+RNS.prettyhexrep(request_id)+" from "+identity_string+" not allowed for: "+str(path), RNS.LOG_DEBUG)
 909  
 910      def handle_response(self, request_id, response_data, response_size, response_transfer_size, metadata=None):
 911          if self.status == Link.ACTIVE:
 912              remove = None
 913              for pending_request in self.pending_requests:
 914                  if pending_request.request_id == request_id:
 915                      remove = pending_request
 916                      try:
 917                          pending_request.response_size = response_size
 918                          if pending_request.response_transfer_size == None:
 919                              pending_request.response_transfer_size = 0
 920                          pending_request.response_transfer_size += response_transfer_size
 921                          pending_request.response_received(response_data, metadata)
 922                      except Exception as e:
 923                          RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR)
 924  
 925                      break
 926  
 927              if remove != None:
 928                  if remove in self.pending_requests:
 929                      self.pending_requests.remove(remove)
 930  
 931      def request_resource_concluded(self, resource):
 932          if resource.status == RNS.Resource.COMPLETE:
 933              packed_request    = resource.data.read()
 934              unpacked_request  = umsgpack.unpackb(packed_request)
 935              request_id        = RNS.Identity.truncated_hash(packed_request)
 936              request_data      = unpacked_request
 937  
 938              def job(): self.handle_request(request_id, request_data)
 939              threading.Thread(target=job, daemon=True).start()
 940          else:
 941              RNS.log("Incoming request resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
 942  
 943      def response_resource_concluded(self, resource):
 944          if resource.status == RNS.Resource.COMPLETE:
 945              # If the response resource has metadata, this
 946              # is a file response, and we'll pass the open
 947              # file handle directly.
 948              if resource.has_metadata:
 949                  self.handle_response(resource.request_id, resource.data, resource.total_size, resource.size, metadata=resource.metadata)
 950  
 951              # If not, we'll unpack the response data and
 952              # pass the unpacked structure to the handler
 953              else:
 954                  packed_response   = resource.data.read()
 955                  unpacked_response = umsgpack.unpackb(packed_response)
 956                  request_id        = unpacked_response[0]
 957                  response_data     = unpacked_response[1]
 958                  self.handle_response(request_id, response_data, resource.total_size, resource.size)
 959  
 960          else:
 961              RNS.log("Incoming response resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
 962              for pending_request in self.pending_requests:
 963                  if pending_request.request_id == resource.request_id:
 964                      pending_request.request_timed_out(None)
 965  
 966      def get_channel(self):
 967          """
 968          Get the ``Channel`` for this link.
 969  
 970          :return: ``Channel`` object
 971          """
 972          if self._channel is None:
 973              self._channel = Channel(LinkChannelOutlet(self))
 974          return self._channel
 975  
 976      def receive(self, packet):
 977          self.watchdog_lock = True
 978          if not self.status == Link.CLOSED and not (self.initiator and packet.context == RNS.Packet.KEEPALIVE and packet.data == bytes([0xFF])):
 979              if packet.receiving_interface != self.attached_interface:
 980                  RNS.log(f"Link-associated packet received on unexpected interface {packet.receiving_interface} instead of {self.attached_interface}! Someone might be trying to manipulate your communication!", RNS.LOG_ERROR)
 981              else:
 982                  self.last_inbound = time.time()
 983                  if packet.context != RNS.Packet.KEEPALIVE:
 984                      self.last_data = self.last_inbound
 985                  self.rx += 1
 986                  self.rxbytes += len(packet.data)
 987                  if self.status == Link.STALE:
 988                      self.status = Link.ACTIVE
 989  
 990                  if packet.packet_type == RNS.Packet.DATA:
 991                      should_query = False
 992                      if packet.context == RNS.Packet.NONE:
 993                          plaintext = self.decrypt(packet.data)
 994                          packet.ratchet_id = self.link_id
 995                          if plaintext != None:
 996                              self.__update_phy_stats(packet, query_shared=True)
 997  
 998                              if self.callbacks.packet != None:
 999                                  thread = threading.Thread(target=self.callbacks.packet, args=(plaintext, packet))
1000                                  thread.daemon = True
1001                                  thread.start()
1002                              
1003                              if self.destination.proof_strategy == RNS.Destination.PROVE_ALL:
1004                                  packet.prove()
1005  
1006                              elif self.destination.proof_strategy == RNS.Destination.PROVE_APP:
1007                                  if self.destination.callbacks.proof_requested:
1008                                      try:
1009                                          if self.destination.callbacks.proof_requested(packet):
1010                                              packet.prove()
1011                                      except Exception as e:
1012                                          RNS.log("Error while executing proof request callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
1013  
1014                      elif packet.context == RNS.Packet.LINKIDENTIFY:
1015                          plaintext = self.decrypt(packet.data)
1016                          if plaintext != None:
1017                              if not self.initiator and len(plaintext) == RNS.Identity.KEYSIZE//8 + RNS.Identity.SIGLENGTH//8:
1018                                  public_key   = plaintext[:RNS.Identity.KEYSIZE//8]
1019                                  signed_data  = self.link_id+public_key
1020                                  signature    = plaintext[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Identity.SIGLENGTH//8]
1021                                  identity     = RNS.Identity(create_keys=False)
1022                                  identity.load_public_key(public_key)
1023  
1024                                  if identity.validate(signature, signed_data):
1025                                      self.__remote_identity = identity
1026                                      if self.callbacks.remote_identified != None:
1027                                          try:
1028                                              self.callbacks.remote_identified(self, self.__remote_identity)
1029                                          except Exception as e:
1030                                              RNS.log("Error while executing remote identified callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
1031                                  
1032                                      self.__update_phy_stats(packet, query_shared=True)
1033  
1034                      elif packet.context == RNS.Packet.REQUEST:
1035                          try:
1036                              request_id = packet.getTruncatedHash()
1037                              packed_request = self.decrypt(packet.data)
1038                              if packed_request != None:
1039                                  unpacked_request = umsgpack.unpackb(packed_request)
1040                                  def job(): self.handle_request(request_id, unpacked_request)
1041                                  threading.Thread(target=job, daemon=True).start()
1042                                  self.__update_phy_stats(packet, query_shared=True)
1043                          except Exception as e:
1044                              RNS.log("Error occurred while handling request. The contained exception was: "+str(e), RNS.LOG_ERROR)
1045  
1046                      elif packet.context == RNS.Packet.RESPONSE:
1047                          try:
1048                              packed_response = self.decrypt(packet.data)
1049                              if packed_response != None:
1050                                  unpacked_response = umsgpack.unpackb(packed_response)
1051                                  request_id = unpacked_response[0]
1052                                  response_data = unpacked_response[1]
1053                                  transfer_size = len(umsgpack.packb(response_data))-2
1054                                  def job(): self.handle_response(request_id, response_data, transfer_size, transfer_size)
1055                                  threading.Thread(target=job, daemon=True).start()
1056                                  self.__update_phy_stats(packet, query_shared=True)
1057                          except Exception as e:
1058                              RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR)
1059  
1060                      elif packet.context == RNS.Packet.LRRTT:
1061                          if not self.initiator:
1062                              self.rtt_packet(packet)
1063                              self.__update_phy_stats(packet, query_shared=True)
1064  
1065                      elif packet.context == RNS.Packet.LINKCLOSE:
1066                          self.teardown_packet(packet)
1067                          self.__update_phy_stats(packet, query_shared=True)
1068  
1069                      elif packet.context == RNS.Packet.RESOURCE_ADV:
1070                          packet.plaintext = self.decrypt(packet.data)
1071                          if packet.plaintext != None:
1072                              self.__update_phy_stats(packet, query_shared=True)
1073  
1074                              if RNS.ResourceAdvertisement.is_request(packet):
1075                                  RNS.Resource.accept(packet, callback=self.request_resource_concluded)
1076                              elif RNS.ResourceAdvertisement.is_response(packet):
1077                                  request_id = RNS.ResourceAdvertisement.read_request_id(packet)
1078                                  for pending_request in self.pending_requests:
1079                                      if pending_request.request_id == request_id:
1080                                          response_resource = RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress, request_id = request_id)
1081                                          if response_resource != None:
1082                                              if pending_request.response_size == None:
1083                                                  pending_request.response_size = RNS.ResourceAdvertisement.read_size(packet)
1084                                              if pending_request.response_transfer_size == None:
1085                                                  pending_request.response_transfer_size = 0
1086                                              pending_request.response_transfer_size += RNS.ResourceAdvertisement.read_transfer_size(packet)
1087                                              if pending_request.started_at == None:
1088                                                  pending_request.started_at = time.time()
1089                                              pending_request.response_resource_progress(response_resource)
1090  
1091                              elif self.resource_strategy == Link.ACCEPT_NONE: pass
1092                              elif self.resource_strategy == Link.ACCEPT_APP:
1093                                  if self.callbacks.resource != None:
1094                                      try:
1095                                          resource_advertisement = RNS.ResourceAdvertisement.unpack(packet.plaintext)
1096                                          resource_advertisement.link = self
1097                                          if self.callbacks.resource(resource_advertisement): RNS.Resource.accept(packet, self.callbacks.resource_concluded)
1098                                          else:                                               RNS.Resource.reject(packet)
1099                                      except Exception as e:
1100                                          RNS.log("Error while executing resource accept callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
1101                              elif self.resource_strategy == Link.ACCEPT_ALL:
1102                                  RNS.Resource.accept(packet, self.callbacks.resource_concluded)
1103  
1104                      elif packet.context == RNS.Packet.RESOURCE_REQ:
1105                          plaintext = self.decrypt(packet.data)
1106                          if plaintext != None:
1107                              self.__update_phy_stats(packet, query_shared=True)
1108                              if ord(plaintext[:1]) == RNS.Resource.HASHMAP_IS_EXHAUSTED:
1109                                  resource_hash = plaintext[1+RNS.Resource.MAPHASH_LEN:RNS.Identity.HASHLENGTH//8+1+RNS.Resource.MAPHASH_LEN]
1110                              else:
1111                                  resource_hash = plaintext[1:RNS.Identity.HASHLENGTH//8+1]
1112  
1113                              for resource in self.outgoing_resources:
1114                                  if resource.hash == resource_hash:
1115                                      # We need to check that this request has not been
1116                                      # received before in order to avoid sequencing errors.
1117                                      if not packet.packet_hash in resource.req_hashlist:
1118                                          resource.req_hashlist.append(packet.packet_hash)
1119                                          resource.request(plaintext)
1120                                          
1121                                          # TODO: Test and possibly enable this at some point
1122                                          # def request_job():
1123                                          #     resource.request(plaintext)
1124                                          # threading.Thread(target=request_job, daemon=True).start()
1125  
1126                      elif packet.context == RNS.Packet.RESOURCE_HMU:
1127                          plaintext = self.decrypt(packet.data)
1128                          if plaintext != None:
1129                              self.__update_phy_stats(packet, query_shared=True)
1130                              resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8]
1131                              for resource in self.incoming_resources:
1132                                  if resource_hash == resource.hash:
1133                                      resource.hashmap_update_packet(plaintext)
1134  
1135                      elif packet.context == RNS.Packet.RESOURCE_ICL:
1136                          plaintext = self.decrypt(packet.data)
1137                          if plaintext != None:
1138                              self.__update_phy_stats(packet)
1139                              resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8]
1140                              for resource in self.incoming_resources:
1141                                  if resource_hash == resource.hash:
1142                                      resource.cancel()
1143  
1144                      elif packet.context == RNS.Packet.RESOURCE_RCL:
1145                          plaintext = self.decrypt(packet.data)
1146                          if plaintext != None:
1147                              self.__update_phy_stats(packet)
1148                              resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8]
1149                              for resource in self.outgoing_resources:
1150                                  if resource_hash == resource.hash:
1151                                      resource._rejected()
1152  
1153                      elif packet.context == RNS.Packet.KEEPALIVE:
1154                          if not self.initiator and packet.data == bytes([0xFF]):
1155                              keepalive_packet = RNS.Packet(self, bytes([0xFE]), context=RNS.Packet.KEEPALIVE)
1156                              keepalive_packet.send()
1157                              self.had_outbound(is_keepalive = True)
1158  
1159  
1160                      # TODO: find the most efficient way to allow multiple
1161                      # transfers at the same time, sending resource hash on
1162                      # each packet is a huge overhead. Probably some kind
1163                      # of hash -> sequence map
1164                      elif packet.context == RNS.Packet.RESOURCE:
1165                          for resource in self.incoming_resources:
1166                              resource.receive_part(packet)
1167                              self.__update_phy_stats(packet)
1168  
1169                      elif packet.context == RNS.Packet.CHANNEL:
1170                          if not self._channel:
1171                              RNS.log(f"Channel data received without open channel", RNS.LOG_DEBUG)
1172                          else:
1173                              packet.prove()
1174                              plaintext = self.decrypt(packet.data)
1175                              if plaintext != None:
1176                                  self.__update_phy_stats(packet)
1177                                  self._channel._receive(plaintext)
1178  
1179                  elif packet.packet_type == RNS.Packet.PROOF:
1180                      if packet.context == RNS.Packet.RESOURCE_PRF:
1181                          resource_hash = packet.data[0:RNS.Identity.HASHLENGTH//8]
1182                          for resource in self.outgoing_resources:
1183                              if resource_hash == resource.hash:
1184                                  def job(): resource.validate_proof(packet.data)
1185                                  threading.Thread(target=job, daemon=True).start()
1186                                  self.__update_phy_stats(packet, query_shared=True)
1187  
1188          self.watchdog_lock = False
1189  
1190  
1191      def encrypt(self, plaintext):
1192          try:
1193              if not self.token:
1194                  try: self.token = Token(self.derived_key)
1195                  except Exception as e:
1196                      RNS.log("Could not instantiate token while performing encryption on link "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
1197                      raise e
1198  
1199              return self.token.encrypt(plaintext)
1200  
1201          except Exception as e:
1202              RNS.log("Encryption on link "+str(self)+" failed. The contained exception was: "+str(e), RNS.LOG_ERROR)
1203              raise e
1204  
1205  
1206      def decrypt(self, ciphertext):
1207          try:
1208              if not self.token: self.token = Token(self.derived_key)
1209              return self.token.decrypt(ciphertext)
1210  
1211          except Exception as e:
1212              RNS.log("Decryption failed on link "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
1213              return None
1214  
1215  
1216      def sign(self, message):
1217          return self.sig_prv.sign(message)
1218  
1219      def validate(self, signature, message):
1220          try:
1221              self.peer_sig_pub.verify(signature, message)
1222              return True
1223          except Exception as e:
1224              return False
1225  
1226      def set_link_established_callback(self, callback):
1227          self.callbacks.link_established = callback
1228  
1229      def set_link_closed_callback(self, callback):
1230          """
1231          Registers a function to be called when a link has been
1232          torn down.
1233  
1234          :param callback: A function or method with the signature *callback(link)* to be called.
1235          """
1236          self.callbacks.link_closed = callback
1237  
1238      def set_packet_callback(self, callback):
1239          """
1240          Registers a function to be called when a packet has been
1241          received over this link.
1242  
1243          :param callback: A function or method with the signature *callback(message, packet)* to be called.
1244          """
1245          self.callbacks.packet = callback
1246  
1247      def set_resource_callback(self, callback):
1248          """
1249          Registers a function to be called when a resource has been
1250          advertised over this link. If the function returns *True*
1251          the resource will be accepted. If it returns *False* it will
1252          be ignored.
1253  
1254          :param callback: A function or method with the signature *callback(resource)* to be called. Please note that only the basic information of the resource is available at this time, such as *get_transfer_size()*, *get_data_size()*, *get_parts()* and *is_compressed()*.
1255          """
1256          self.callbacks.resource = callback
1257  
1258      def set_resource_started_callback(self, callback):
1259          """
1260          Registers a function to be called when a resource has begun
1261          transferring over this link.
1262  
1263          :param callback: A function or method with the signature *callback(resource)* to be called.
1264          """
1265          self.callbacks.resource_started = callback
1266  
1267      def set_resource_concluded_callback(self, callback):
1268          """
1269          Registers a function to be called when a resource has concluded
1270          transferring over this link.
1271  
1272          :param callback: A function or method with the signature *callback(resource)* to be called.
1273          """
1274          self.callbacks.resource_concluded = callback
1275  
1276      def set_remote_identified_callback(self, callback):
1277          """
1278          Registers a function to be called when an initiating peer has
1279          identified over this link.
1280  
1281          :param callback: A function or method with the signature *callback(link, identity)* to be called.
1282          """
1283          self.callbacks.remote_identified = callback
1284  
1285      def resource_concluded(self, resource):
1286          concluded_at = time.time()
1287          if resource in self.incoming_resources:
1288              self.last_resource_window = resource.window
1289              self.last_resource_eifr = resource.eifr
1290              self.incoming_resources.remove(resource)
1291              self.expected_rate = (resource.size*8)/(max(concluded_at-resource.started_transferring, 0.0001))
1292          if resource in self.outgoing_resources:
1293              self.outgoing_resources.remove(resource)
1294              self.expected_rate = (resource.size*8)/(max(concluded_at-resource.started_transferring, 0.0001))
1295  
1296      def set_resource_strategy(self, resource_strategy):
1297          """
1298          Sets the resource strategy for the link.
1299  
1300          :param resource_strategy: One of ``RNS.Link.ACCEPT_NONE``, ``RNS.Link.ACCEPT_ALL`` or ``RNS.Link.ACCEPT_APP``. If ``RNS.Link.ACCEPT_APP`` is set, the `resource_callback` will be called to determine whether the resource should be accepted or not.
1301          :raises: *TypeError* if the resource strategy is unsupported.
1302          """
1303          if not resource_strategy in Link.resource_strategies:
1304              raise TypeError("Unsupported resource strategy")
1305          else:
1306              self.resource_strategy = resource_strategy
1307  
1308      def register_outgoing_resource(self, resource):
1309          self.outgoing_resources.append(resource)
1310  
1311      def register_incoming_resource(self, resource):
1312          self.incoming_resources.append(resource)
1313  
1314      def has_incoming_resource(self, resource):
1315          for incoming_resource in self.incoming_resources:
1316              if incoming_resource.hash == resource.hash:
1317                  return True
1318  
1319          return False
1320  
1321      def get_last_resource_window(self):
1322          return self.last_resource_window
1323  
1324      def get_last_resource_eifr(self):
1325          return self.last_resource_eifr
1326  
1327      def cancel_outgoing_resource(self, resource):
1328          if resource in self.outgoing_resources:
1329              self.outgoing_resources.remove(resource)
1330          else:
1331              RNS.log("Attempt to cancel a non-existing outgoing resource", RNS.LOG_ERROR)
1332  
1333      def cancel_incoming_resource(self, resource):
1334          if resource in self.incoming_resources:
1335              self.incoming_resources.remove(resource)
1336          else:
1337              RNS.log("Attempt to cancel a non-existing incoming resource", RNS.LOG_ERROR)
1338  
1339      def ready_for_new_resource(self):
1340          if len(self.outgoing_resources) > 0:
1341              return False
1342          else:
1343              return True
1344  
1345      def __str__(self):
1346          return RNS.prettyhexrep(self.link_id)
1347  
1348  
class RequestReceipt():
    """
    An instance of this class is returned by the ``request`` method of ``RNS.Link``
    instances. It should never be instantiated manually. It provides methods to
    check status, response time and response data when the request concludes.
    """

    # Request states. FAILED and READY are the two concluded states,
    # see concluded().
    FAILED    = 0x00
    SENT      = 0x01
    DELIVERED = 0x02
    RECEIVING = 0x03
    READY     = 0x04

    def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None, request_size = None):
        # Tracks a single request sent over link. The request was
        # sent either as one packet (packet_receipt is given) or as
        # a resource (resource is given), and the request ID is
        # taken from whichever of the two is present. Raises
        # ValueError if no timeout is given.

        # Validate before anything is registered, so a rejected
        # receipt is never left hooked up to a packet receipt or
        # resource in a half-initialised state.
        if timeout is None:
            raise ValueError("No timeout specified for request receipt")

        self.packet_receipt = packet_receipt
        self.resource = resource
        self.started_at = None

        if self.packet_receipt is not None:
            self.hash = packet_receipt.truncated_hash
            self.started_at = time.time()

        elif self.resource is not None:
            self.hash = resource.request_id

        self.link                   = link
        self.request_id             = self.hash
        self.request_size           = request_size

        self.response               = None
        self.response_transfer_size = None
        self.response_size          = None
        self.metadata               = None
        self.status                 = RequestReceipt.SENT
        self.sent_at                = time.time()
        self.progress               = 0
        self.concluded_at           = None
        self.response_concluded_at  = None
        self.timeout                = timeout

        self.callbacks          = RequestReceiptCallbacks()
        self.callbacks.response = response_callback
        self.callbacks.failed   = failed_callback
        self.callbacks.progress = progress_callback

        # Hook into the carrier only once all state above exists,
        # so the callbacks never see a partially built receipt.
        if self.packet_receipt is not None:
            self.packet_receipt.set_timeout_callback(self.request_timed_out)
        elif self.resource is not None:
            resource.set_callback(self.request_resource_concluded)

        self.link.pending_requests.append(self)


    def request_resource_concluded(self, resource):
        # Callback for the resource that carried the request. On
        # successful transfer the request becomes DELIVERED and a
        # daemon thread starts waiting for the response timeout.
        # Any other resource status fails the request.
        if resource.status == RNS.Resource.COMPLETE:
            RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG)
            if self.started_at is None:
                self.started_at = time.time()
            self.status = RequestReceipt.DELIVERED
            self.__resource_response_timeout = time.time()+self.timeout
            response_timeout_thread = threading.Thread(target=self.__response_timeout_job, daemon=True)
            response_timeout_thread.start()
        else:
            RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
            self.status = RequestReceipt.FAILED
            self.concluded_at = time.time()
            # Guarded, so a receipt that is already gone from the
            # list cannot raise and skip the failed callback.
            if self in self.link.pending_requests:
                self.link.pending_requests.remove(self)

            if self.callbacks.failed is not None:
                try:
                    self.callbacks.failed(self)
                except Exception as e:
                    RNS.log("Error while executing request failed callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)


    def __response_timeout_job(self):
        # Runs in its own thread for resource-based requests. Polls
        # every 0.1 seconds for as long as the request is DELIVERED,
        # and fails the request once the deadline has passed.
        while self.status == RequestReceipt.DELIVERED:
            now = time.time()
            if now > self.__resource_response_timeout:
                self.request_timed_out(None)
                break

            time.sleep(0.1)


    def request_timed_out(self, packet_receipt):
        # Fails the request and calls the failed callback. This is
        # the timeout callback of the packet receipt, and is also
        # called with None by the response timeout job. The
        # packet_receipt argument is not used.

        # A request that already has an outcome is left alone, so
        # late or repeated timeouts cannot overwrite a response or
        # fire the failed callback twice. Every other state must be
        # accepted: packet-based requests are still SENT when their
        # packet receipt times out.
        if self.status in (RequestReceipt.FAILED, RequestReceipt.READY):
            return

        self.status = RequestReceipt.FAILED
        self.concluded_at = time.time()
        if self in self.link.pending_requests:
            self.link.pending_requests.remove(self)

        if self.callbacks.failed is not None:
            try: self.callbacks.failed(self)
            except Exception as e:
                RNS.log("Error while executing request timed out callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)


    def response_resource_progress(self, resource):
        # Progress update for a response arriving as a resource.
        # Moves the request to RECEIVING, records the progress and
        # calls the progress callback. If the request has already
        # failed, the incoming resource is cancelled instead.
        if resource is None:
            return

        if self.status == RequestReceipt.FAILED:
            resource.cancel()
            return

        self.status = RequestReceipt.RECEIVING
        if self.packet_receipt is not None:
            # A response is arriving, so the request packet must
            # have been delivered even if no proof was seen yet.
            if self.packet_receipt.status != RNS.PacketReceipt.DELIVERED:
                self.packet_receipt.status = RNS.PacketReceipt.DELIVERED
                self.packet_receipt.proved = True
                self.packet_receipt.concluded_at = time.time()
                if self.packet_receipt.callbacks.delivery is not None:
                    self.packet_receipt.callbacks.delivery(self.packet_receipt)

        self.progress = resource.get_progress()

        if self.callbacks.progress is not None:
            try:
                self.callbacks.progress(self)
            except Exception as e:
                RNS.log("Error while executing response progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)


    def response_received(self, response, metadata=None):
        # Stores the response and metadata, marks the request as
        # READY and calls the progress and response callbacks, in
        # that order. Does nothing if the request has already failed.
        if self.status == RequestReceipt.FAILED:
            return

        self.progress = 1.0
        self.response = response
        self.metadata = metadata
        self.status = RequestReceipt.READY
        self.response_concluded_at = time.time()

        if self.packet_receipt is not None:
            self.packet_receipt.status = RNS.PacketReceipt.DELIVERED
            self.packet_receipt.proved = True
            self.packet_receipt.concluded_at = time.time()
            if self.packet_receipt.callbacks.delivery is not None:
                self.packet_receipt.callbacks.delivery(self.packet_receipt)

        if self.callbacks.progress is not None:
            try: self.callbacks.progress(self)
            except Exception as e:
                RNS.log("Error while executing response progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

        if self.callbacks.response is not None:
            try: self.callbacks.response(self)
            except Exception as e:
                RNS.log("Error while executing response received callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

    def get_request_id(self):
        """
        :returns: The request ID as *bytes*.
        """
        return self.request_id

    def get_status(self):
        """
        :returns: The current status of the request, one of ``RNS.RequestReceipt.FAILED``, ``RNS.RequestReceipt.SENT``, ``RNS.RequestReceipt.DELIVERED``, ``RNS.RequestReceipt.RECEIVING``, ``RNS.RequestReceipt.READY``.
        """
        return self.status

    def get_progress(self):
        """
        :returns: The progress of a response being received as a *float* between 0.0 and 1.0.
        """
        return self.progress

    def get_response(self):
        """
        :returns: The response as *bytes* if it is ready, otherwise *None*.
        """
        if self.status == RequestReceipt.READY:
            return self.response

        return None

    def get_response_time(self):
        """
        :returns: The response time of the request in seconds, or *None* if the response is not ready or no start time was recorded.
        """
        if self.status != RequestReceipt.READY:
            return None

        # Resource-based requests only get a start time once the
        # request resource has concluded, so it can still be unset.
        if self.started_at is None or self.response_concluded_at is None:
            return None

        return self.response_concluded_at - self.started_at

    def concluded(self):
        """
        :returns: True if the associated request has concluded (successfully or with a failure), otherwise False.
        """
        return self.status in (RequestReceipt.READY, RequestReceipt.FAILED)
1542  
1543  
1544  
class RequestReceiptCallbacks:
    # Plain holder for the three optional callbacks of a request
    # receipt: response, failed and progress. All start as None.
    def __init__(self):
        self.response = self.failed = self.progress = None