Destination.py
# Reticulum License
#
# Copyright (c) 2016-2025 Mark Qvist
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# - The Software shall not be used in any kind of system which includes amongst
#   its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
#   an artificial intelligence, machine learning or language model training
#   dataset, including but not limited to any use that contributes to the
#   training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
#   all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import math
import time
import threading
import RNS

from RNS.Cryptography import Token
from .vendor import umsgpack as umsgpack

class Callbacks:
    """
    Holds the application callbacks registered on a destination. All
    callbacks default to ``None``, meaning "not set".
    """
    def __init__(self):
        self.link_established = None
        self.packet = None
        self.proof_requested = None

class Destination:
    """
    A class used to describe endpoints in a Reticulum Network. Destination
    instances are used both to create outgoing and incoming endpoints. The
    destination type will decide if encryption, and what type, is used in
    communication with the endpoint. A destination can also announce its
    presence on the network, which will distribute necessary keys for
    encrypted communication with it.

    :param identity: An instance of :ref:`RNS.Identity<api-identity>`. Can hold only public keys for an outgoing destination, or holding private keys for an ingoing.
    :param direction: ``RNS.Destination.IN`` or ``RNS.Destination.OUT``.
    :param type: ``RNS.Destination.SINGLE``, ``RNS.Destination.GROUP`` or ``RNS.Destination.PLAIN``.
    :param app_name: A string specifying the app name.
    :param \\*aspects: Any non-zero number of string arguments.
    """

    # Constants
    SINGLE     = 0x00
    GROUP      = 0x01
    PLAIN      = 0x02
    LINK       = 0x03
    types      = [SINGLE, GROUP, PLAIN, LINK]

    PROVE_NONE = 0x21
    PROVE_APP  = 0x22
    PROVE_ALL  = 0x23
    proof_strategies = [PROVE_NONE, PROVE_APP, PROVE_ALL]

    ALLOW_NONE = 0x00
    ALLOW_ALL  = 0x01
    ALLOW_LIST = 0x02
    request_policies = [ALLOW_NONE, ALLOW_ALL, ALLOW_LIST]

    IN         = 0x11
    OUT        = 0x12
    directions = [IN, OUT]

    # Seconds a cached path-response announce stays valid in path_responses
    PR_TAG_WINDOW = 30

    RATCHET_COUNT = 512
    """
    The default number of generated ratchet keys a destination will retain, if it has ratchets enabled.
    """

    RATCHET_INTERVAL = 30*60
    """
    The minimum interval between rotating ratchet keys, in seconds.
    """

    @staticmethod
    def expand_name(identity, app_name, *aspects):
        """
        :param identity: An identity whose ``hexhash`` is appended as the last name component, or ``None``.
        :param app_name: A string specifying the app name.
        :param \\*aspects: Any number of string aspects.
        :returns: A string containing the full human-readable name of the destination, for an app_name and a number of aspects.
        :raises: ``ValueError`` if the app name or any aspect contains a dot.
        """

        # Check input values and build name string
        if "." in app_name: raise ValueError("Dots can't be used in app names")

        name = app_name
        for aspect in aspects:
            if "." in aspect: raise ValueError("Dots can't be used in aspects")
            name += "." + aspect

        if identity is not None:
            name += "." + identity.hexhash

        return name


    @staticmethod
    def hash(identity, app_name, *aspects):
        """
        :param identity: An :ref:`RNS.Identity<api-identity>` instance, a truncated identity hash as *bytes*, or ``None``.
        :returns: A destination name in addressable hash form, for an app_name and a number of aspects.
        :raises: ``TypeError`` if *identity* is neither an Identity nor a correctly sized *bytes* hash.
        """
        name_hash = RNS.Identity.full_hash(Destination.expand_name(None, app_name, *aspects).encode("utf-8"))[:(RNS.Identity.NAME_HASH_LENGTH//8)]
        addr_hash_material = name_hash
        if identity is not None:
            if isinstance(identity, RNS.Identity):
                addr_hash_material += identity.hash
            elif isinstance(identity, bytes) and len(identity) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
                addr_hash_material += identity
            else:
                raise TypeError("Invalid material supplied for destination hash calculation")

        return RNS.Identity.full_hash(addr_hash_material)[:RNS.Reticulum.TRUNCATED_HASHLENGTH//8]

    @staticmethod
    def app_and_aspects_from_name(full_name):
        """
        :returns: A tuple containing the app name and a list of aspects, for a full-name string.
        """
        components = full_name.split(".")
        return (components[0], components[1:])

    @staticmethod
    def hash_from_name_and_identity(full_name, identity):
        """
        :returns: A destination name in addressable hash form, for a full name string and Identity instance.
        """
        app_name, aspects = Destination.app_and_aspects_from_name(full_name)

        return Destination.hash(identity, app_name, *aspects)

    def __init__(self, identity, direction, type, app_name, *aspects):
        # Check input values and build name string
        if "." in app_name: raise ValueError("Dots can't be used in app names")
        if type not in Destination.types: raise ValueError("Unknown destination type")
        if direction not in Destination.directions: raise ValueError("Unknown destination direction")

        self.accept_link_requests = True
        self.callbacks = Callbacks()
        self.request_handlers = {}
        self.type = type
        self.direction = direction
        self.proof_strategy = Destination.PROVE_NONE
        self.ratchets = None
        self.ratchets_path = None
        self.ratchet_interval = Destination.RATCHET_INTERVAL
        self.ratchet_file_lock = threading.Lock()
        self.retained_ratchets = Destination.RATCHET_COUNT
        self.latest_ratchet_time = None
        self.latest_ratchet_id = None
        self.__enforce_ratchets = False
        self.mtu = 0

        self.path_responses = {}
        self.links = []

        # Inbound non-PLAIN destinations without an identity get a freshly
        # created one, whose hexhash is added as a trailing aspect.
        if identity is None and direction == Destination.IN and self.type != Destination.PLAIN:
            identity = RNS.Identity()
            aspects = aspects+(identity.hexhash,)

        if identity is None and direction == Destination.OUT and self.type != Destination.PLAIN:
            raise ValueError("Can't create outbound SINGLE destination without an identity")

        if identity is not None and self.type == Destination.PLAIN:
            raise TypeError("Selected destination type PLAIN cannot hold an identity")

        self.identity = identity
        self.name = Destination.expand_name(identity, app_name, *aspects)

        # Generate the destination address hash
        self.hash = Destination.hash(self.identity, app_name, *aspects)
        self.name_hash = RNS.Identity.full_hash(self.expand_name(None, app_name, *aspects).encode("utf-8"))[:(RNS.Identity.NAME_HASH_LENGTH//8)]
        self.hexhash = self.hash.hex()

        self.default_app_data = None
        self.callback = None
        self.proofcallback = None

        RNS.Transport.register_destination(self)


    def __str__(self):
        """
        :returns: A human-readable representation of the destination including addressable hash and full name.
        """
        return "<"+self.name+":"+self.hexhash+">"

    def _clean_ratchets(self):
        """
        Drops the oldest ratchets so that at most ``self.retained_ratchets``
        are kept. New ratchets are inserted at index 0, so the tail is oldest.
        """
        if self.ratchets is not None:
            if len(self.ratchets) > self.retained_ratchets:
                # Truncate to the configured limit, not the class default,
                # so that set_retained_ratchets() actually takes effect.
                self.ratchets = self.ratchets[:self.retained_ratchets]

    def _persist_ratchets(self):
        """
        Writes the current ratchets, together with a signature over the packed
        ratchet data, to ``self.ratchets_path`` via a temporary file.

        :raises: ``OSError`` if the ratchet file could not be written. In that case ratchets are disabled on this destination.
        """
        try:
            with self.ratchet_file_lock:
                temp_write_path = self.ratchets_path+".tmp"
                packed_ratchets = umsgpack.packb(self.ratchets)
                persisted_data = {"signature": self.sign(packed_ratchets), "ratchets": packed_ratchets}
                with open(temp_write_path, "wb") as ratchets_file:
                    ratchets_file.write(umsgpack.packb(persisted_data))

                # Atomically swap the new file into place, overwriting any
                # existing ratchet file.
                os.replace(temp_write_path, self.ratchets_path)

        except Exception as e:
            RNS.trace_exception(e)
            self.ratchets = None
            self.ratchets_path = None
            raise OSError("Could not write ratchet file contents for "+str(self)+". The contained exception was: "+str(e)) from e

    def rotate_ratchets(self):
        """
        Generates and persists a new ratchet key if at least
        ``self.ratchet_interval`` seconds have passed since the last rotation.

        :returns: True if a new ratchet was generated, otherwise False.
        :raises: ``SystemError`` if ratchets are not enabled on this destination.
        """
        if self.ratchets is not None:
            now = time.time()
            if now > self.latest_ratchet_time+self.ratchet_interval:
                RNS.log("Rotating ratchets for "+str(self), RNS.LOG_DEBUG)
                new_ratchet = RNS.Identity._generate_ratchet()
                self.ratchets.insert(0, new_ratchet)
                self.latest_ratchet_time = now
                self._clean_ratchets()
                self._persist_ratchets()
                return True
        else:
            raise SystemError("Cannot rotate ratchet on "+str(self)+", ratchets are not enabled")

        return False

    def announce(self, app_data=None, path_response=False, attached_interface=None, tag=None, send=True):
        """
        Creates an announce packet for this destination and broadcasts it on all
        relevant interfaces. Application specific data can be added to the announce.

        :param app_data: *bytes* containing the app_data.
        :param path_response: Internal flag used by :ref:`RNS.Transport<api-transport>`. Ignore.
        :param attached_interface: Passed on unchanged to the created announce packet.
        :param tag: Key under which the generated announce data is cached for ``PR_TAG_WINDOW`` seconds, so a path response with the same tag can reuse it.
        :param send: If ``True``, the announce packet is sent. If ``False``, it is returned without being sent.
        :returns: The announce packet if *send* is ``False``, otherwise ``None``.
        :raises: ``TypeError`` if the destination is not of type ``SINGLE`` or not of direction ``IN``.
        """
        if self.type != Destination.SINGLE:
            raise TypeError("Only SINGLE destination types can be announced")

        if self.direction != Destination.IN:
            raise TypeError("Only IN destination types can be announced")

        ratchet = b""
        now = time.time()

        # Expire cached path responses older than the tag window
        stale_responses = [entry_tag for entry_tag, entry in self.path_responses.items()
                           if now > entry[0]+Destination.PR_TAG_WINDOW]

        for entry_tag in stale_responses:
            self.path_responses.pop(entry_tag)

        if (path_response == True and tag is not None) and tag in self.path_responses:
            # This code is currently not used, since Transport will block duplicate
            # path requests based on tags. When multi-path support is implemented in
            # Transport, this will allow Transport to detect redundant paths to the
            # same destination, and select the best one based on chosen criteria,
            # since it will be able to detect that a single emitted announce was
            # received via multiple paths. The difference in reception time will
            # potentially also be useful in determining characteristics of the
            # multiple available paths, and to choose the best one.
            RNS.log("Using cached announce data for answering path request with tag "+RNS.prettyhexrep(tag), RNS.LOG_EXTREME)
            announce_data = self.path_responses[tag][1]

        else:
            destination_hash = self.hash
            random_hash = RNS.Identity.get_random_hash()[0:5]+int(time.time()).to_bytes(5, "big")

            if self.ratchets is not None:
                self.rotate_ratchets()
                ratchet = RNS.Identity._ratchet_public_bytes(self.ratchets[0])
                RNS.Identity._remember_ratchet(self.hash, ratchet)

            # Fall back to the default app_data, which is either bytes or a
            # callable returning bytes; any other value is ignored.
            if app_data is None and self.default_app_data is not None:
                if isinstance(self.default_app_data, bytes):
                    app_data = self.default_app_data
                elif callable(self.default_app_data):
                    returned_app_data = self.default_app_data()
                    if isinstance(returned_app_data, bytes):
                        app_data = returned_app_data

            signed_data = self.hash+self.identity.get_public_key()+self.name_hash+random_hash+ratchet
            if app_data is not None:
                signed_data += app_data

            signature = self.identity.sign(signed_data)
            announce_data = self.identity.get_public_key()+self.name_hash+random_hash+ratchet+signature

            if app_data is not None:
                announce_data += app_data

            self.path_responses[tag] = [time.time(), announce_data]

        if path_response:
            announce_context = RNS.Packet.PATH_RESPONSE
        else:
            announce_context = RNS.Packet.NONE

        # The context flag signals whether a ratchet is included in the announce
        if ratchet:
            context_flag = RNS.Packet.FLAG_SET
        else:
            context_flag = RNS.Packet.FLAG_UNSET

        announce_packet = RNS.Packet(self, announce_data, RNS.Packet.ANNOUNCE, context = announce_context,
                                     attached_interface = attached_interface, context_flag=context_flag)
        if send:
            announce_packet.send()
        else:
            return announce_packet

    def accepts_links(self, accepts = None):
        """
        Set or query whether the destination accepts incoming link requests.

        :param accepts: If ``True`` or ``False``, this method sets whether the destination accepts incoming link requests. If not provided or ``None``, the method returns whether the destination currently accepts link requests.
        :returns: ``True`` or ``False`` depending on whether the destination accepts incoming link requests, if the *accepts* parameter is not provided or ``None``.
        """
        if accepts is None:
            return self.accept_link_requests

        if accepts:
            self.accept_link_requests = True
        else:
            self.accept_link_requests = False

    def set_link_established_callback(self, callback):
        """
        Registers a function to be called when a link has been established to
        this destination.

        :param callback: A function or method with the signature *callback(link)* to be called when a new link is established with this destination.
        """
        self.callbacks.link_established = callback

    def set_packet_callback(self, callback):
        """
        Registers a function to be called when a packet has been received by
        this destination.

        :param callback: A function or method with the signature *callback(data, packet)* to be called when this destination receives a packet.
        """
        self.callbacks.packet = callback

    def set_proof_requested_callback(self, callback):
        """
        Registers a function to be called when a proof has been requested for
        a packet sent to this destination. Allows control over when and if
        proofs should be returned for received packets.

        :param callback: A function or method with the signature *callback(packet)* to be called when a packet that requests a proof is received. The callback must return one of True or False. If the callback returns True, a proof will be sent. If it returns False, a proof will not be sent.
        """
        self.callbacks.proof_requested = callback

    def set_proof_strategy(self, proof_strategy):
        """
        Sets the destinations proof strategy.

        :param proof_strategy: One of ``RNS.Destination.PROVE_NONE``, ``RNS.Destination.PROVE_ALL`` or ``RNS.Destination.PROVE_APP``. If ``RNS.Destination.PROVE_APP`` is set, the `proof_requested_callback` will be called to determine whether a proof should be sent or not.
        :raises: ``TypeError`` if the supplied proof strategy is not supported.
        """
        if proof_strategy not in Destination.proof_strategies:
            raise TypeError("Unsupported proof strategy")
        else:
            self.proof_strategy = proof_strategy

    def register_request_handler(self, path, response_generator = None, allow = ALLOW_NONE, allowed_list = None, auto_compress = True):
        """
        Registers a request handler.

        :param path: The path for the request handler to be registered.
        :param response_generator: A function or method with the signature *response_generator(path, data, request_id, link_id, remote_identity, requested_at)* to be called. Whatever this function returns will be sent as a response to the requester. If the function returns ``None``, no response will be sent.
        :param allow: One of ``RNS.Destination.ALLOW_NONE``, ``RNS.Destination.ALLOW_ALL`` or ``RNS.Destination.ALLOW_LIST``. If ``RNS.Destination.ALLOW_LIST`` is set, the request handler will only respond to requests for identified peers in the supplied list.
        :param allowed_list: A list of *bytes-like* :ref:`RNS.Identity<api-identity>` hashes.
        :param auto_compress: If ``True`` or ``False``, determines whether automatic compression of responses should be carried out. If set to an integer value, responses will only be auto-compressed if under this size in bytes. If omitted, the default compression settings will be followed.
        :raises: ``ValueError`` if any of the supplied arguments are invalid.
        """
        if path is None or path == "": raise ValueError("Invalid path specified")
        elif not callable(response_generator): raise ValueError("Invalid response generator specified")
        elif allow not in Destination.request_policies: raise ValueError("Invalid request policy")
        else:
            # Handlers are keyed by the truncated hash of the path
            path_hash = RNS.Identity.truncated_hash(path.encode("utf-8"))
            request_handler = [path, response_generator, allow, allowed_list, auto_compress]
            self.request_handlers[path_hash] = request_handler

    def deregister_request_handler(self, path):
        """
        Deregisters a request handler.

        :param path: The path for the request handler to be deregistered.
        :returns: True if the handler was deregistered, otherwise False.
        """
        path_hash = RNS.Identity.truncated_hash(path.encode("utf-8"))
        if path_hash in self.request_handlers:
            self.request_handlers.pop(path_hash)
            return True
        else:
            return False

    def receive(self, packet):
        """
        Handles a packet addressed to this destination. Link requests are
        passed on undecrypted to :meth:`incoming_link_request`. All other
        packets are decrypted, and ``DATA`` packets are then handed to the
        registered packet callback, if any.

        :param packet: The received packet.
        :returns: ``False`` if decryption yielded no plaintext, ``True`` if it did, and ``None`` for link requests.
        """
        if packet.packet_type == RNS.Packet.LINKREQUEST:
            plaintext = packet.data
            self.incoming_link_request(plaintext, packet)
        else:
            plaintext = self.decrypt(packet.data)
            packet.ratchet_id = self.latest_ratchet_id
            if plaintext is None: return False
            else:
                if packet.packet_type == RNS.Packet.DATA:
                    if self.callbacks.packet is not None:
                        # A failing application callback must not break
                        # packet handling, so it is only logged.
                        try:
                            self.callbacks.packet(plaintext, packet)
                        except Exception as e:
                            RNS.log("Error while executing receive callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

                return True

    def incoming_link_request(self, data, packet):
        """
        Validates an incoming link request and, if a link results from it,
        adds that link to ``self.links``. Does nothing if the destination does
        not accept link requests.
        """
        if self.accept_link_requests:
            link = RNS.Link.validate_request(self, data, packet)
            if link is not None:
                self.links.append(link)

    def _reload_ratchets(self, ratchets_path):
        """
        Loads ratchets from *ratchets_path* after validating the stored
        signature, retrying once after 500ms on failure. If no file exists at
        the path, an empty ratchet list is initialised and persisted.

        :param ratchets_path: The path to the ratchet file.
        :raises: ``OSError`` if an existing ratchet file could not be loaded. In that case ratchets are disabled on this destination.
        """
        if os.path.isfile(ratchets_path):
            with self.ratchet_file_lock:
                def load_attempt():
                    with open(ratchets_path, "rb") as ratchets_file:
                        persisted_data = umsgpack.unpackb(ratchets_file.read())

                    # NOTE(review): a file lacking either key is silently
                    # ignored and leaves self.ratchets unchanged - confirm
                    # whether this should be treated as a corrupt file.
                    if "signature" in persisted_data and "ratchets" in persisted_data:
                        if self.identity.validate(persisted_data["signature"], persisted_data["ratchets"]):
                            self.ratchets = umsgpack.unpackb(persisted_data["ratchets"])
                            self.ratchets_path = ratchets_path
                        else:
                            raise KeyError("Invalid ratchet file signature")

                try:
                    try:
                        load_attempt()

                    except Exception as e:
                        RNS.trace_exception(e)
                        RNS.log(f"First ratchet reload attempt for {self} failed. Possible I/O conflict. Retrying in 500ms.", RNS.LOG_ERROR)
                        time.sleep(0.5)
                        load_attempt()
                        RNS.log("Ratchet reload retry succeeded", RNS.LOG_DEBUG)

                except Exception as e:
                    self.ratchets = None
                    self.ratchets_path = None
                    RNS.trace_exception(e)
                    RNS.log(f"The ratchet file located at {ratchets_path} could not be loaded. This could indicate that the ratchet file has become corrupt.", RNS.LOG_CRITICAL)
                    RNS.log("You can attempt to manually recover the ratchet file, or simply remove it to have Reticulum recreate it on the next use.", RNS.LOG_CRITICAL)
                    RNS.log("If re-initialize this ratchet file, make sure to send an announce for the relevant destination as soon as possible,", RNS.LOG_CRITICAL)
                    RNS.log("so that the new ratchet information is synchronized to the network.", RNS.LOG_CRITICAL)
                    raise OSError("Could not read ratchet file contents for "+str(self)+". The contained exception was: "+str(e)) from e

        else:
            RNS.log("No existing ratchet data found, initialising new ratchet file for "+str(self), RNS.LOG_DEBUG)
            self.ratchets = []
            self.ratchets_path = ratchets_path
            self._persist_ratchets()

    def enable_ratchets(self, ratchets_path):
        """
        Enables ratchets on the destination. When ratchets are enabled, Reticulum will automatically rotate
        the keys used to encrypt packets to this destination, and include the latest ratchet key in announces.

        Enabling ratchets on a destination will provide forward secrecy for packets sent to that destination,
        even when sent outside a ``Link``. The normal Reticulum ``Link`` establishment procedure already performs
        its own ephemeral key exchange for each link establishment, which means that ratchets are not necessary
        to provide forward secrecy for links.

        Enabling ratchets will have a small impact on announce size, adding 32 bytes to every sent announce.

        :param ratchets_path: The path to a file to store ratchet data in.
        :returns: True if the operation succeeded.
        :raises: ``ValueError`` if no ratchet file path was specified, ``OSError`` if the ratchet file could not be read or written.
        """
        if ratchets_path is not None:
            # A rotation time of 0 makes the next rotate_ratchets() call
            # generate a new ratchet.
            self.latest_ratchet_time = 0
            self._reload_ratchets(ratchets_path)

            RNS.log("Ratchets enabled on "+str(self), RNS.LOG_DEBUG)
            return True

        else:
            raise ValueError("No ratchet file path specified for "+str(self))

    def enforce_ratchets(self):
        """
        When ratchet enforcement is enabled, this destination will never accept packets that use its
        base Identity key for encryption, but only accept packets encrypted with one of the retained
        ratchet keys.

        :returns: True if ratchets are enabled and enforcement was turned on, otherwise False.
        """
        if self.ratchets is not None:
            self.__enforce_ratchets = True
            RNS.log("Ratchets enforced on "+str(self), RNS.LOG_DEBUG)
            return True
        else:
            return False

    def set_retained_ratchets(self, retained_ratchets):
        """
        Sets the number of previously generated ratchet keys this destination will retain,
        and try to use when decrypting incoming packets. Defaults to ``Destination.RATCHET_COUNT``.

        :param retained_ratchets: The number of generated ratchets to retain.
        :returns: True if the operation succeeded, False if not.
        """
        if isinstance(retained_ratchets, int) and retained_ratchets > 0:
            self.retained_ratchets = retained_ratchets
            self._clean_ratchets()
            return True
        else:
            return False

    def set_ratchet_interval(self, interval):
        """
        Sets the minimum interval in seconds between ratchet key rotation.
        Defaults to ``Destination.RATCHET_INTERVAL``.

        :param interval: The minimum interval in seconds.
        :returns: True if the operation succeeded, False if not.
        """
        if isinstance(interval, int) and interval > 0:
            self.ratchet_interval = interval
            return True
        else:
            return False

    def create_keys(self):
        """
        For a ``RNS.Destination.GROUP`` type destination, creates a new symmetric key.

        :raises: ``TypeError`` if called on an incompatible type of destination.
        """
        if self.type == Destination.PLAIN:
            raise TypeError("A plain destination does not hold any keys")

        if self.type == Destination.SINGLE:
            raise TypeError("A single destination holds keys through an Identity instance")

        if self.type == Destination.GROUP:
            self.prv_bytes = Token.generate_key()
            self.prv = Token(self.prv_bytes)

    def get_private_key(self):
        """
        For a ``RNS.Destination.GROUP`` type destination, returns the symmetric private key.

        :raises: ``TypeError`` if called on an incompatible type of destination.
        """
        if self.type == Destination.PLAIN:
            raise TypeError("A plain destination does not hold any keys")
        elif self.type == Destination.SINGLE:
            raise TypeError("A single destination holds keys through an Identity instance")
        else:
            return self.prv_bytes

    def load_private_key(self, key):
        """
        For a ``RNS.Destination.GROUP`` type destination, loads a symmetric private key.

        :param key: A *bytes-like* containing the symmetric key.
        :raises: ``TypeError`` if called on an incompatible type of destination.
        """
        if self.type == Destination.PLAIN:
            raise TypeError("A plain destination does not hold any keys")

        if self.type == Destination.SINGLE:
            raise TypeError("A single destination holds keys through an Identity instance")

        if self.type == Destination.GROUP:
            self.prv_bytes = key
            self.prv = Token(self.prv_bytes)

    def load_public_key(self, key):
        """
        Not supported on any destination type; always raises.

        :raises: ``TypeError`` in all cases, with a message depending on the destination type.
        """
        if self.type != Destination.SINGLE:
            raise TypeError("Only the \"single\" destination type can hold a public key")
        else:
            raise TypeError("A single destination holds keys through an Identity instance")

    def encrypt(self, plaintext):
        """
        Encrypts information for ``RNS.Destination.SINGLE`` or ``RNS.Destination.GROUP`` type destination.

        :param plaintext: A *bytes-like* containing the plaintext to be encrypted.
        :returns: The ciphertext, the unmodified plaintext for ``PLAIN`` destinations, or ``None`` if a ``GROUP`` destination failed to encrypt the data.
        :raises: ``ValueError`` if destination does not hold a necessary key for encryption.
        """
        if self.type == Destination.PLAIN:
            return plaintext

        if self.type == Destination.SINGLE and self.identity is not None:
            selected_ratchet = RNS.Identity.get_ratchet(self.hash)
            if selected_ratchet:
                self.latest_ratchet_id = RNS.Identity._get_ratchet_id(selected_ratchet)
            return self.identity.encrypt(plaintext, ratchet=selected_ratchet)

        if self.type == Destination.GROUP:
            if hasattr(self, "prv") and self.prv is not None:
                try:
                    return self.prv.encrypt(plaintext)
                except Exception as e:
                    RNS.log("The GROUP destination could not encrypt data", RNS.LOG_ERROR)
                    RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            else:
                raise ValueError("No private key held by GROUP destination. Did you create or load one?")

    def decrypt(self, ciphertext):
        """
        Decrypts information for ``RNS.Destination.SINGLE`` or ``RNS.Destination.GROUP`` type destination.

        :param ciphertext: *Bytes* containing the ciphertext to be decrypted.
        :returns: The plaintext, the unmodified ciphertext for ``PLAIN`` destinations, or ``None`` if a ``GROUP`` destination failed to decrypt the data.
        :raises: ``ValueError`` if destination does not hold a necessary key for decryption.
        """
        if self.type == Destination.PLAIN:
            return ciphertext

        if self.type == Destination.SINGLE and self.identity is not None:
            if self.ratchets:
                decrypted = None
                try:
                    decrypted = self.identity.decrypt(ciphertext, ratchets=self.ratchets, enforce_ratchets=self.__enforce_ratchets, ratchet_id_receiver=self)
                except Exception:
                    # A failed first attempt is retried below after
                    # reloading the ratchets from storage.
                    decrypted = None

                if not decrypted:
                    try:
                        RNS.log(f"Decryption with ratchets failed on {self}, reloading ratchets from storage and retrying", RNS.LOG_ERROR)
                        self._reload_ratchets(self.ratchets_path)
                        decrypted = self.identity.decrypt(ciphertext, ratchets=self.ratchets, enforce_ratchets=self.__enforce_ratchets, ratchet_id_receiver=self)
                    except Exception as e:
                        RNS.log(f"Decryption still failing after ratchet reload. The contained exception was: {e}", RNS.LOG_ERROR)
                        raise

                    if decrypted: RNS.log("Decryption succeeded after ratchet reload", RNS.LOG_NOTICE)

                return decrypted

            else:
                return self.identity.decrypt(ciphertext, ratchets=None, enforce_ratchets=self.__enforce_ratchets, ratchet_id_receiver=self)

        if self.type == Destination.GROUP:
            if hasattr(self, "prv") and self.prv is not None:
                try:
                    return self.prv.decrypt(ciphertext)
                except Exception as e:
                    RNS.log("The GROUP destination could not decrypt data", RNS.LOG_ERROR)
                    RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            else:
                raise ValueError("No private key held by GROUP destination. Did you create or load one?")

    def sign(self, message):
        """
        Signs information for ``RNS.Destination.SINGLE`` type destination.

        :param message: *Bytes* containing the message to be signed.
        :returns: A *bytes-like* containing the message signature, or *None* if the destination could not sign the message.
        """
        if self.type == Destination.SINGLE and self.identity is not None:
            return self.identity.sign(message)
        else:
            return None

    def set_default_app_data(self, app_data=None):
        """
        Sets the default app_data for the destination. If set, the default
        app_data will be included in every announce sent by the destination,
        unless other app_data is specified in the *announce* method.

        :param app_data: A *bytes-like* containing the default app_data, or a *callable* returning a *bytes-like* containing the app_data.
        """
        self.default_app_data = app_data

    def clear_default_app_data(self):
        """
        Clears default app_data previously set for the destination.
        """
        self.set_default_app_data(app_data=None)