Link.py
1 # Reticulum License 2 # 3 # Copyright (c) 2016-2025 Mark Qvist 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # - The Software shall not be used in any kind of system which includes amongst 13 # its functions the ability to purposefully do harm to human beings. 14 # 15 # - The Software shall not be used, directly or indirectly, in the creation of 16 # an artificial intelligence, machine learning or language model training 17 # dataset, including but not limited to any use that contributes to the 18 # training or development of such a model or algorithm. 19 # 20 # - The above copyright notice and this permission notice shall be included in 21 # all copies or substantial portions of the Software. 22 # 23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 24 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 25 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 26 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 27 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 28 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 29 # SOFTWARE. 30 31 from RNS.Cryptography import X25519PrivateKey, X25519PublicKey, Ed25519PrivateKey, Ed25519PublicKey 32 from RNS.Cryptography import Token 33 from RNS.Channel import Channel, LinkChannelOutlet 34 35 from time import sleep 36 from .vendor import umsgpack as umsgpack 37 import threading 38 import inspect 39 import struct 40 import math 41 import time 42 import RNS 43 import io 44 45 class LinkCallbacks: 46 def __init__(self): 47 self.link_established = None 48 self.link_closed = None 49 self.packet = None 50 self.resource = None 51 self.resource_started = None 52 self.resource_concluded = None 53 self.remote_identified = None 54 55 class Link: 56 """ 57 This class is used to establish and manage links to other peers. When a 58 link instance is created, Reticulum will attempt to establish verified 59 and encrypted connectivity with the specified destination. 60 61 :param destination: A :ref:`RNS.Destination<api-destination>` instance which to establish a link to. 62 :param established_callback: An optional function or method with the signature *callback(link)* to be called when the link has been established. 63 :param closed_callback: An optional function or method with the signature *callback(link)* to be called when the link is closed. 64 """ 65 CURVE = RNS.Identity.CURVE 66 """ 67 The curve used for Elliptic Curve DH key exchanges 68 """ 69 70 ECPUBSIZE = 32+32 71 KEYSIZE = 32 72 73 MDU = math.floor((RNS.Reticulum.MTU-RNS.Reticulum.IFAC_MIN_SIZE-RNS.Reticulum.HEADER_MINSIZE-RNS.Identity.TOKEN_OVERHEAD)/RNS.Identity.AES128_BLOCKSIZE)*RNS.Identity.AES128_BLOCKSIZE - 1 74 75 ESTABLISHMENT_TIMEOUT_PER_HOP = RNS.Reticulum.DEFAULT_PER_HOP_TIMEOUT 76 """ 77 Timeout for link establishment in seconds per hop to destination. 78 """ 79 80 LINK_MTU_SIZE = 3 81 TRAFFIC_TIMEOUT_MIN_MS = 5 82 TRAFFIC_TIMEOUT_FACTOR = 6 83 KEEPALIVE_MAX_RTT = 1.75 84 KEEPALIVE_TIMEOUT_FACTOR = 4 85 """ 86 RTT timeout factor used in link timeout calculation. 87 """ 88 STALE_GRACE = 5 89 """ 90 Grace period in seconds used in link timeout calculation. 91 """ 92 KEEPALIVE_MAX = 360 93 KEEPALIVE_MIN = 5 94 KEEPALIVE = KEEPALIVE_MAX 95 """ 96 Default interval for sending keep-alive packets on established links in seconds. 97 """ 98 STALE_FACTOR = 2 99 STALE_TIME = STALE_FACTOR*KEEPALIVE 100 """ 101 If no traffic or keep-alive packets are received within this period, the 102 link will be marked as stale, and a final keep-alive packet will be sent. 103 If after this no traffic or keep-alive packets are received within ``RTT`` * 104 ``KEEPALIVE_TIMEOUT_FACTOR`` + ``STALE_GRACE``, the link is considered timed out, 105 and will be torn down. 106 """ 107 108 WATCHDOG_MAX_SLEEP = 5 109 110 PENDING = 0x00 111 HANDSHAKE = 0x01 112 ACTIVE = 0x02 113 STALE = 0x03 114 CLOSED = 0x04 115 116 TIMEOUT = 0x01 117 INITIATOR_CLOSED = 0x02 118 DESTINATION_CLOSED = 0x03 119 120 ACCEPT_NONE = 0x00 121 ACCEPT_APP = 0x01 122 ACCEPT_ALL = 0x02 123 resource_strategies = [ACCEPT_NONE, ACCEPT_APP, ACCEPT_ALL] 124 125 MODE_AES128_CBC = 0x00 126 MODE_AES256_CBC = 0x01 127 MODE_AES256_GCM = 0x02 128 MODE_OTP_RESERVED = 0x03 129 MODE_PQ_RESERVED_1 = 0x04 130 MODE_PQ_RESERVED_2 = 0x05 131 MODE_PQ_RESERVED_3 = 0x06 132 MODE_PQ_RESERVED_4 = 0x07 133 ENABLED_MODES = [MODE_AES256_CBC] 134 MODE_DEFAULT = MODE_AES256_CBC 135 MODE_DESCRIPTIONS = {MODE_AES128_CBC: "AES_128_CBC", 136 MODE_AES256_CBC: "AES_256_CBC", 137 MODE_AES256_GCM: "MODE_AES256_GCM", 138 MODE_OTP_RESERVED: "MODE_OTP_RESERVED", 139 MODE_PQ_RESERVED_1: "MODE_PQ_RESERVED_1", 140 MODE_PQ_RESERVED_2: "MODE_PQ_RESERVED_2", 141 MODE_PQ_RESERVED_3: "MODE_PQ_RESERVED_3", 142 MODE_PQ_RESERVED_4: "MODE_PQ_RESERVED_4"} 143 144 MTU_BYTEMASK = 0x1FFFFF 145 MODE_BYTEMASK = 0xE0 146 147 @staticmethod 148 def signalling_bytes(mtu, mode): 149 if not mode in Link.ENABLED_MODES: raise TypeError(f"Requested link mode {Link.MODE_DESCRIPTIONS[mode]} not enabled") 150 signalling_value = (mtu & Link.MTU_BYTEMASK)+(((mode<<5) & Link.MODE_BYTEMASK)<<16) 151 return struct.pack(">I", signalling_value)[1:] 152 153 @staticmethod 154 def mtu_from_lr_packet(packet): 155 if len(packet.data) == Link.ECPUBSIZE+Link.LINK_MTU_SIZE: 156 return (packet.data[Link.ECPUBSIZE] << 16) + (packet.data[Link.ECPUBSIZE+1] << 8) + (packet.data[Link.ECPUBSIZE+2]) & Link.MTU_BYTEMASK 157 else: return None 158 159 @staticmethod 160 def mtu_from_lp_packet(packet): 161 if len(packet.data) == RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2+Link.LINK_MTU_SIZE: 162 mtu_bytes = packet.data[RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2:RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2+Link.LINK_MTU_SIZE] 163 return (mtu_bytes[0] << 16) + (mtu_bytes[1] << 8) + (mtu_bytes[2]) & Link.MTU_BYTEMASK 164 else: return None 165 166 @staticmethod 167 def mode_byte(mode): 168 if mode in Link.ENABLED_MODES: return (mode << 5) & Link.MODE_BYTEMASK 169 else: raise TypeError(f"Requested link mode {mode} not enabled") 170 171 @staticmethod 172 def mode_from_lr_packet(packet): 173 if len(packet.data) > Link.ECPUBSIZE: 174 mode = (packet.data[Link.ECPUBSIZE] & Link.MODE_BYTEMASK) >> 5 175 return mode 176 else: return Link.MODE_DEFAULT 177 178 @staticmethod 179 def mode_from_lp_packet(packet): 180 if len(packet.data) > RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2: 181 mode = packet.data[RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2] >> 5 182 return mode 183 else: return Link.MODE_DEFAULT 184 185 @staticmethod 186 def validate_request(owner, data, packet): 187 if len(data) == Link.ECPUBSIZE or len(data) == Link.ECPUBSIZE+Link.LINK_MTU_SIZE: 188 try: 189 link = Link(owner = owner, peer_pub_bytes=data[:Link.ECPUBSIZE//2], peer_sig_pub_bytes=data[Link.ECPUBSIZE//2:Link.ECPUBSIZE]) 190 link.set_link_id(packet) 191 192 if len(data) == Link.ECPUBSIZE+Link.LINK_MTU_SIZE: 193 RNS.log("Link request includes MTU signalling", RNS.LOG_DEBUG) # TODO: Remove debug 194 try: 195 link.mtu = Link.mtu_from_lr_packet(packet) or Reticulum.MTU 196 except Exception as e: 197 RNS.trace_exception(e) 198 link.mtu = RNS.Reticulum.MTU 199 200 link.mode = Link.mode_from_lr_packet(packet) 201 202 # TODO: Remove debug 203 RNS.log(f"Incoming link request with mode {Link.MODE_DESCRIPTIONS[link.mode]}", RNS.LOG_DEBUG) 204 205 link.update_mdu() 206 link.destination = packet.destination 207 link.establishment_timeout = Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, packet.hops) + Link.KEEPALIVE 208 link.establishment_cost += len(packet.raw) 209 RNS.log(f"Validating link request {RNS.prettyhexrep(link.link_id)}", RNS.LOG_DEBUG) 210 RNS.log(f"Link MTU configured to {RNS.prettysize(link.mtu)}", RNS.LOG_EXTREME) 211 RNS.log(f"Establishment timeout is {RNS.prettytime(link.establishment_timeout)} for incoming link request "+RNS.prettyhexrep(link.link_id), RNS.LOG_EXTREME) 212 link.handshake() 213 link.attached_interface = packet.receiving_interface 214 link.prove() 215 link.request_time = time.time() 216 RNS.Transport.register_link(link) 217 link.last_inbound = time.time() 218 link.__update_phy_stats(packet, force_update=True) 219 link.start_watchdog() 220 221 RNS.log("Incoming link request "+str(link)+" accepted on "+str(link.attached_interface), RNS.LOG_DEBUG) 222 return link 223 224 except Exception as e: 225 RNS.log(f"Validating link request failed: {e}", RNS.LOG_VERBOSE) 226 return None 227 228 else: 229 RNS.log(f"Invalid link request payload size of {len(data)} bytes, dropping request", RNS.LOG_DEBUG) 230 return None 231 232 233 def __init__(self, destination=None, established_callback=None, closed_callback=None, owner=None, peer_pub_bytes=None, peer_sig_pub_bytes=None, mode=MODE_DEFAULT): 234 if destination != None and destination.type != RNS.Destination.SINGLE: raise TypeError("Links can only be established to the \"single\" destination type") 235 self.mode = mode 236 self.rtt = None 237 self.mtu = RNS.Reticulum.MTU 238 self.establishment_cost = 0 239 self.establishment_rate = None 240 self.expected_rate = None 241 self.callbacks = LinkCallbacks() 242 self.resource_strategy = Link.ACCEPT_NONE 243 self.last_resource_window = None 244 self.last_resource_eifr = None 245 self.outgoing_resources = [] 246 self.incoming_resources = [] 247 self.pending_requests = [] 248 self.last_inbound = 0 249 self.last_outbound = 0 250 self.last_keepalive = 0 251 self.last_proof = 0 252 self.last_data = 0 253 self.tx = 0 254 self.rx = 0 255 self.txbytes = 0 256 self.rxbytes = 0 257 self.rssi = None 258 self.snr = None 259 self.q = None 260 self.traffic_timeout_factor = Link.TRAFFIC_TIMEOUT_FACTOR 261 self.keepalive_timeout_factor = Link.KEEPALIVE_TIMEOUT_FACTOR 262 self.keepalive = Link.KEEPALIVE 263 self.stale_time = Link.STALE_TIME 264 self.watchdog_lock = False 265 self.status = Link.PENDING 266 self.activated_at = None 267 self.type = RNS.Destination.LINK 268 self.owner = owner 269 self.destination = destination 270 self.expected_hops = None 271 self.attached_interface = None 272 self.__remote_identity = None 273 self.__track_phy_stats = False 274 self._channel = None 275 276 if self.destination == None: 277 self.initiator = False 278 self.prv = X25519PrivateKey.generate() 279 self.sig_prv = self.owner.identity.sig_prv 280 else: 281 self.initiator = True 282 self.expected_hops = RNS.Transport.hops_to(self.destination.hash) 283 self.establishment_timeout = RNS.Reticulum.get_instance().get_first_hop_timeout(destination.hash) 284 self.establishment_timeout += Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, RNS.Transport.hops_to(destination.hash)) 285 self.prv = X25519PrivateKey.generate() 286 self.sig_prv = Ed25519PrivateKey.generate() 287 288 self.token = None 289 290 self.pub = self.prv.public_key() 291 self.pub_bytes = self.pub.public_bytes() 292 293 self.sig_pub = self.sig_prv.public_key() 294 self.sig_pub_bytes = self.sig_pub.public_bytes() 295 296 if peer_pub_bytes == None: 297 self.peer_pub = None 298 self.peer_pub_bytes = None 299 else: 300 self.load_peer(peer_pub_bytes, peer_sig_pub_bytes) 301 302 if established_callback != None: 303 self.set_link_established_callback(established_callback) 304 305 if closed_callback != None: 306 self.set_link_closed_callback(closed_callback) 307 308 if self.initiator: 309 signalling_bytes = b"" 310 nh_hw_mtu = RNS.Transport.next_hop_interface_hw_mtu(destination.hash) 311 if RNS.Reticulum.link_mtu_discovery() and nh_hw_mtu: 312 signalling_bytes = Link.signalling_bytes(nh_hw_mtu, self.mode) 313 RNS.log(f"Signalling link MTU of {RNS.prettysize(nh_hw_mtu)} for link", RNS.LOG_DEBUG) # TODO: Remove debug 314 else: signalling_bytes = Link.signalling_bytes(RNS.Reticulum.MTU, self.mode) 315 RNS.log(f"Establishing link with mode {Link.MODE_DESCRIPTIONS[self.mode]}", RNS.LOG_DEBUG) # TODO: Remove debug 316 self.request_data = self.pub_bytes+self.sig_pub_bytes+signalling_bytes 317 self.packet = RNS.Packet(destination, self.request_data, packet_type=RNS.Packet.LINKREQUEST) 318 self.packet.pack() 319 self.establishment_cost += len(self.packet.raw) 320 self.set_link_id(self.packet) 321 RNS.Transport.register_link(self) 322 self.request_time = time.time() 323 self.start_watchdog() 324 self.packet.send() 325 self.had_outbound() 326 RNS.log("Link request "+RNS.prettyhexrep(self.link_id)+" sent to "+str(self.destination), RNS.LOG_DEBUG) 327 RNS.log(f"Establishment timeout is {RNS.prettytime(self.establishment_timeout)} for link request "+RNS.prettyhexrep(self.link_id), RNS.LOG_EXTREME) 328 329 330 def load_peer(self, peer_pub_bytes, peer_sig_pub_bytes): 331 self.peer_pub_bytes = peer_pub_bytes 332 self.peer_pub = X25519PublicKey.from_public_bytes(self.peer_pub_bytes) 333 334 self.peer_sig_pub_bytes = peer_sig_pub_bytes 335 self.peer_sig_pub = Ed25519PublicKey.from_public_bytes(self.peer_sig_pub_bytes) 336 337 if not hasattr(self.peer_pub, "curve"): 338 self.peer_pub.curve = Link.CURVE 339 340 @staticmethod 341 def link_id_from_lr_packet(packet): 342 hashable_part = packet.get_hashable_part() 343 if len(packet.data) > Link.ECPUBSIZE: 344 diff = len(packet.data) - Link.ECPUBSIZE 345 hashable_part = hashable_part[:-diff] 346 347 return RNS.Identity.truncated_hash(hashable_part) 348 349 def set_link_id(self, packet): 350 self.link_id = Link.link_id_from_lr_packet(packet) 351 self.hash = self.link_id 352 353 def handshake(self): 354 if self.status == Link.PENDING and self.prv != None: 355 self.status = Link.HANDSHAKE 356 self.shared_key = self.prv.exchange(self.peer_pub) 357 358 if self.mode == Link.MODE_AES128_CBC: derived_key_length = 32 359 elif self.mode == Link.MODE_AES256_CBC: derived_key_length = 64 360 else: raise TypeError(f"Invalid link mode {self.mode} on {self}") 361 362 self.derived_key = RNS.Cryptography.hkdf( 363 length=derived_key_length, 364 derive_from=self.shared_key, 365 salt=self.get_salt(), 366 context=self.get_context()) 367 368 else: RNS.log("Handshake attempt on "+str(self)+" with invalid state "+str(self.status), RNS.LOG_ERROR) 369 370 371 def prove(self): 372 signalling_bytes = Link.signalling_bytes(self.mtu, self.mode) 373 signed_data = self.link_id+self.pub_bytes+self.sig_pub_bytes+signalling_bytes 374 signature = self.owner.identity.sign(signed_data) 375 376 proof_data = signature+self.pub_bytes+signalling_bytes 377 proof = RNS.Packet(self, proof_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.LRPROOF) 378 proof.send() 379 self.establishment_cost += len(proof.raw) 380 self.had_outbound() 381 382 383 def prove_packet(self, packet): 384 signature = self.sign(packet.packet_hash) 385 # TODO: Hardcoded as explicit proof for now 386 # if RNS.Reticulum.should_use_implicit_proof(): 387 # proof_data = signature 388 # else: 389 # proof_data = packet.packet_hash + signature 390 proof_data = packet.packet_hash + signature 391 392 proof = RNS.Packet(self, proof_data, RNS.Packet.PROOF) 393 proof.send() 394 self.had_outbound() 395 396 def validate_proof(self, packet): 397 try: 398 if self.status == Link.PENDING: 399 signalling_bytes = b"" 400 confirmed_mtu = None 401 mode = Link.mode_from_lp_packet(packet) 402 RNS.log(f"Validating link request proof with mode {Link.MODE_DESCRIPTIONS[mode]}", RNS.LOG_DEBUG) # TODO: Remove debug 403 if mode != self.mode: raise TypeError(f"Invalid link mode {mode} in link request proof") 404 if len(packet.data) == RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2+Link.LINK_MTU_SIZE: 405 confirmed_mtu = Link.mtu_from_lp_packet(packet) 406 signalling_bytes = Link.signalling_bytes(confirmed_mtu, mode) 407 packet.data = packet.data[:RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2] 408 RNS.log(f"Destination confirmed link MTU of {RNS.prettysize(confirmed_mtu)}", RNS.LOG_DEBUG) # TODO: Remove debug 409 410 if self.initiator and len(packet.data) == RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2: 411 peer_pub_bytes = packet.data[RNS.Identity.SIGLENGTH//8:RNS.Identity.SIGLENGTH//8+Link.ECPUBSIZE//2] 412 peer_sig_pub_bytes = self.destination.identity.get_public_key()[Link.ECPUBSIZE//2:Link.ECPUBSIZE] 413 self.load_peer(peer_pub_bytes, peer_sig_pub_bytes) 414 self.handshake() 415 416 self.establishment_cost += len(packet.raw) 417 signed_data = self.link_id+self.peer_pub_bytes+self.peer_sig_pub_bytes+signalling_bytes 418 signature = packet.data[:RNS.Identity.SIGLENGTH//8] 419 420 if self.destination.identity.validate(signature, signed_data): 421 if self.status != Link.HANDSHAKE: 422 raise IOError("Invalid link state for proof validation: "+str(self.status)) 423 424 self.rtt = time.time() - self.request_time 425 self.attached_interface = packet.receiving_interface 426 self.__remote_identity = self.destination.identity 427 self.mtu = confirmed_mtu or RNS.Reticulum.MTU 428 self.update_mdu() 429 self.status = Link.ACTIVE 430 self.activated_at = time.time() 431 self.last_proof = self.activated_at 432 RNS.Transport.activate_link(self) 433 RNS.log("Link "+str(self)+" established with "+str(self.destination)+", RTT is "+RNS.prettyshorttime(self.rtt), RNS.LOG_DEBUG) 434 435 if self.rtt != None and self.establishment_cost != None and self.rtt > 0 and self.establishment_cost > 0: 436 self.establishment_rate = self.establishment_cost/self.rtt 437 438 self.__update_keepalive() 439 440 rtt_data = umsgpack.packb(self.rtt) 441 rtt_packet = RNS.Packet(self, rtt_data, context=RNS.Packet.LRRTT) 442 rtt_packet.send() 443 self.had_outbound() 444 self.__update_phy_stats(packet) 445 446 if self.callbacks.link_established != None: 447 thread = threading.Thread(target=self.callbacks.link_established, args=(self,)) 448 thread.daemon = True 449 thread.start() 450 else: 451 RNS.log("Invalid link proof signature received by "+str(self)+". Ignoring.", RNS.LOG_DEBUG) 452 453 except Exception as e: 454 self.status = Link.CLOSED 455 RNS.log("An error ocurred while validating link request proof on "+str(self)+".", RNS.LOG_ERROR) 456 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) 457 458 459 def identify(self, identity): 460 """ 461 Identifies the initiator of the link to the remote peer. This can only happen 462 once the link has been established, and is carried out over the encrypted link. 463 The identity is only revealed to the remote peer, and initiator anonymity is 464 thus preserved. This method can be used for authentication. 465 466 :param identity: An RNS.Identity instance to identify as. 467 """ 468 if self.initiator and self.status == Link.ACTIVE: 469 signed_data = self.link_id + identity.get_public_key() 470 signature = identity.sign(signed_data) 471 proof_data = identity.get_public_key() + signature 472 473 proof = RNS.Packet(self, proof_data, RNS.Packet.DATA, context = RNS.Packet.LINKIDENTIFY) 474 proof.send() 475 self.had_outbound() 476 477 478 def request(self, path, data = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None): 479 """ 480 Sends a request to the remote peer. 481 482 :param path: The request path. 483 :param response_callback: An optional function or method with the signature *response_callback(request_receipt)* to be called when a response is received. See the :ref:`Request Example<example-request>` for more info. 484 :param failed_callback: An optional function or method with the signature *failed_callback(request_receipt)* to be called when a request fails. See the :ref:`Request Example<example-request>` for more info. 485 :param progress_callback: An optional function or method with the signature *progress_callback(request_receipt)* to be called when progress is made receiving the response. Progress can be accessed as a float between 0.0 and 1.0 by the *request_receipt.progress* property. 486 :param timeout: An optional timeout in seconds for the request. If *None* is supplied it will be calculated based on link RTT. 487 :returns: A :ref:`RNS.RequestReceipt<api-requestreceipt>` instance if the request was sent, or *False* if it was not. 488 """ 489 request_path_hash = RNS.Identity.truncated_hash(path.encode("utf-8")) 490 unpacked_request = [time.time(), request_path_hash, data] 491 packed_request = umsgpack.packb(unpacked_request) 492 493 if timeout == None: 494 timeout = self.rtt * self.traffic_timeout_factor + RNS.Resource.RESPONSE_MAX_GRACE_TIME*1.125 495 496 if len(packed_request) <= self.mdu: 497 request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST) 498 packet_receipt = request_packet.send() 499 500 if packet_receipt == False: 501 return False 502 else: 503 packet_receipt.set_timeout(timeout) 504 return RequestReceipt( 505 self, 506 packet_receipt = packet_receipt, 507 response_callback = response_callback, 508 failed_callback = failed_callback, 509 progress_callback = progress_callback, 510 timeout = timeout, 511 request_size = len(packed_request), 512 ) 513 514 else: 515 request_id = RNS.Identity.truncated_hash(packed_request) 516 RNS.log("Sending request "+RNS.prettyhexrep(request_id)+" as resource.", RNS.LOG_DEBUG) 517 request_resource = RNS.Resource(packed_request, self, request_id = request_id, is_response = False, timeout = timeout) 518 519 return RequestReceipt( 520 self, 521 resource = request_resource, 522 response_callback = response_callback, 523 failed_callback = failed_callback, 524 progress_callback = progress_callback, 525 timeout = timeout, 526 request_size = len(packed_request), 527 ) 528 529 530 def update_mdu(self): 531 self.mdu = self.mtu - RNS.Reticulum.HEADER_MAXSIZE - RNS.Reticulum.IFAC_MIN_SIZE 532 self.mdu = math.floor((self.mtu-RNS.Reticulum.IFAC_MIN_SIZE-RNS.Reticulum.HEADER_MINSIZE-RNS.Identity.TOKEN_OVERHEAD)/RNS.Identity.AES128_BLOCKSIZE)*RNS.Identity.AES128_BLOCKSIZE - 1 533 534 def rtt_packet(self, packet): 535 try: 536 measured_rtt = time.time() - self.request_time 537 plaintext = self.decrypt(packet.data) 538 if plaintext != None: 539 rtt = umsgpack.unpackb(plaintext) 540 self.rtt = max(measured_rtt, rtt) 541 self.status = Link.ACTIVE 542 self.activated_at = time.time() 543 544 if self.rtt != None and self.establishment_cost != None and self.rtt > 0 and self.establishment_cost > 0: 545 self.establishment_rate = self.establishment_cost/self.rtt 546 547 self.__update_keepalive() 548 549 try: 550 if self.owner.callbacks.link_established != None: 551 self.owner.callbacks.link_established(self) 552 except Exception as e: 553 RNS.log("Error occurred in external link establishment callback. The contained exception was: "+str(e), RNS.LOG_ERROR) 554 555 except Exception as e: 556 RNS.log("Error occurred while processing RTT packet, tearing down link. The contained exception was: "+str(e), RNS.LOG_ERROR) 557 self.teardown() 558 559 def track_phy_stats(self, track): 560 """ 561 You can enable physical layer statistics on a per-link basis. If this is enabled, 562 and the link is running over an interface that supports reporting physical layer 563 statistics, you will be able to retrieve stats such as *RSSI*, *SNR* and physical 564 *Link Quality* for the link. 565 566 :param track: Whether or not to keep track of physical layer statistics. Value must be ``True`` or ``False``. 567 """ 568 if track: 569 self.__track_phy_stats = True 570 else: 571 self.__track_phy_stats = False 572 573 def get_rssi(self): 574 """ 575 :returns: The physical layer *Received Signal Strength Indication* if available, otherwise ``None``. Physical layer statistics must be enabled on the link for this method to return a value. 576 """ 577 if self.__track_phy_stats: 578 return self.rssi 579 else: 580 return None 581 582 def get_snr(self): 583 """ 584 :returns: The physical layer *Signal-to-Noise Ratio* if available, otherwise ``None``. Physical layer statistics must be enabled on the link for this method to return a value. 585 """ 586 if self.__track_phy_stats: 587 return self.snr 588 else: 589 return None 590 591 def get_q(self): 592 """ 593 :returns: The physical layer *Link Quality* if available, otherwise ``None``. Physical layer statistics must be enabled on the link for this method to return a value. 594 """ 595 if self.__track_phy_stats: 596 return self.q 597 else: 598 return None 599 600 def get_establishment_rate(self): 601 """ 602 :returns: The data transfer rate at which the link establishment procedure ocurred, in bits per second. 603 """ 604 if self.establishment_rate != None: 605 return self.establishment_rate*8 606 else: 607 return None 608 609 def get_mtu(self): 610 """ 611 :returns: The MTU of an established link. 612 """ 613 if self.status == Link.ACTIVE: 614 return self.mtu 615 else: 616 return None 617 618 def get_mdu(self): 619 """ 620 :returns: The packet MDU of an established link. 621 """ 622 if self.status == Link.ACTIVE: 623 return self.mdu 624 else: 625 return None 626 627 def get_expected_rate(self): 628 """ 629 :returns: The packet expected in-flight data rate of an established link. 630 """ 631 if self.status == Link.ACTIVE: 632 return self.expected_rate 633 else: 634 return None 635 636 def get_mode(self): 637 """ 638 :returns: The mode of an established link. 639 """ 640 return self.mode 641 642 def get_salt(self): 643 return self.link_id 644 645 def get_context(self): 646 return None 647 648 def get_age(self): 649 """ 650 :returns: The time in seconds since this link was established. 651 """ 652 if self.activated_at: 653 return time.time() - self.activated_at 654 else: 655 return None 656 657 def no_inbound_for(self): 658 """ 659 :returns: The time in seconds since last inbound packet on the link. This includes keepalive packets. 660 """ 661 activated_at = self.activated_at if self.activated_at != None else 0 662 last_inbound = max(self.last_inbound, activated_at) 663 return time.time() - last_inbound 664 665 def no_outbound_for(self): 666 """ 667 :returns: The time in seconds since last outbound packet on the link. This includes keepalive packets. 668 """ 669 return time.time() - self.last_outbound 670 671 def no_data_for(self): 672 """ 673 :returns: The time in seconds since payload data traversed the link. This excludes keepalive packets. 674 """ 675 return time.time() - self.last_data 676 677 def inactive_for(self): 678 """ 679 :returns: The time in seconds since activity on the link. This includes keepalive packets. 680 """ 681 return min(self.no_inbound_for(), self.no_outbound_for()) 682 683 def get_remote_identity(self): 684 """ 685 :returns: The identity of the remote peer, if it is known. Calling this method will not query the remote initiator to reveal its identity. Returns ``None`` if the link initiator has not already independently called the ``identify(identity)`` method. 686 """ 687 return self.__remote_identity 688 689 def had_outbound(self, is_keepalive=False): 690 self.last_outbound = time.time() 691 if not is_keepalive: self.last_data = self.last_outbound 692 else: self.last_keepalive = self.last_outbound 693 694 def __teardown_packet(self): 695 teardown_packet = RNS.Packet(self, self.link_id, context=RNS.Packet.LINKCLOSE) 696 teardown_packet.send() 697 self.had_outbound() 698 699 def teardown(self): 700 """ 701 Closes the link and purges encryption keys. New keys will 702 be used if a new link to the same destination is established. 703 """ 704 if self.status != Link.PENDING and self.status != Link.CLOSED: self.__teardown_packet() 705 self.status = Link.CLOSED 706 if self.initiator: self.teardown_reason = Link.INITIATOR_CLOSED 707 else: self.teardown_reason = Link.DESTINATION_CLOSED 708 self.link_closed() 709 710 def teardown_packet(self, packet): 711 try: 712 plaintext = self.decrypt(packet.data) 713 if plaintext == self.link_id: 714 self.status = Link.CLOSED 715 if self.initiator: 716 self.teardown_reason = Link.DESTINATION_CLOSED 717 else: 718 self.teardown_reason = Link.INITIATOR_CLOSED 719 self.__update_phy_stats(packet) 720 self.link_closed() 721 except Exception as e: 722 pass 723 724 def link_closed(self): 725 for resource in self.incoming_resources: 726 resource.cancel() 727 for resource in self.outgoing_resources: 728 resource.cancel() 729 if self._channel: 730 self._channel._shutdown() 731 732 self.prv = None 733 self.pub = None 734 self.pub_bytes = None 735 self.shared_key = None 736 self.derived_key = None 737 738 if self.destination != None: 739 if self.destination.direction == RNS.Destination.IN: 740 if self in self.destination.links: 741 self.destination.links.remove(self) 742 743 if self.callbacks.link_closed != None: 744 try: 745 self.callbacks.link_closed(self) 746 except Exception as e: 747 RNS.log("Error while executing link closed callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 748 749 750 def start_watchdog(self): 751 thread = threading.Thread(target=self.__watchdog_job) 752 thread.daemon = True 753 thread.start() 754 755 def __watchdog_job(self): 756 while not self.status == Link.CLOSED: 757 while (self.watchdog_lock): 758 rtt_wait = 0.025 759 if hasattr(self, "rtt") and self.rtt: 760 rtt_wait = self.rtt 761 762 sleep(max(rtt_wait, 0.025)) 763 764 if not self.status == Link.CLOSED: 765 # Link was initiated, but no response 766 # from destination yet 767 if self.status == Link.PENDING: 768 next_check = self.request_time + self.establishment_timeout 769 sleep_time = next_check - time.time() 770 if time.time() >= self.request_time + self.establishment_timeout: 771 RNS.log("Link establishment timed out", RNS.LOG_VERBOSE) 772 self.status = Link.CLOSED 773 self.teardown_reason = Link.TIMEOUT 774 self.link_closed() 775 sleep_time = 0.001 776 777 elif self.status == Link.HANDSHAKE: 778 next_check = self.request_time + self.establishment_timeout 779 sleep_time = next_check - time.time() 780 if time.time() >= self.request_time + self.establishment_timeout: 781 self.status = Link.CLOSED 782 self.teardown_reason = Link.TIMEOUT 783 self.link_closed() 784 sleep_time = 0.001 785 786 if self.initiator: 787 RNS.log("Timeout waiting for link request proof", RNS.LOG_DEBUG) 788 else: 789 RNS.log("Timeout waiting for RTT packet from link initiator", RNS.LOG_DEBUG) 790 791 elif self.status == Link.ACTIVE: 792 activated_at = self.activated_at if self.activated_at != None else 0 793 last_inbound = max(max(self.last_inbound, self.last_proof), activated_at) 794 now = time.time() 795 796 if now >= last_inbound + self.keepalive: 797 if self.initiator and now >= self.last_keepalive + self.keepalive: 798 self.send_keepalive() 799 800 if time.time() >= last_inbound + self.stale_time: 801 sleep_time = self.rtt * self.keepalive_timeout_factor + Link.STALE_GRACE 802 self.status = Link.STALE 803 else: 804 sleep_time = self.keepalive 805 806 else: 807 sleep_time = (last_inbound + self.keepalive) - time.time() 808 809 elif self.status == Link.STALE: 810 sleep_time = 0.001 811 self.__teardown_packet() 812 self.status = Link.CLOSED 813 self.teardown_reason = Link.TIMEOUT 814 self.link_closed() 815 816 817 if sleep_time == 0: 818 RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_ERROR) 819 if sleep_time == None or sleep_time < 0: 820 RNS.log("Timing error! Tearing down link "+str(self)+" now.", RNS.LOG_ERROR) 821 self.teardown() 822 sleep_time = 0.1 823 824 sleep_time = min(sleep_time, Link.WATCHDOG_MAX_SLEEP) 825 sleep(sleep_time) 826 827 if not self.__track_phy_stats: 828 self.rssi = None 829 self.snr = None 830 self.q = None 831 832 833 def __update_phy_stats(self, packet, query_shared = True, force_update = False): 834 if self.__track_phy_stats or force_update: 835 if query_shared: 836 reticulum = RNS.Reticulum.get_instance() 837 if packet.rssi == None: packet.rssi = reticulum.get_packet_rssi(packet.packet_hash) 838 if packet.snr == None: packet.snr = reticulum.get_packet_snr(packet.packet_hash) 839 if packet.q == None: packet.q = reticulum.get_packet_q(packet.packet_hash) 840 841 if packet.rssi != None: 842 self.rssi = packet.rssi 843 if packet.snr != None: 844 self.snr = packet.snr 845 if packet.q != None: 846 self.q = packet.q 847 848 def __update_keepalive(self): 849 self.keepalive = max(min(self.rtt*(Link.KEEPALIVE_MAX/Link.KEEPALIVE_MAX_RTT), Link.KEEPALIVE_MAX), Link.KEEPALIVE_MIN) 850 self.stale_time = self.keepalive * Link.STALE_FACTOR 851 852 def send_keepalive(self): 853 keepalive_packet = RNS.Packet(self, bytes([0xFF]), context=RNS.Packet.KEEPALIVE) 854 keepalive_packet.send() 855 self.had_outbound(is_keepalive = True) 856 857 def handle_request(self, request_id, unpacked_request): 858 if self.status == Link.ACTIVE: 859 requested_at = unpacked_request[0] 860 path_hash = unpacked_request[1] 861 request_data = unpacked_request[2] 862 863 if path_hash in self.destination.request_handlers: 864 request_handler = self.destination.request_handlers[path_hash] 865 path = request_handler[0] 866 response_generator = request_handler[1] 867 allow = request_handler[2] 868 allowed_list = request_handler[3] 869 auto_compress = request_handler[4] 870 871 allowed = False 872 if not allow == RNS.Destination.ALLOW_NONE: 873 if allow == RNS.Destination.ALLOW_LIST: 874 if self.__remote_identity != None and self.__remote_identity.hash in allowed_list: 875 allowed = True 876 elif allow == RNS.Destination.ALLOW_ALL: 877 allowed = True 878 879 if allowed: 880 RNS.log("Handling request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_DEBUG) 881 if len(inspect.signature(response_generator).parameters) == 5: 882 response = response_generator(path, request_data, request_id, self.__remote_identity, requested_at) 883 elif len(inspect.signature(response_generator).parameters) == 6: 884 response = response_generator(path, request_data, request_id, self.link_id, self.__remote_identity, requested_at) 885 else: 886 raise TypeError("Invalid signature for response generator callback") 887 888 file_response = False 889 file_handle = None 890 if type(response) == list or type(response) == tuple: 891 metadata = None 892 if len(response) > 0 and type(response[0]) == io.BufferedReader: 893 if len(response) > 1: metadata = response[1] 894 file_handle = response[0] 895 file_response = True 896 897 if response != None: 898 if file_response: 899 response_resource = RNS.Resource(file_handle, self, metadata=metadata, request_id = request_id, is_response = True, auto_compress=auto_compress) 900 else: 901 packed_response = umsgpack.packb([request_id, response]) 902 if len(packed_response) <= self.mdu: 903 RNS.Packet(self, packed_response, RNS.Packet.DATA, context = RNS.Packet.RESPONSE).send() 904 else: 905 response_resource = RNS.Resource(packed_response, self, request_id = request_id, is_response = True, auto_compress=auto_compress) 906 else: 907 identity_string = str(self.get_remote_identity()) if self.get_remote_identity() != None else "<Unknown>" 908 RNS.log("Request "+RNS.prettyhexrep(request_id)+" from "+identity_string+" not allowed for: "+str(path), RNS.LOG_DEBUG) 909 910 def handle_response(self, request_id, response_data, response_size, response_transfer_size, metadata=None): 911 if self.status == Link.ACTIVE: 912 remove = None 913 for pending_request in self.pending_requests: 914 if pending_request.request_id == request_id: 915 remove = pending_request 916 try: 917 pending_request.response_size = response_size 918 if pending_request.response_transfer_size == None: 919 pending_request.response_transfer_size = 0 920 pending_request.response_transfer_size += response_transfer_size 921 pending_request.response_received(response_data, metadata) 922 except Exception as e: 923 RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR) 924 925 break 926 927 if remove != None: 928 if remove in self.pending_requests: 929 self.pending_requests.remove(remove) 930 931 def request_resource_concluded(self, resource): 932 if resource.status == RNS.Resource.COMPLETE: 933 packed_request = resource.data.read() 934 unpacked_request = umsgpack.unpackb(packed_request) 935 request_id = RNS.Identity.truncated_hash(packed_request) 936 request_data = unpacked_request 937 938 def job(): self.handle_request(request_id, request_data) 939 threading.Thread(target=job, daemon=True).start() 940 else: 941 RNS.log("Incoming request resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) 942 943 def response_resource_concluded(self, resource): 944 if resource.status == RNS.Resource.COMPLETE: 945 # If the response resource has metadata, this 946 # is a file response, and we'll pass the open 947 # file handle directly. 948 if resource.has_metadata: 949 self.handle_response(resource.request_id, resource.data, resource.total_size, resource.size, metadata=resource.metadata) 950 951 # If not, we'll unpack the response data and 952 # pass the unpacked structure to the handler 953 else: 954 packed_response = resource.data.read() 955 unpacked_response = umsgpack.unpackb(packed_response) 956 request_id = unpacked_response[0] 957 response_data = unpacked_response[1] 958 self.handle_response(request_id, response_data, resource.total_size, resource.size) 959 960 else: 961 RNS.log("Incoming response resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) 962 for pending_request in self.pending_requests: 963 if pending_request.request_id == resource.request_id: 964 pending_request.request_timed_out(None) 965 966 def get_channel(self): 967 """ 968 Get the ``Channel`` for this link. 969 970 :return: ``Channel`` object 971 """ 972 if self._channel is None: 973 self._channel = Channel(LinkChannelOutlet(self)) 974 return self._channel 975 976 def receive(self, packet): 977 self.watchdog_lock = True 978 if not self.status == Link.CLOSED and not (self.initiator and packet.context == RNS.Packet.KEEPALIVE and packet.data == bytes([0xFF])): 979 if packet.receiving_interface != self.attached_interface: 980 RNS.log(f"Link-associated packet received on unexpected interface {packet.receiving_interface} instead of {self.attached_interface}! Someone might be trying to manipulate your communication!", RNS.LOG_ERROR) 981 else: 982 self.last_inbound = time.time() 983 if packet.context != RNS.Packet.KEEPALIVE: 984 self.last_data = self.last_inbound 985 self.rx += 1 986 self.rxbytes += len(packet.data) 987 if self.status == Link.STALE: 988 self.status = Link.ACTIVE 989 990 if packet.packet_type == RNS.Packet.DATA: 991 should_query = False 992 if packet.context == RNS.Packet.NONE: 993 plaintext = self.decrypt(packet.data) 994 packet.ratchet_id = self.link_id 995 if plaintext != None: 996 self.__update_phy_stats(packet, query_shared=True) 997 998 if self.callbacks.packet != None: 999 thread = threading.Thread(target=self.callbacks.packet, args=(plaintext, packet)) 1000 thread.daemon = True 1001 thread.start() 1002 1003 if self.destination.proof_strategy == RNS.Destination.PROVE_ALL: 1004 packet.prove() 1005 1006 elif self.destination.proof_strategy == RNS.Destination.PROVE_APP: 1007 if self.destination.callbacks.proof_requested: 1008 try: 1009 if self.destination.callbacks.proof_requested(packet): 1010 packet.prove() 1011 except Exception as e: 1012 RNS.log("Error while executing proof request callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1013 1014 elif packet.context == RNS.Packet.LINKIDENTIFY: 1015 plaintext = self.decrypt(packet.data) 1016 if plaintext != None: 1017 if not self.initiator and len(plaintext) == RNS.Identity.KEYSIZE//8 + RNS.Identity.SIGLENGTH//8: 1018 public_key = plaintext[:RNS.Identity.KEYSIZE//8] 1019 signed_data = self.link_id+public_key 1020 signature = plaintext[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Identity.SIGLENGTH//8] 1021 identity = RNS.Identity(create_keys=False) 1022 identity.load_public_key(public_key) 1023 1024 if identity.validate(signature, signed_data): 1025 self.__remote_identity = identity 1026 if self.callbacks.remote_identified != None: 1027 try: 1028 self.callbacks.remote_identified(self, self.__remote_identity) 1029 except Exception as e: 1030 RNS.log("Error while executing remote identified callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1031 1032 self.__update_phy_stats(packet, query_shared=True) 1033 1034 elif packet.context == RNS.Packet.REQUEST: 1035 try: 1036 request_id = packet.getTruncatedHash() 1037 packed_request = self.decrypt(packet.data) 1038 if packed_request != None: 1039 unpacked_request = umsgpack.unpackb(packed_request) 1040 def job(): self.handle_request(request_id, unpacked_request) 1041 threading.Thread(target=job, daemon=True).start() 1042 self.__update_phy_stats(packet, query_shared=True) 1043 except Exception as e: 1044 RNS.log("Error occurred while handling request. The contained exception was: "+str(e), RNS.LOG_ERROR) 1045 1046 elif packet.context == RNS.Packet.RESPONSE: 1047 try: 1048 packed_response = self.decrypt(packet.data) 1049 if packed_response != None: 1050 unpacked_response = umsgpack.unpackb(packed_response) 1051 request_id = unpacked_response[0] 1052 response_data = unpacked_response[1] 1053 transfer_size = len(umsgpack.packb(response_data))-2 1054 def job(): self.handle_response(request_id, response_data, transfer_size, transfer_size) 1055 threading.Thread(target=job, daemon=True).start() 1056 self.__update_phy_stats(packet, query_shared=True) 1057 except Exception as e: 1058 RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR) 1059 1060 elif packet.context == RNS.Packet.LRRTT: 1061 if not self.initiator: 1062 self.rtt_packet(packet) 1063 self.__update_phy_stats(packet, query_shared=True) 1064 1065 elif packet.context == RNS.Packet.LINKCLOSE: 1066 self.teardown_packet(packet) 1067 self.__update_phy_stats(packet, query_shared=True) 1068 1069 elif packet.context == RNS.Packet.RESOURCE_ADV: 1070 packet.plaintext = self.decrypt(packet.data) 1071 if packet.plaintext != None: 1072 self.__update_phy_stats(packet, query_shared=True) 1073 1074 if RNS.ResourceAdvertisement.is_request(packet): 1075 RNS.Resource.accept(packet, callback=self.request_resource_concluded) 1076 elif RNS.ResourceAdvertisement.is_response(packet): 1077 request_id = RNS.ResourceAdvertisement.read_request_id(packet) 1078 for pending_request in self.pending_requests: 1079 if pending_request.request_id == request_id: 1080 response_resource = RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress, request_id = request_id) 1081 if response_resource != None: 1082 if pending_request.response_size == None: 1083 pending_request.response_size = RNS.ResourceAdvertisement.read_size(packet) 1084 if pending_request.response_transfer_size == None: 1085 pending_request.response_transfer_size = 0 1086 pending_request.response_transfer_size += RNS.ResourceAdvertisement.read_transfer_size(packet) 1087 if pending_request.started_at == None: 1088 pending_request.started_at = time.time() 1089 pending_request.response_resource_progress(response_resource) 1090 1091 elif self.resource_strategy == Link.ACCEPT_NONE: pass 1092 elif self.resource_strategy == Link.ACCEPT_APP: 1093 if self.callbacks.resource != None: 1094 try: 1095 resource_advertisement = RNS.ResourceAdvertisement.unpack(packet.plaintext) 1096 resource_advertisement.link = self 1097 if self.callbacks.resource(resource_advertisement): RNS.Resource.accept(packet, self.callbacks.resource_concluded) 1098 else: RNS.Resource.reject(packet) 1099 except Exception as e: 1100 RNS.log("Error while executing resource accept callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1101 elif self.resource_strategy == Link.ACCEPT_ALL: 1102 RNS.Resource.accept(packet, self.callbacks.resource_concluded) 1103 1104 elif packet.context == RNS.Packet.RESOURCE_REQ: 1105 plaintext = self.decrypt(packet.data) 1106 if plaintext != None: 1107 self.__update_phy_stats(packet, query_shared=True) 1108 if ord(plaintext[:1]) == RNS.Resource.HASHMAP_IS_EXHAUSTED: 1109 resource_hash = plaintext[1+RNS.Resource.MAPHASH_LEN:RNS.Identity.HASHLENGTH//8+1+RNS.Resource.MAPHASH_LEN] 1110 else: 1111 resource_hash = plaintext[1:RNS.Identity.HASHLENGTH//8+1] 1112 1113 for resource in self.outgoing_resources: 1114 if resource.hash == resource_hash: 1115 # We need to check that this request has not been 1116 # received before in order to avoid sequencing errors. 1117 if not packet.packet_hash in resource.req_hashlist: 1118 resource.req_hashlist.append(packet.packet_hash) 1119 resource.request(plaintext) 1120 1121 # TODO: Test and possibly enable this at some point 1122 # def request_job(): 1123 # resource.request(plaintext) 1124 # threading.Thread(target=request_job, daemon=True).start() 1125 1126 elif packet.context == RNS.Packet.RESOURCE_HMU: 1127 plaintext = self.decrypt(packet.data) 1128 if plaintext != None: 1129 self.__update_phy_stats(packet, query_shared=True) 1130 resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8] 1131 for resource in self.incoming_resources: 1132 if resource_hash == resource.hash: 1133 resource.hashmap_update_packet(plaintext) 1134 1135 elif packet.context == RNS.Packet.RESOURCE_ICL: 1136 plaintext = self.decrypt(packet.data) 1137 if plaintext != None: 1138 self.__update_phy_stats(packet) 1139 resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8] 1140 for resource in self.incoming_resources: 1141 if resource_hash == resource.hash: 1142 resource.cancel() 1143 1144 elif packet.context == RNS.Packet.RESOURCE_RCL: 1145 plaintext = self.decrypt(packet.data) 1146 if plaintext != None: 1147 self.__update_phy_stats(packet) 1148 resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8] 1149 for resource in self.outgoing_resources: 1150 if resource_hash == resource.hash: 1151 resource._rejected() 1152 1153 elif packet.context == RNS.Packet.KEEPALIVE: 1154 if not self.initiator and packet.data == bytes([0xFF]): 1155 keepalive_packet = RNS.Packet(self, bytes([0xFE]), context=RNS.Packet.KEEPALIVE) 1156 keepalive_packet.send() 1157 self.had_outbound(is_keepalive = True) 1158 1159 1160 # TODO: find the most efficient way to allow multiple 1161 # transfers at the same time, sending resource hash on 1162 # each packet is a huge overhead. Probably some kind 1163 # of hash -> sequence map 1164 elif packet.context == RNS.Packet.RESOURCE: 1165 for resource in self.incoming_resources: 1166 resource.receive_part(packet) 1167 self.__update_phy_stats(packet) 1168 1169 elif packet.context == RNS.Packet.CHANNEL: 1170 if not self._channel: 1171 RNS.log(f"Channel data received without open channel", RNS.LOG_DEBUG) 1172 else: 1173 packet.prove() 1174 plaintext = self.decrypt(packet.data) 1175 if plaintext != None: 1176 self.__update_phy_stats(packet) 1177 self._channel._receive(plaintext) 1178 1179 elif packet.packet_type == RNS.Packet.PROOF: 1180 if packet.context == RNS.Packet.RESOURCE_PRF: 1181 resource_hash = packet.data[0:RNS.Identity.HASHLENGTH//8] 1182 for resource in self.outgoing_resources: 1183 if resource_hash == resource.hash: 1184 def job(): resource.validate_proof(packet.data) 1185 threading.Thread(target=job, daemon=True).start() 1186 self.__update_phy_stats(packet, query_shared=True) 1187 1188 self.watchdog_lock = False 1189 1190 1191 def encrypt(self, plaintext): 1192 try: 1193 if not self.token: 1194 try: self.token = Token(self.derived_key) 1195 except Exception as e: 1196 RNS.log("Could not instantiate token while performing encryption on link "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1197 raise e 1198 1199 return self.token.encrypt(plaintext) 1200 1201 except Exception as e: 1202 RNS.log("Encryption on link "+str(self)+" failed. The contained exception was: "+str(e), RNS.LOG_ERROR) 1203 raise e 1204 1205 1206 def decrypt(self, ciphertext): 1207 try: 1208 if not self.token: self.token = Token(self.derived_key) 1209 return self.token.decrypt(ciphertext) 1210 1211 except Exception as e: 1212 RNS.log("Decryption failed on link "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1213 return None 1214 1215 1216 def sign(self, message): 1217 return self.sig_prv.sign(message) 1218 1219 def validate(self, signature, message): 1220 try: 1221 self.peer_sig_pub.verify(signature, message) 1222 return True 1223 except Exception as e: 1224 return False 1225 1226 def set_link_established_callback(self, callback): 1227 self.callbacks.link_established = callback 1228 1229 def set_link_closed_callback(self, callback): 1230 """ 1231 Registers a function to be called when a link has been 1232 torn down. 1233 1234 :param callback: A function or method with the signature *callback(link)* to be called. 1235 """ 1236 self.callbacks.link_closed = callback 1237 1238 def set_packet_callback(self, callback): 1239 """ 1240 Registers a function to be called when a packet has been 1241 received over this link. 1242 1243 :param callback: A function or method with the signature *callback(message, packet)* to be called. 1244 """ 1245 self.callbacks.packet = callback 1246 1247 def set_resource_callback(self, callback): 1248 """ 1249 Registers a function to be called when a resource has been 1250 advertised over this link. If the function returns *True* 1251 the resource will be accepted. If it returns *False* it will 1252 be ignored. 1253 1254 :param callback: A function or method with the signature *callback(resource)* to be called. Please note that only the basic information of the resource is available at this time, such as *get_transfer_size()*, *get_data_size()*, *get_parts()* and *is_compressed()*. 1255 """ 1256 self.callbacks.resource = callback 1257 1258 def set_resource_started_callback(self, callback): 1259 """ 1260 Registers a function to be called when a resource has begun 1261 transferring over this link. 1262 1263 :param callback: A function or method with the signature *callback(resource)* to be called. 1264 """ 1265 self.callbacks.resource_started = callback 1266 1267 def set_resource_concluded_callback(self, callback): 1268 """ 1269 Registers a function to be called when a resource has concluded 1270 transferring over this link. 1271 1272 :param callback: A function or method with the signature *callback(resource)* to be called. 1273 """ 1274 self.callbacks.resource_concluded = callback 1275 1276 def set_remote_identified_callback(self, callback): 1277 """ 1278 Registers a function to be called when an initiating peer has 1279 identified over this link. 1280 1281 :param callback: A function or method with the signature *callback(link, identity)* to be called. 1282 """ 1283 self.callbacks.remote_identified = callback 1284 1285 def resource_concluded(self, resource): 1286 concluded_at = time.time() 1287 if resource in self.incoming_resources: 1288 self.last_resource_window = resource.window 1289 self.last_resource_eifr = resource.eifr 1290 self.incoming_resources.remove(resource) 1291 self.expected_rate = (resource.size*8)/(max(concluded_at-resource.started_transferring, 0.0001)) 1292 if resource in self.outgoing_resources: 1293 self.outgoing_resources.remove(resource) 1294 self.expected_rate = (resource.size*8)/(max(concluded_at-resource.started_transferring, 0.0001)) 1295 1296 def set_resource_strategy(self, resource_strategy): 1297 """ 1298 Sets the resource strategy for the link. 1299 1300 :param resource_strategy: One of ``RNS.Link.ACCEPT_NONE``, ``RNS.Link.ACCEPT_ALL`` or ``RNS.Link.ACCEPT_APP``. If ``RNS.Link.ACCEPT_APP`` is set, the `resource_callback` will be called to determine whether the resource should be accepted or not. 1301 :raises: *TypeError* if the resource strategy is unsupported. 1302 """ 1303 if not resource_strategy in Link.resource_strategies: 1304 raise TypeError("Unsupported resource strategy") 1305 else: 1306 self.resource_strategy = resource_strategy 1307 1308 def register_outgoing_resource(self, resource): 1309 self.outgoing_resources.append(resource) 1310 1311 def register_incoming_resource(self, resource): 1312 self.incoming_resources.append(resource) 1313 1314 def has_incoming_resource(self, resource): 1315 for incoming_resource in self.incoming_resources: 1316 if incoming_resource.hash == resource.hash: 1317 return True 1318 1319 return False 1320 1321 def get_last_resource_window(self): 1322 return self.last_resource_window 1323 1324 def get_last_resource_eifr(self): 1325 return self.last_resource_eifr 1326 1327 def cancel_outgoing_resource(self, resource): 1328 if resource in self.outgoing_resources: 1329 self.outgoing_resources.remove(resource) 1330 else: 1331 RNS.log("Attempt to cancel a non-existing outgoing resource", RNS.LOG_ERROR) 1332 1333 def cancel_incoming_resource(self, resource): 1334 if resource in self.incoming_resources: 1335 self.incoming_resources.remove(resource) 1336 else: 1337 RNS.log("Attempt to cancel a non-existing incoming resource", RNS.LOG_ERROR) 1338 1339 def ready_for_new_resource(self): 1340 if len(self.outgoing_resources) > 0: 1341 return False 1342 else: 1343 return True 1344 1345 def __str__(self): 1346 return RNS.prettyhexrep(self.link_id) 1347 1348 1349 class RequestReceipt(): 1350 """ 1351 An instance of this class is returned by the ``request`` method of ``RNS.Link`` 1352 instances. It should never be instantiated manually. It provides methods to 1353 check status, response time and response data when the request concludes. 1354 """ 1355 1356 FAILED = 0x00 1357 SENT = 0x01 1358 DELIVERED = 0x02 1359 RECEIVING = 0x03 1360 READY = 0x04 1361 1362 def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None, request_size = None): 1363 self.packet_receipt = packet_receipt 1364 self.resource = resource 1365 self.started_at = None 1366 1367 if self.packet_receipt != None: 1368 self.hash = packet_receipt.truncated_hash 1369 self.packet_receipt.set_timeout_callback(self.request_timed_out) 1370 self.started_at = time.time() 1371 1372 elif self.resource != None: 1373 self.hash = resource.request_id 1374 resource.set_callback(self.request_resource_concluded) 1375 1376 self.link = link 1377 self.request_id = self.hash 1378 self.request_size = request_size 1379 1380 self.response = None 1381 self.response_transfer_size = None 1382 self.response_size = None 1383 self.metadata = None 1384 self.status = RequestReceipt.SENT 1385 self.sent_at = time.time() 1386 self.progress = 0 1387 self.concluded_at = None 1388 self.response_concluded_at = None 1389 1390 if timeout != None: 1391 self.timeout = timeout 1392 else: 1393 raise ValueError("No timeout specified for request receipt") 1394 1395 self.callbacks = RequestReceiptCallbacks() 1396 self.callbacks.response = response_callback 1397 self.callbacks.failed = failed_callback 1398 self.callbacks.progress = progress_callback 1399 1400 self.link.pending_requests.append(self) 1401 1402 1403 def request_resource_concluded(self, resource): 1404 if resource.status == RNS.Resource.COMPLETE: 1405 RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG) 1406 if self.started_at == None: 1407 self.started_at = time.time() 1408 self.status = RequestReceipt.DELIVERED 1409 self.__resource_response_timeout = time.time()+self.timeout 1410 response_timeout_thread = threading.Thread(target=self.__response_timeout_job) 1411 response_timeout_thread.daemon = True 1412 response_timeout_thread.start() 1413 else: 1414 RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) 1415 self.status = RequestReceipt.FAILED 1416 self.concluded_at = time.time() 1417 self.link.pending_requests.remove(self) 1418 1419 if self.callbacks.failed != None: 1420 try: 1421 self.callbacks.failed(self) 1422 except Exception as e: 1423 RNS.log("Error while executing request failed callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1424 1425 1426 def __response_timeout_job(self): 1427 while self.status == RequestReceipt.DELIVERED: 1428 now = time.time() 1429 if now > self.__resource_response_timeout: 1430 self.request_timed_out(None) 1431 break 1432 1433 time.sleep(0.1) 1434 1435 1436 def request_timed_out(self, packet_receipt): 1437 if self in self.link.pending_requests and self.status == RequestReceipt.DELIVERED: 1438 self.status = RequestReceipt.FAILED 1439 self.concluded_at = time.time() 1440 self.link.pending_requests.remove(self) 1441 1442 if self.callbacks.failed != None: 1443 try: self.callbacks.failed(self) 1444 except Exception as e: 1445 RNS.log("Error while executing request timed out callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1446 1447 1448 def response_resource_progress(self, resource): 1449 if resource != None: 1450 if not self.status == RequestReceipt.FAILED: 1451 self.status = RequestReceipt.RECEIVING 1452 if self.packet_receipt != None: 1453 if self.packet_receipt.status != RNS.PacketReceipt.DELIVERED: 1454 self.packet_receipt.status = RNS.PacketReceipt.DELIVERED 1455 self.packet_receipt.proved = True 1456 self.packet_receipt.concluded_at = time.time() 1457 if self.packet_receipt.callbacks.delivery != None: 1458 self.packet_receipt.callbacks.delivery(self.packet_receipt) 1459 1460 self.progress = resource.get_progress() 1461 1462 if self.callbacks.progress != None: 1463 try: 1464 self.callbacks.progress(self) 1465 except Exception as e: 1466 RNS.log("Error while executing response progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1467 else: 1468 resource.cancel() 1469 1470 1471 def response_received(self, response, metadata=None): 1472 if not self.status == RequestReceipt.FAILED: 1473 self.progress = 1.0 1474 self.response = response 1475 self.metadata = metadata 1476 self.status = RequestReceipt.READY 1477 self.response_concluded_at = time.time() 1478 1479 if self.packet_receipt != None: 1480 self.packet_receipt.status = RNS.PacketReceipt.DELIVERED 1481 self.packet_receipt.proved = True 1482 self.packet_receipt.concluded_at = time.time() 1483 if self.packet_receipt.callbacks.delivery != None: 1484 self.packet_receipt.callbacks.delivery(self.packet_receipt) 1485 1486 if self.callbacks.progress != None: 1487 try: self.callbacks.progress(self) 1488 except Exception as e: 1489 RNS.log("Error while executing response progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1490 1491 if self.callbacks.response != None: 1492 try: self.callbacks.response(self) 1493 except Exception as e: 1494 RNS.log("Error while executing response received callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1495 1496 def get_request_id(self): 1497 """ 1498 :returns: The request ID as *bytes*. 1499 """ 1500 return self.request_id 1501 1502 def get_status(self): 1503 """ 1504 :returns: The current status of the request, one of ``RNS.RequestReceipt.FAILED``, ``RNS.RequestReceipt.SENT``, ``RNS.RequestReceipt.DELIVERED``, ``RNS.RequestReceipt.READY``. 1505 """ 1506 return self.status 1507 1508 def get_progress(self): 1509 """ 1510 :returns: The progress of a response being received as a *float* between 0.0 and 1.0. 1511 """ 1512 return self.progress 1513 1514 def get_response(self): 1515 """ 1516 :returns: The response as *bytes* if it is ready, otherwise *None*. 1517 """ 1518 if self.status == RequestReceipt.READY: 1519 return self.response 1520 else: 1521 return None 1522 1523 def get_response_time(self): 1524 """ 1525 :returns: The response time of the request in seconds. 1526 """ 1527 if self.status == RequestReceipt.READY: 1528 return self.response_concluded_at - self.started_at 1529 else: 1530 return None 1531 1532 def concluded(self): 1533 """ 1534 :returns: True if the associated request has concluded (successfully or with a failure), otherwise False. 1535 """ 1536 if self.status == RequestReceipt.READY: 1537 return True 1538 elif self.status == RequestReceipt.FAILED: 1539 return True 1540 else: 1541 return False 1542 1543 1544 1545 class RequestReceiptCallbacks: 1546 def __init__(self): 1547 self.response = None 1548 self.failed = None 1549 self.progress = None