Destination.py
1 # Reticulum License 2 # 3 # Copyright (c) 2016-2025 Mark Qvist 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # - The Software shall not be used in any kind of system which includes amongst 13 # its functions the ability to purposefully do harm to human beings. 14 # 15 # - The Software shall not be used, directly or indirectly, in the creation of 16 # an artificial intelligence, machine learning or language model training 17 # dataset, including but not limited to any use that contributes to the 18 # training or development of such a model or algorithm. 19 # 20 # - The above copyright notice and this permission notice shall be included in 21 # all copies or substantial portions of the Software. 22 # 23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 24 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 25 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 26 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 27 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 28 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 29 # SOFTWARE. 30 31 import os 32 import math 33 import time 34 import threading 35 import RNS 36 37 from RNS.Cryptography import Token 38 from .vendor import umsgpack as umsgpack 39 40 class Callbacks: 41 def __init__(self): 42 self.link_established = None 43 self.packet = None 44 self.proof_requested = None 45 46 class Destination: 47 """ 48 A class used to describe endpoints in a Reticulum Network. Destination 49 instances are used both to create outgoing and incoming endpoints. The 50 destination type will decide if encryption, and what type, is used in 51 communication with the endpoint. A destination can also announce its 52 presence on the network, which will distribute necessary keys for 53 encrypted communication with it. 54 55 :param identity: An instance of :ref:`RNS.Identity<api-identity>`. Can hold only public keys for an outgoing destination, or holding private keys for an ingoing. 56 :param direction: ``RNS.Destination.IN`` or ``RNS.Destination.OUT``. 57 :param type: ``RNS.Destination.SINGLE``, ``RNS.Destination.GROUP`` or ``RNS.Destination.PLAIN``. 58 :param app_name: A string specifying the app name. 59 :param \\*aspects: Any non-zero number of string arguments. 60 """ 61 62 # Constants 63 SINGLE = 0x00 64 GROUP = 0x01 65 PLAIN = 0x02 66 LINK = 0x03 67 types = [SINGLE, GROUP, PLAIN, LINK] 68 69 PROVE_NONE = 0x21 70 PROVE_APP = 0x22 71 PROVE_ALL = 0x23 72 proof_strategies = [PROVE_NONE, PROVE_APP, PROVE_ALL] 73 74 ALLOW_NONE = 0x00 75 ALLOW_ALL = 0x01 76 ALLOW_LIST = 0x02 77 request_policies = [ALLOW_NONE, ALLOW_ALL, ALLOW_LIST] 78 79 IN = 0x11; 80 OUT = 0x12; 81 directions = [IN, OUT] 82 83 PR_TAG_WINDOW = 30 84 85 RATCHET_COUNT = 512 86 """ 87 The default number of generated ratchet keys a destination will retain, if it has ratchets enabled. 88 """ 89 90 RATCHET_INTERVAL = 30*60 91 """ 92 The minimum interval between rotating ratchet keys, in seconds. 93 """ 94 95 @staticmethod 96 def expand_name(identity, app_name, *aspects): 97 """ 98 :returns: A string containing the full human-readable name of the destination, for an app_name and a number of aspects. 99 """ 100 101 # Check input values and build name string 102 if "." in app_name: raise ValueError("Dots can't be used in app names") 103 104 name = app_name 105 for aspect in aspects: 106 if "." in aspect: raise ValueError("Dots can't be used in aspects") 107 name += "." + aspect 108 109 if identity != None: 110 name += "." + identity.hexhash 111 112 return name 113 114 115 @staticmethod 116 def hash(identity, app_name, *aspects): 117 """ 118 :returns: A destination name in adressable hash form, for an app_name and a number of aspects. 119 """ 120 name_hash = RNS.Identity.full_hash(Destination.expand_name(None, app_name, *aspects).encode("utf-8"))[:(RNS.Identity.NAME_HASH_LENGTH//8)] 121 addr_hash_material = name_hash 122 if identity != None: 123 if isinstance(identity, RNS.Identity): 124 addr_hash_material += identity.hash 125 elif isinstance(identity, bytes) and len(identity) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: 126 addr_hash_material += identity 127 else: 128 raise TypeError("Invalid material supplied for destination hash calculation") 129 130 return RNS.Identity.full_hash(addr_hash_material)[:RNS.Reticulum.TRUNCATED_HASHLENGTH//8] 131 132 @staticmethod 133 def app_and_aspects_from_name(full_name): 134 """ 135 :returns: A tuple containing the app name and a list of aspects, for a full-name string. 136 """ 137 components = full_name.split(".") 138 return (components[0], components[1:]) 139 140 @staticmethod 141 def hash_from_name_and_identity(full_name, identity): 142 """ 143 :returns: A destination name in adressable hash form, for a full name string and Identity instance. 144 """ 145 app_name, aspects = Destination.app_and_aspects_from_name(full_name) 146 147 return Destination.hash(identity, app_name, *aspects) 148 149 def __init__(self, identity, direction, type, app_name, *aspects): 150 # Check input values and build name string 151 if "." in app_name: raise ValueError("Dots can't be used in app names") 152 if not type in Destination.types: raise ValueError("Unknown destination type") 153 if not direction in Destination.directions: raise ValueError("Unknown destination direction") 154 155 self.accept_link_requests = True 156 self.callbacks = Callbacks() 157 self.request_handlers = {} 158 self.type = type 159 self.direction = direction 160 self.proof_strategy = Destination.PROVE_NONE 161 self.ratchets = None 162 self.ratchets_path = None 163 self.ratchet_interval = Destination.RATCHET_INTERVAL 164 self.ratchet_file_lock = threading.Lock() 165 self.retained_ratchets = Destination.RATCHET_COUNT 166 self.latest_ratchet_time = None 167 self.latest_ratchet_id = None 168 self.__enforce_ratchets = False 169 self.mtu = 0 170 171 self.path_responses = {} 172 self.links = [] 173 174 if identity == None and direction == Destination.IN and self.type != Destination.PLAIN: 175 identity = RNS.Identity() 176 aspects = aspects+(identity.hexhash,) 177 178 if identity == None and direction == Destination.OUT and self.type != Destination.PLAIN: 179 raise ValueError("Can't create outbound SINGLE destination without an identity") 180 181 if identity != None and self.type == Destination.PLAIN: 182 raise TypeError("Selected destination type PLAIN cannot hold an identity") 183 184 self.identity = identity 185 self.name = Destination.expand_name(identity, app_name, *aspects) 186 187 # Generate the destination address hash 188 self.hash = Destination.hash(self.identity, app_name, *aspects) 189 self.name_hash = RNS.Identity.full_hash(self.expand_name(None, app_name, *aspects).encode("utf-8"))[:(RNS.Identity.NAME_HASH_LENGTH//8)] 190 self.hexhash = self.hash.hex() 191 192 self.default_app_data = None 193 self.callback = None 194 self.proofcallback = None 195 196 RNS.Transport.register_destination(self) 197 198 199 def __str__(self): 200 """ 201 :returns: A human-readable representation of the destination including addressable hash and full name. 202 """ 203 return "<"+self.name+":"+self.hexhash+">" 204 205 def _clean_ratchets(self): 206 if self.ratchets != None: 207 if len (self.ratchets) > self.retained_ratchets: 208 self.ratchets = self.ratchets[:Destination.RATCHET_COUNT] 209 210 def _persist_ratchets(self): 211 try: 212 with self.ratchet_file_lock: 213 temp_write_path = self.ratchets_path+".tmp" 214 packed_ratchets = umsgpack.packb(self.ratchets) 215 persisted_data = {"signature": self.sign(packed_ratchets), "ratchets": packed_ratchets} 216 ratchets_file = open(temp_write_path, "wb") 217 ratchets_file.write(umsgpack.packb(persisted_data)) 218 ratchets_file.close() 219 if os.path.isfile(self.ratchets_path): os.unlink(self.ratchets_path) 220 os.rename(temp_write_path, self.ratchets_path) 221 except Exception as e: 222 RNS.trace_exception(e) 223 self.ratchets = None 224 self.ratchets_path = None 225 raise OSError("Could not write ratchet file contents for "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 226 227 def rotate_ratchets(self): 228 if self.ratchets != None: 229 now = time.time() 230 if now > self.latest_ratchet_time+self.ratchet_interval: 231 RNS.log("Rotating ratchets for "+str(self), RNS.LOG_DEBUG) 232 new_ratchet = RNS.Identity._generate_ratchet() 233 self.ratchets.insert(0, new_ratchet) 234 self.latest_ratchet_time = now 235 self._clean_ratchets() 236 self._persist_ratchets() 237 return True 238 else: 239 raise SystemError("Cannot rotate ratchet on "+str(self)+", ratchets are not enabled") 240 241 return False 242 243 def announce(self, app_data=None, path_response=False, attached_interface=None, tag=None, send=True): 244 """ 245 Creates an announce packet for this destination and broadcasts it on all 246 relevant interfaces. Application specific data can be added to the announce. 247 248 :param app_data: *bytes* containing the app_data. 249 :param path_response: Internal flag used by :ref:`RNS.Transport<api-transport>`. Ignore. 250 """ 251 if self.type != Destination.SINGLE: 252 raise TypeError("Only SINGLE destination types can be announced") 253 254 if self.direction != Destination.IN: 255 raise TypeError("Only IN destination types can be announced") 256 257 ratchet = b"" 258 now = time.time() 259 stale_responses = [] 260 for entry_tag in self.path_responses: 261 entry = self.path_responses[entry_tag] 262 if now > entry[0]+Destination.PR_TAG_WINDOW: 263 stale_responses.append(entry_tag) 264 265 for entry_tag in stale_responses: 266 self.path_responses.pop(entry_tag) 267 268 if (path_response == True and tag != None) and tag in self.path_responses: 269 # This code is currently not used, since Transport will block duplicate 270 # path requests based on tags. When multi-path support is implemented in 271 # Transport, this will allow Transport to detect redundant paths to the 272 # same destination, and select the best one based on chosen criteria, 273 # since it will be able to detect that a single emitted announce was 274 # received via multiple paths. The difference in reception time will 275 # potentially also be useful in determining characteristics of the 276 # multiple available paths, and to choose the best one. 277 RNS.log("Using cached announce data for answering path request with tag "+RNS.prettyhexrep(tag), RNS.LOG_EXTREME) 278 announce_data = self.path_responses[tag][1] 279 280 else: 281 destination_hash = self.hash 282 random_hash = RNS.Identity.get_random_hash()[0:5]+int(time.time()).to_bytes(5, "big") 283 284 if self.ratchets != None: 285 self.rotate_ratchets() 286 ratchet = RNS.Identity._ratchet_public_bytes(self.ratchets[0]) 287 RNS.Identity._remember_ratchet(self.hash, ratchet) 288 289 if app_data == None and self.default_app_data != None: 290 if isinstance(self.default_app_data, bytes): 291 app_data = self.default_app_data 292 elif callable(self.default_app_data): 293 returned_app_data = self.default_app_data() 294 if isinstance(returned_app_data, bytes): 295 app_data = returned_app_data 296 297 signed_data = self.hash+self.identity.get_public_key()+self.name_hash+random_hash+ratchet 298 if app_data != None: 299 signed_data += app_data 300 301 signature = self.identity.sign(signed_data) 302 announce_data = self.identity.get_public_key()+self.name_hash+random_hash+ratchet+signature 303 304 if app_data != None: 305 announce_data += app_data 306 307 self.path_responses[tag] = [time.time(), announce_data] 308 309 if path_response: 310 announce_context = RNS.Packet.PATH_RESPONSE 311 else: 312 announce_context = RNS.Packet.NONE 313 314 if ratchet: 315 context_flag = RNS.Packet.FLAG_SET 316 else: 317 context_flag = RNS.Packet.FLAG_UNSET 318 319 announce_packet = RNS.Packet(self, announce_data, RNS.Packet.ANNOUNCE, context = announce_context, 320 attached_interface = attached_interface, context_flag=context_flag) 321 if send: 322 announce_packet.send() 323 else: 324 return announce_packet 325 326 def accepts_links(self, accepts = None): 327 """ 328 Set or query whether the destination accepts incoming link requests. 329 330 :param accepts: If ``True`` or ``False``, this method sets whether the destination accepts incoming link requests. If not provided or ``None``, the method returns whether the destination currently accepts link requests. 331 :returns: ``True`` or ``False`` depending on whether the destination accepts incoming link requests, if the *accepts* parameter is not provided or ``None``. 332 """ 333 if accepts == None: 334 return self.accept_link_requests 335 336 if accepts: 337 self.accept_link_requests = True 338 else: 339 self.accept_link_requests = False 340 341 def set_link_established_callback(self, callback): 342 """ 343 Registers a function to be called when a link has been established to 344 this destination. 345 346 :param callback: A function or method with the signature *callback(link)* to be called when a new link is established with this destination. 347 """ 348 self.callbacks.link_established = callback 349 350 def set_packet_callback(self, callback): 351 """ 352 Registers a function to be called when a packet has been received by 353 this destination. 354 355 :param callback: A function or method with the signature *callback(data, packet)* to be called when this destination receives a packet. 356 """ 357 self.callbacks.packet = callback 358 359 def set_proof_requested_callback(self, callback): 360 """ 361 Registers a function to be called when a proof has been requested for 362 a packet sent to this destination. Allows control over when and if 363 proofs should be returned for received packets. 364 365 :param callback: A function or method to with the signature *callback(packet)* be called when a packet that requests a proof is received. The callback must return one of True or False. If the callback returns True, a proof will be sent. If it returns False, a proof will not be sent. 366 """ 367 self.callbacks.proof_requested = callback 368 369 def set_proof_strategy(self, proof_strategy): 370 """ 371 Sets the destinations proof strategy. 372 373 :param proof_strategy: One of ``RNS.Destination.PROVE_NONE``, ``RNS.Destination.PROVE_ALL`` or ``RNS.Destination.PROVE_APP``. If ``RNS.Destination.PROVE_APP`` is set, the `proof_requested_callback` will be called to determine whether a proof should be sent or not. 374 """ 375 if not proof_strategy in Destination.proof_strategies: 376 raise TypeError("Unsupported proof strategy") 377 else: 378 self.proof_strategy = proof_strategy 379 380 def register_request_handler(self, path, response_generator = None, allow = ALLOW_NONE, allowed_list = None, auto_compress = True): 381 """ 382 Registers a request handler. 383 384 :param path: The path for the request handler to be registered. 385 :param response_generator: A function or method with the signature *response_generator(path, data, request_id, link_id, remote_identity, requested_at)* to be called. Whatever this funcion returns will be sent as a response to the requester. If the function returns ``None``, no response will be sent. 386 :param allow: One of ``RNS.Destination.ALLOW_NONE``, ``RNS.Destination.ALLOW_ALL`` or ``RNS.Destination.ALLOW_LIST``. If ``RNS.Destination.ALLOW_LIST`` is set, the request handler will only respond to requests for identified peers in the supplied list. 387 :param allowed_list: A list of *bytes-like* :ref:`RNS.Identity<api-identity>` hashes. 388 :param auto_compress: If ``True`` or ``False``, determines whether automatic compression of responses should be carried out. If set to an integer value, responses will only be auto-compressed if under this size in bytes. If omitted, the default compression settings will be followed. 389 :raises: ``ValueError`` if any of the supplied arguments are invalid. 390 """ 391 if path == None or path == "": raise ValueError("Invalid path specified") 392 elif not callable(response_generator): raise ValueError("Invalid response generator specified") 393 elif not allow in Destination.request_policies: raise ValueError("Invalid request policy") 394 else: 395 path_hash = RNS.Identity.truncated_hash(path.encode("utf-8")) 396 request_handler = [path, response_generator, allow, allowed_list, auto_compress] 397 self.request_handlers[path_hash] = request_handler 398 399 def deregister_request_handler(self, path): 400 """ 401 Deregisters a request handler. 402 403 :param path: The path for the request handler to be deregistered. 404 :returns: True if the handler was deregistered, otherwise False. 405 """ 406 path_hash = RNS.Identity.truncated_hash(path.encode("utf-8")) 407 if path_hash in self.request_handlers: 408 self.request_handlers.pop(path_hash) 409 return True 410 else: 411 return False 412 413 def receive(self, packet): 414 if packet.packet_type == RNS.Packet.LINKREQUEST: 415 plaintext = packet.data 416 self.incoming_link_request(plaintext, packet) 417 else: 418 plaintext = self.decrypt(packet.data) 419 packet.ratchet_id = self.latest_ratchet_id 420 if plaintext == None: return False 421 else: 422 if packet.packet_type == RNS.Packet.DATA: 423 if self.callbacks.packet != None: 424 try: 425 self.callbacks.packet(plaintext, packet) 426 except Exception as e: 427 RNS.log("Error while executing receive callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 428 429 return True 430 431 def incoming_link_request(self, data, packet): 432 if self.accept_link_requests: 433 link = RNS.Link.validate_request(self, data, packet) 434 if link != None: 435 self.links.append(link) 436 437 def _reload_ratchets(self, ratchets_path): 438 if os.path.isfile(ratchets_path): 439 with self.ratchet_file_lock: 440 def load_attempt(): 441 ratchets_file = open(ratchets_path, "rb") 442 persisted_data = umsgpack.unpackb(ratchets_file.read()) 443 if "signature" in persisted_data and "ratchets" in persisted_data: 444 if self.identity.validate(persisted_data["signature"], persisted_data["ratchets"]): 445 self.ratchets = umsgpack.unpackb(persisted_data["ratchets"]) 446 self.ratchets_path = ratchets_path 447 else: 448 raise KeyError("Invalid ratchet file signature") 449 450 try: 451 try: 452 load_attempt() 453 454 except Exception as e: 455 RNS.trace_exception(e) 456 RNS.log(f"First ratchet reload attempt for {self} failed. Possible I/O conflict. Retrying in 500ms.", RNS.LOG_ERROR) 457 time.sleep(0.5) 458 load_attempt() 459 RNS.log(f"Ratchet reload retry succeeded", RNS.LOG_DEBUG) 460 461 except Exception as e: 462 self.ratchets = None 463 self.ratchets_path = None 464 RNS.trace_exception(e) 465 raise OSError("Could not read ratchet file contents for "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 466 467 else: 468 RNS.log("No existing ratchet data found, initialising new ratchet file for "+str(self), RNS.LOG_DEBUG) 469 self.ratchets = [] 470 self.ratchets_path = ratchets_path 471 self._persist_ratchets() 472 473 def enable_ratchets(self, ratchets_path): 474 """ 475 Enables ratchets on the destination. When ratchets are enabled, Reticulum will automatically rotate 476 the keys used to encrypt packets to this destination, and include the latest ratchet key in announces. 477 478 Enabling ratchets on a destination will provide forward secrecy for packets sent to that destination, 479 even when sent outside a ``Link``. The normal Reticulum ``Link`` establishment procedure already performs 480 its own ephemeral key exchange for each link establishment, which means that ratchets are not necessary 481 to provide forward secrecy for links. 482 483 Enabling ratchets will have a small impact on announce size, adding 32 bytes to every sent announce. 484 485 :param ratchets_path: The path to a file to store ratchet data in. 486 :returns: True if the operation succeeded, otherwise False. 487 """ 488 if ratchets_path != None: 489 self.latest_ratchet_time = 0 490 self._reload_ratchets(ratchets_path) 491 492 RNS.log("Ratchets enabled on "+str(self), RNS.LOG_DEBUG) 493 return True 494 495 else: 496 raise ValueError("No ratchet file path specified for "+str(self)) 497 498 def enforce_ratchets(self): 499 """ 500 When ratchet enforcement is enabled, this destination will never accept packets that use its 501 base Identity key for encryption, but only accept packets encrypted with one of the retained 502 ratchet keys. 503 """ 504 if self.ratchets != None: 505 self.__enforce_ratchets = True 506 RNS.log("Ratchets enforced on "+str(self), RNS.LOG_DEBUG) 507 return True 508 else: 509 return False 510 511 def set_retained_ratchets(self, retained_ratchets): 512 """ 513 Sets the number of previously generated ratchet keys this destination will retain, 514 and try to use when decrypting incoming packets. Defaults to ``Destination.RATCHET_COUNT``. 515 516 :param retained_ratchets: The number of generated ratchets to retain. 517 :returns: True if the operation succeeded, False if not. 518 """ 519 if isinstance(retained_ratchets, int) and retained_ratchets > 0: 520 self.retained_ratchets = retained_ratchets 521 self._clean_ratchets() 522 return True 523 else: 524 return False 525 526 def set_ratchet_interval(self, interval): 527 """ 528 Sets the minimum interval in seconds between ratchet key rotation. 529 Defaults to ``Destination.RATCHET_INTERVAL``. 530 531 :param interval: The minimum interval in seconds. 532 :returns: True if the operation succeeded, False if not. 533 """ 534 if isinstance(interval, int) and interval > 0: 535 self.ratchet_interval = interval 536 return True 537 else: 538 return False 539 540 def create_keys(self): 541 """ 542 For a ``RNS.Destination.GROUP`` type destination, creates a new symmetric key. 543 544 :raises: ``TypeError`` if called on an incompatible type of destination. 545 """ 546 if self.type == Destination.PLAIN: 547 raise TypeError("A plain destination does not hold any keys") 548 549 if self.type == Destination.SINGLE: 550 raise TypeError("A single destination holds keys through an Identity instance") 551 552 if self.type == Destination.GROUP: 553 self.prv_bytes = Token.generate_key() 554 self.prv = Token(self.prv_bytes) 555 556 def get_private_key(self): 557 """ 558 For a ``RNS.Destination.GROUP`` type destination, returns the symmetric private key. 559 560 :raises: ``TypeError`` if called on an incompatible type of destination. 561 """ 562 if self.type == Destination.PLAIN: 563 raise TypeError("A plain destination does not hold any keys") 564 elif self.type == Destination.SINGLE: 565 raise TypeError("A single destination holds keys through an Identity instance") 566 else: 567 return self.prv_bytes 568 569 def load_private_key(self, key): 570 """ 571 For a ``RNS.Destination.GROUP`` type destination, loads a symmetric private key. 572 573 :param key: A *bytes-like* containing the symmetric key. 574 :raises: ``TypeError`` if called on an incompatible type of destination. 575 """ 576 if self.type == Destination.PLAIN: 577 raise TypeError("A plain destination does not hold any keys") 578 579 if self.type == Destination.SINGLE: 580 raise TypeError("A single destination holds keys through an Identity instance") 581 582 if self.type == Destination.GROUP: 583 self.prv_bytes = key 584 self.prv = Token(self.prv_bytes) 585 586 def load_public_key(self, key): 587 if self.type != Destination.SINGLE: 588 raise TypeError("Only the \"single\" destination type can hold a public key") 589 else: 590 raise TypeError("A single destination holds keys through an Identity instance") 591 592 def encrypt(self, plaintext): 593 """ 594 Encrypts information for ``RNS.Destination.SINGLE`` or ``RNS.Destination.GROUP`` type destination. 595 596 :param plaintext: A *bytes-like* containing the plaintext to be encrypted. 597 :raises: ``ValueError`` if destination does not hold a necessary key for encryption. 598 """ 599 if self.type == Destination.PLAIN: 600 return plaintext 601 602 if self.type == Destination.SINGLE and self.identity != None: 603 selected_ratchet = RNS.Identity.get_ratchet(self.hash) 604 if selected_ratchet: 605 self.latest_ratchet_id = RNS.Identity._get_ratchet_id(selected_ratchet) 606 return self.identity.encrypt(plaintext, ratchet=selected_ratchet) 607 608 if self.type == Destination.GROUP: 609 if hasattr(self, "prv") and self.prv != None: 610 try: 611 return self.prv.encrypt(plaintext) 612 except Exception as e: 613 RNS.log("The GROUP destination could not encrypt data", RNS.LOG_ERROR) 614 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) 615 else: 616 raise ValueError("No private key held by GROUP destination. Did you create or load one?") 617 618 def decrypt(self, ciphertext): 619 """ 620 Decrypts information for ``RNS.Destination.SINGLE`` or ``RNS.Destination.GROUP`` type destination. 621 622 :param ciphertext: *Bytes* containing the ciphertext to be decrypted. 623 :raises: ``ValueError`` if destination does not hold a necessary key for decryption. 624 """ 625 if self.type == Destination.PLAIN: 626 return ciphertext 627 628 if self.type == Destination.SINGLE and self.identity != None: 629 if self.ratchets: 630 decrypted = None 631 try: 632 decrypted = self.identity.decrypt(ciphertext, ratchets=self.ratchets, enforce_ratchets=self.__enforce_ratchets, ratchet_id_receiver=self) 633 except: 634 decrypted = None 635 636 if not decrypted: 637 try: 638 RNS.log(f"Decryption with ratchets failed on {self}, reloading ratchets from storage and retrying", RNS.LOG_ERROR) 639 self._reload_ratchets(self.ratchets_path) 640 decrypted = self.identity.decrypt(ciphertext, ratchets=self.ratchets, enforce_ratchets=self.__enforce_ratchets, ratchet_id_receiver=self) 641 except Exception as e: 642 RNS.log(f"Decryption still failing after ratchet reload. The contained exception was: {e}", RNS.LOG_ERROR) 643 raise e 644 645 if decrypted: RNS.log("Decryption succeeded after ratchet reload", RNS.LOG_NOTICE) 646 647 return decrypted 648 649 else: 650 return self.identity.decrypt(ciphertext, ratchets=None, enforce_ratchets=self.__enforce_ratchets, ratchet_id_receiver=self) 651 652 if self.type == Destination.GROUP: 653 if hasattr(self, "prv") and self.prv != None: 654 try: 655 return self.prv.decrypt(ciphertext) 656 except Exception as e: 657 RNS.log("The GROUP destination could not decrypt data", RNS.LOG_ERROR) 658 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) 659 else: 660 raise ValueError("No private key held by GROUP destination. Did you create or load one?") 661 662 def sign(self, message): 663 """ 664 Signs information for ``RNS.Destination.SINGLE`` type destination. 665 666 :param message: *Bytes* containing the message to be signed. 667 :returns: A *bytes-like* containing the message signature, or *None* if the destination could not sign the message. 668 """ 669 if self.type == Destination.SINGLE and self.identity != None: 670 return self.identity.sign(message) 671 else: 672 return None 673 674 def set_default_app_data(self, app_data=None): 675 """ 676 Sets the default app_data for the destination. If set, the default 677 app_data will be included in every announce sent by the destination, 678 unless other app_data is specified in the *announce* method. 679 680 :param app_data: A *bytes-like* containing the default app_data, or a *callable* returning a *bytes-like* containing the app_data. 681 """ 682 self.default_app_data = app_data 683 684 def clear_default_app_data(self): 685 """ 686 Clears default app_data previously set for the destination. 687 """ 688 self.set_default_app_data(app_data=None)