/ RNS / Destination.py
Destination.py
  1  # Reticulum License
  2  #
  3  # Copyright (c) 2016-2025 Mark Qvist
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # - The Software shall not be used in any kind of system which includes amongst
 13  #   its functions the ability to purposefully do harm to human beings.
 14  #
 15  # - The Software shall not be used, directly or indirectly, in the creation of
 16  #   an artificial intelligence, machine learning or language model training
 17  #   dataset, including but not limited to any use that contributes to the
 18  #   training or development of such a model or algorithm.
 19  #
 20  # - The above copyright notice and this permission notice shall be included in
 21  #   all copies or substantial portions of the Software.
 22  #
 23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 24  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 25  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 26  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 27  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 28  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 29  # SOFTWARE.
 30  
 31  import os
 32  import math
 33  import time
 34  import threading
 35  import RNS
 36  
 37  from RNS.Cryptography import Token
 38  from .vendor import umsgpack as umsgpack
 39  
 40  class Callbacks:
 41      def __init__(self):
 42          self.link_established = None
 43          self.packet = None
 44          self.proof_requested = None
 45  
 46  class Destination:
 47      """
 48      A class used to describe endpoints in a Reticulum Network. Destination
 49      instances are used both to create outgoing and incoming endpoints. The
 50      destination type will decide if encryption, and what type, is used in
 51      communication with the endpoint. A destination can also announce its
 52      presence on the network, which will distribute necessary keys for
 53      encrypted communication with it.
 54  
 55      :param identity: An instance of :ref:`RNS.Identity<api-identity>`. Can hold only public keys for an outgoing destination, or holding private keys for an ingoing.
 56      :param direction: ``RNS.Destination.IN`` or ``RNS.Destination.OUT``.
 57      :param type: ``RNS.Destination.SINGLE``, ``RNS.Destination.GROUP`` or ``RNS.Destination.PLAIN``.
 58      :param app_name: A string specifying the app name.
 59      :param \\*aspects: Any non-zero number of string arguments.
 60      """
 61  
 62      # Constants
 63      SINGLE     = 0x00
 64      GROUP      = 0x01
 65      PLAIN      = 0x02
 66      LINK       = 0x03
 67      types      = [SINGLE, GROUP, PLAIN, LINK]
 68  
 69      PROVE_NONE = 0x21
 70      PROVE_APP  = 0x22
 71      PROVE_ALL  = 0x23
 72      proof_strategies = [PROVE_NONE, PROVE_APP, PROVE_ALL]
 73  
 74      ALLOW_NONE = 0x00
 75      ALLOW_ALL  = 0x01
 76      ALLOW_LIST = 0x02
 77      request_policies = [ALLOW_NONE, ALLOW_ALL, ALLOW_LIST]
 78  
 79      IN         = 0x11;
 80      OUT        = 0x12;
 81      directions = [IN, OUT]
 82  
 83      PR_TAG_WINDOW = 30
 84  
 85      RATCHET_COUNT    = 512
 86      """
 87      The default number of generated ratchet keys a destination will retain, if it has ratchets enabled.
 88      """
 89  
 90      RATCHET_INTERVAL = 30*60
 91      """
 92      The minimum interval between rotating ratchet keys, in seconds.
 93      """
 94  
 95      @staticmethod
 96      def expand_name(identity, app_name, *aspects):
 97          """
 98          :returns: A string containing the full human-readable name of the destination, for an app_name and a number of aspects.
 99          """
100  
101          # Check input values and build name string
102          if "." in app_name: raise ValueError("Dots can't be used in app names")
103  
104          name = app_name
105          for aspect in aspects:
106              if "." in aspect: raise ValueError("Dots can't be used in aspects")
107              name += "." + aspect
108  
109          if identity != None:
110              name += "." + identity.hexhash
111  
112          return name
113  
114  
115      @staticmethod
116      def hash(identity, app_name, *aspects):
117          """
118          :returns: A destination name in adressable hash form, for an app_name and a number of aspects.
119          """
120          name_hash = RNS.Identity.full_hash(Destination.expand_name(None, app_name, *aspects).encode("utf-8"))[:(RNS.Identity.NAME_HASH_LENGTH//8)]
121          addr_hash_material = name_hash
122          if identity != None:
123              if isinstance(identity, RNS.Identity):
124                  addr_hash_material += identity.hash
125              elif isinstance(identity, bytes) and len(identity) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
126                  addr_hash_material += identity
127              else:
128                  raise TypeError("Invalid material supplied for destination hash calculation")
129  
130          return RNS.Identity.full_hash(addr_hash_material)[:RNS.Reticulum.TRUNCATED_HASHLENGTH//8]
131  
132      @staticmethod
133      def app_and_aspects_from_name(full_name):
134          """
135          :returns: A tuple containing the app name and a list of aspects, for a full-name string.
136          """
137          components = full_name.split(".")
138          return (components[0], components[1:])
139  
140      @staticmethod
141      def hash_from_name_and_identity(full_name, identity):
142          """
143          :returns: A destination name in adressable hash form, for a full name string and Identity instance.
144          """
145          app_name, aspects = Destination.app_and_aspects_from_name(full_name)
146  
147          return Destination.hash(identity, app_name, *aspects)
148  
    def __init__(self, identity, direction, type, app_name, *aspects):
        """
        Validates the arguments, initialises the instance state, derives the
        destination name and hashes, and registers the new destination with
        ``RNS.Transport``.

        :raises: ``ValueError`` if the app name or an aspect contains a dot, if the type or direction is unknown, or if an outbound non-``PLAIN`` destination is created without an identity.
        :raises: ``TypeError`` if an identity is supplied for a ``PLAIN`` destination.
        """
        # Check input values and build name string
        if "." in app_name: raise ValueError("Dots can't be used in app names")
        if not type in Destination.types: raise ValueError("Unknown destination type")
        if not direction in Destination.directions: raise ValueError("Unknown destination direction")

        self.accept_link_requests = True
        self.callbacks = Callbacks()
        self.request_handlers = {}
        self.type = type
        self.direction = direction
        self.proof_strategy = Destination.PROVE_NONE

        # Ratchet state. self.ratchets stays None until ratchets are enabled
        # with enable_ratchets().
        self.ratchets = None
        self.ratchets_path = None
        self.ratchet_interval = Destination.RATCHET_INTERVAL
        self.ratchet_file_lock = threading.Lock()
        self.retained_ratchets = Destination.RATCHET_COUNT
        self.latest_ratchet_time = None
        self.latest_ratchet_id = None
        # Stored name-mangled as _Destination__enforce_ratchets
        self.__enforce_ratchets = False
        self.mtu = 0

        # Announce data cached by announce(), keyed by path request tag
        self.path_responses = {}
        # Links accepted through incoming_link_request()
        self.links = []

        # An inbound, non-PLAIN destination created without an identity gets
        # a new RNS.Identity() instance, whose hexhash is added as an aspect.
        # NOTE(review): expand_name() below appends identity.hexhash as well,
        # so in this case the hexhash appears twice in self.name. The address
        # hash is derived from these aspects - confirm before changing this.
        if identity == None and direction == Destination.IN and self.type != Destination.PLAIN:
            identity = RNS.Identity()
            aspects = aspects+(identity.hexhash,)

        # NOTE(review): the message mentions SINGLE, but the condition covers
        # every non-PLAIN destination type.
        if identity == None and direction == Destination.OUT and self.type != Destination.PLAIN:
            raise ValueError("Can't create outbound SINGLE destination without an identity")

        if identity != None and self.type == Destination.PLAIN:
            raise TypeError("Selected destination type PLAIN cannot hold an identity")

        self.identity = identity
        self.name = Destination.expand_name(identity, app_name, *aspects)

        # Generate the destination address hash
        self.hash = Destination.hash(self.identity, app_name, *aspects)
        self.name_hash = RNS.Identity.full_hash(self.expand_name(None, app_name, *aspects).encode("utf-8"))[:(RNS.Identity.NAME_HASH_LENGTH//8)]
        self.hexhash = self.hash.hex()

        self.default_app_data = None
        # NOTE(review): callback and proofcallback are not read anywhere in
        # the visible code - presumably legacy attributes, confirm before
        # removing.
        self.callback = None
        self.proofcallback = None

        RNS.Transport.register_destination(self)
197  
198  
199      def __str__(self):
200          """
201          :returns: A human-readable representation of the destination including addressable hash and full name.
202          """
203          return "<"+self.name+":"+self.hexhash+">"
204  
205      def _clean_ratchets(self):
206          if self.ratchets != None:
207              if len (self.ratchets) > self.retained_ratchets:
208                  self.ratchets = self.ratchets[:Destination.RATCHET_COUNT]
209  
210      def _persist_ratchets(self):
211          try:
212              with self.ratchet_file_lock:
213                  temp_write_path = self.ratchets_path+".tmp"
214                  packed_ratchets = umsgpack.packb(self.ratchets)
215                  persisted_data = {"signature": self.sign(packed_ratchets), "ratchets": packed_ratchets}
216                  ratchets_file = open(temp_write_path, "wb")
217                  ratchets_file.write(umsgpack.packb(persisted_data))
218                  ratchets_file.close()
219                  if os.path.isfile(self.ratchets_path): os.unlink(self.ratchets_path)
220                  os.rename(temp_write_path, self.ratchets_path)
221          except Exception as e:
222              RNS.trace_exception(e)
223              self.ratchets = None
224              self.ratchets_path = None
225              raise OSError("Could not write ratchet file contents for "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
226  
227      def rotate_ratchets(self):
228          if self.ratchets != None:
229              now = time.time()
230              if now > self.latest_ratchet_time+self.ratchet_interval:
231                  RNS.log("Rotating ratchets for "+str(self), RNS.LOG_DEBUG)
232                  new_ratchet = RNS.Identity._generate_ratchet()
233                  self.ratchets.insert(0, new_ratchet)
234                  self.latest_ratchet_time = now
235                  self._clean_ratchets()
236                  self._persist_ratchets()
237              return True
238          else:
239              raise SystemError("Cannot rotate ratchet on "+str(self)+", ratchets are not enabled")
240  
241          return False
242  
243      def announce(self, app_data=None, path_response=False, attached_interface=None, tag=None, send=True):
244          """
245          Creates an announce packet for this destination and broadcasts it on all
246          relevant interfaces. Application specific data can be added to the announce.
247  
248          :param app_data: *bytes* containing the app_data.
249          :param path_response: Internal flag used by :ref:`RNS.Transport<api-transport>`. Ignore.
250          """
251          if self.type != Destination.SINGLE:
252              raise TypeError("Only SINGLE destination types can be announced")
253  
254          if self.direction != Destination.IN:
255              raise TypeError("Only IN destination types can be announced")
256          
257          ratchet = b""
258          now = time.time()
259          stale_responses = []
260          for entry_tag in self.path_responses:
261              entry = self.path_responses[entry_tag]
262              if now > entry[0]+Destination.PR_TAG_WINDOW:
263                  stale_responses.append(entry_tag)
264  
265          for entry_tag in stale_responses:
266              self.path_responses.pop(entry_tag)
267  
268          if (path_response == True and tag != None) and tag in self.path_responses:
269              # This code is currently not used, since Transport will block duplicate
270              # path requests based on tags. When multi-path support is implemented in
271              # Transport, this will allow Transport to detect redundant paths to the
272              # same destination, and select the best one based on chosen criteria,
273              # since it will be able to detect that a single emitted announce was
274              # received via multiple paths. The difference in reception time will
275              # potentially also be useful in determining characteristics of the
276              # multiple available paths, and to choose the best one.
277              RNS.log("Using cached announce data for answering path request with tag "+RNS.prettyhexrep(tag), RNS.LOG_EXTREME)
278              announce_data = self.path_responses[tag][1]
279          
280          else:
281              destination_hash = self.hash
282              random_hash = RNS.Identity.get_random_hash()[0:5]+int(time.time()).to_bytes(5, "big")
283  
284              if self.ratchets != None:
285                  self.rotate_ratchets()
286                  ratchet = RNS.Identity._ratchet_public_bytes(self.ratchets[0])
287                  RNS.Identity._remember_ratchet(self.hash, ratchet)
288  
289              if app_data == None and self.default_app_data != None:
290                  if isinstance(self.default_app_data, bytes):
291                      app_data = self.default_app_data
292                  elif callable(self.default_app_data):
293                      returned_app_data = self.default_app_data()
294                      if isinstance(returned_app_data, bytes):
295                          app_data = returned_app_data
296              
297              signed_data = self.hash+self.identity.get_public_key()+self.name_hash+random_hash+ratchet
298              if app_data != None:
299                  signed_data += app_data
300  
301              signature = self.identity.sign(signed_data)
302              announce_data = self.identity.get_public_key()+self.name_hash+random_hash+ratchet+signature
303  
304              if app_data != None:
305                  announce_data += app_data
306  
307              self.path_responses[tag] = [time.time(), announce_data]
308  
309          if path_response:
310              announce_context = RNS.Packet.PATH_RESPONSE
311          else:
312              announce_context = RNS.Packet.NONE
313  
314          if ratchet:
315              context_flag = RNS.Packet.FLAG_SET
316          else:
317              context_flag = RNS.Packet.FLAG_UNSET
318  
319          announce_packet = RNS.Packet(self, announce_data, RNS.Packet.ANNOUNCE, context = announce_context,
320                                       attached_interface = attached_interface, context_flag=context_flag)
321          if send:
322              announce_packet.send()
323          else:
324              return announce_packet
325  
326      def accepts_links(self, accepts = None):
327          """
328          Set or query whether the destination accepts incoming link requests.
329  
330          :param accepts: If ``True`` or ``False``, this method sets whether the destination accepts incoming link requests. If not provided or ``None``, the method returns whether the destination currently accepts link requests.
331          :returns: ``True`` or ``False`` depending on whether the destination accepts incoming link requests, if the *accepts* parameter is not provided or ``None``.
332          """
333          if accepts == None:
334              return self.accept_link_requests
335  
336          if accepts:
337              self.accept_link_requests = True
338          else:
339              self.accept_link_requests = False
340  
341      def set_link_established_callback(self, callback):
342          """
343          Registers a function to be called when a link has been established to
344          this destination.
345  
346          :param callback: A function or method with the signature *callback(link)* to be called when a new link is established with this destination.
347          """
348          self.callbacks.link_established = callback
349  
350      def set_packet_callback(self, callback):
351          """
352          Registers a function to be called when a packet has been received by
353          this destination.
354  
355          :param callback: A function or method with the signature *callback(data, packet)* to be called when this destination receives a packet.
356          """
357          self.callbacks.packet = callback
358  
359      def set_proof_requested_callback(self, callback):
360          """
361          Registers a function to be called when a proof has been requested for
362          a packet sent to this destination. Allows control over when and if
363          proofs should be returned for received packets.
364  
365          :param callback: A function or method to with the signature *callback(packet)* be called when a packet that requests a proof is received. The callback must return one of True or False. If the callback returns True, a proof will be sent. If it returns False, a proof will not be sent.
366          """
367          self.callbacks.proof_requested = callback
368  
369      def set_proof_strategy(self, proof_strategy):
370          """
371          Sets the destinations proof strategy.
372  
373          :param proof_strategy: One of ``RNS.Destination.PROVE_NONE``, ``RNS.Destination.PROVE_ALL`` or ``RNS.Destination.PROVE_APP``. If ``RNS.Destination.PROVE_APP`` is set, the `proof_requested_callback` will be called to determine whether a proof should be sent or not.
374          """
375          if not proof_strategy in Destination.proof_strategies:
376              raise TypeError("Unsupported proof strategy")
377          else:
378              self.proof_strategy = proof_strategy
379  
380      def register_request_handler(self, path, response_generator = None, allow = ALLOW_NONE, allowed_list = None, auto_compress = True):
381          """
382          Registers a request handler.
383  
384          :param path: The path for the request handler to be registered.
385          :param response_generator: A function or method with the signature *response_generator(path, data, request_id, link_id, remote_identity, requested_at)* to be called. Whatever this funcion returns will be sent as a response to the requester. If the function returns ``None``, no response will be sent.
386          :param allow: One of ``RNS.Destination.ALLOW_NONE``, ``RNS.Destination.ALLOW_ALL`` or ``RNS.Destination.ALLOW_LIST``. If ``RNS.Destination.ALLOW_LIST`` is set, the request handler will only respond to requests for identified peers in the supplied list.
387          :param allowed_list: A list of *bytes-like* :ref:`RNS.Identity<api-identity>` hashes.
388          :param auto_compress: If ``True`` or ``False``, determines whether automatic compression of responses should be carried out. If set to an integer value, responses will only be auto-compressed if under this size in bytes. If omitted, the default compression settings will be followed.
389          :raises: ``ValueError`` if any of the supplied arguments are invalid.
390          """
391          if path == None or path == "": raise ValueError("Invalid path specified")
392          elif not callable(response_generator): raise ValueError("Invalid response generator specified")
393          elif not allow in Destination.request_policies: raise ValueError("Invalid request policy")
394          else:
395              path_hash = RNS.Identity.truncated_hash(path.encode("utf-8"))
396              request_handler = [path, response_generator, allow, allowed_list, auto_compress]
397              self.request_handlers[path_hash] = request_handler
398  
399      def deregister_request_handler(self, path):
400          """
401          Deregisters a request handler.
402  
403          :param path: The path for the request handler to be deregistered.
404          :returns: True if the handler was deregistered, otherwise False.
405          """
406          path_hash = RNS.Identity.truncated_hash(path.encode("utf-8"))
407          if path_hash in self.request_handlers:
408              self.request_handlers.pop(path_hash)
409              return True
410          else:
411              return False
412  
413      def receive(self, packet):
414          if packet.packet_type == RNS.Packet.LINKREQUEST:
415              plaintext = packet.data
416              self.incoming_link_request(plaintext, packet)
417          else:
418              plaintext = self.decrypt(packet.data)
419              packet.ratchet_id = self.latest_ratchet_id
420              if plaintext == None: return False
421              else:
422                  if packet.packet_type == RNS.Packet.DATA:
423                      if self.callbacks.packet != None:
424                          try:
425                              self.callbacks.packet(plaintext, packet)
426                          except Exception as e:
427                              RNS.log("Error while executing receive callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
428  
429                  return True
430  
431      def incoming_link_request(self, data, packet):
432          if self.accept_link_requests:
433              link = RNS.Link.validate_request(self, data, packet)
434              if link != None:
435                  self.links.append(link)
436  
437      def _reload_ratchets(self, ratchets_path):
438          if os.path.isfile(ratchets_path):
439              with self.ratchet_file_lock:
440                  def load_attempt():
441                      ratchets_file = open(ratchets_path, "rb")
442                      persisted_data = umsgpack.unpackb(ratchets_file.read())
443                      if "signature" in persisted_data and "ratchets" in persisted_data:
444                          if self.identity.validate(persisted_data["signature"], persisted_data["ratchets"]):
445                              self.ratchets = umsgpack.unpackb(persisted_data["ratchets"])
446                              self.ratchets_path = ratchets_path
447                          else:
448                              raise KeyError("Invalid ratchet file signature")
449                  
450                  try:
451                      try:
452                          load_attempt()
453  
454                      except Exception as e:
455                          RNS.trace_exception(e)
456                          RNS.log(f"First ratchet reload attempt for {self} failed. Possible I/O conflict. Retrying in 500ms.", RNS.LOG_ERROR)
457                          time.sleep(0.5)
458                          load_attempt()
459                          RNS.log(f"Ratchet reload retry succeeded", RNS.LOG_DEBUG)
460  
461                  except Exception as e:
462                      self.ratchets = None
463                      self.ratchets_path = None
464                      RNS.trace_exception(e)
465                      RNS.log(f"The ratchet file located at {ratchets_path} could not be loaded. This could indicate that the ratchet file has become corrupt.", RNS.LOG_CRITICAL)
466                      RNS.log(f"You can attempt to manually recover the ratchet file, or simply remove it to have Reticulum recreate it on the next use.", RNS.LOG_CRITICAL)
467                      RNS.log(f"If re-initialize this ratchet file, make sure to send an announce for the relevant destination as soon as possible,", RNS.LOG_CRITICAL)
468                      RNS.log(f"so that the new ratchet information is synchronized to the network.", RNS.LOG_CRITICAL)
469                      raise OSError("Could not read ratchet file contents for "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
470  
471          else:
472              RNS.log("No existing ratchet data found, initialising new ratchet file for "+str(self), RNS.LOG_DEBUG)
473              self.ratchets = []
474              self.ratchets_path = ratchets_path
475              self._persist_ratchets()
476  
477      def enable_ratchets(self, ratchets_path):
478          """
479          Enables ratchets on the destination. When ratchets are enabled, Reticulum will automatically rotate
480          the keys used to encrypt packets to this destination, and include the latest ratchet key in announces.
481  
482          Enabling ratchets on a destination will provide forward secrecy for packets sent to that destination,
483          even when sent outside a ``Link``. The normal Reticulum ``Link`` establishment procedure already performs
484          its own ephemeral key exchange for each link establishment, which means that ratchets are not necessary
485          to provide forward secrecy for links.
486  
487          Enabling ratchets will have a small impact on announce size, adding 32 bytes to every sent announce.
488  
489          :param ratchets_path: The path to a file to store ratchet data in.
490          :returns: True if the operation succeeded, otherwise False.
491          """
492          if ratchets_path != None:
493              self.latest_ratchet_time = 0
494              self._reload_ratchets(ratchets_path)
495  
496              RNS.log("Ratchets enabled on "+str(self), RNS.LOG_DEBUG)
497              return True
498  
499          else:
500              raise ValueError("No ratchet file path specified for "+str(self))
501  
502      def enforce_ratchets(self):
503          """
504          When ratchet enforcement is enabled, this destination will never accept packets that use its
505          base Identity key for encryption, but only accept packets encrypted with one of the retained
506          ratchet keys.
507          """
508          if self.ratchets != None:
509              self.__enforce_ratchets = True
510              RNS.log("Ratchets enforced on "+str(self), RNS.LOG_DEBUG)
511              return True
512          else:
513              return False
514  
515      def set_retained_ratchets(self, retained_ratchets):
516          """
517          Sets the number of previously generated ratchet keys this destination will retain,
518          and try to use when decrypting incoming packets. Defaults to ``Destination.RATCHET_COUNT``.
519  
520          :param retained_ratchets: The number of generated ratchets to retain.
521          :returns: True if the operation succeeded, False if not.
522          """
523          if isinstance(retained_ratchets, int) and retained_ratchets > 0:
524              self.retained_ratchets = retained_ratchets
525              self._clean_ratchets()
526              return True
527          else:
528              return False
529  
530      def set_ratchet_interval(self, interval):
531          """
532          Sets the minimum interval in seconds between ratchet key rotation.
533          Defaults to ``Destination.RATCHET_INTERVAL``.
534  
535          :param interval: The minimum interval in seconds.
536          :returns: True if the operation succeeded, False if not.
537          """
538          if isinstance(interval, int) and interval > 0:
539              self.ratchet_interval = interval
540              return True
541          else:
542              return False
543  
544      def create_keys(self):
545          """
546          For a ``RNS.Destination.GROUP`` type destination, creates a new symmetric key.
547  
548          :raises: ``TypeError`` if called on an incompatible type of destination.
549          """
550          if self.type == Destination.PLAIN:
551              raise TypeError("A plain destination does not hold any keys")
552  
553          if self.type == Destination.SINGLE:
554              raise TypeError("A single destination holds keys through an Identity instance")
555  
556          if self.type == Destination.GROUP:
557              self.prv_bytes = Token.generate_key()
558              self.prv = Token(self.prv_bytes)
559  
560      def get_private_key(self):
561          """
562          For a ``RNS.Destination.GROUP`` type destination, returns the symmetric private key.
563  
564          :raises: ``TypeError`` if called on an incompatible type of destination.
565          """
566          if self.type == Destination.PLAIN:
567              raise TypeError("A plain destination does not hold any keys")
568          elif self.type == Destination.SINGLE:
569              raise TypeError("A single destination holds keys through an Identity instance")
570          else:
571              return self.prv_bytes
572  
573      def load_private_key(self, key):
574          """
575          For a ``RNS.Destination.GROUP`` type destination, loads a symmetric private key.
576  
577          :param key: A *bytes-like* containing the symmetric key.
578          :raises: ``TypeError`` if called on an incompatible type of destination.
579          """
580          if self.type == Destination.PLAIN:
581              raise TypeError("A plain destination does not hold any keys")
582  
583          if self.type == Destination.SINGLE:
584              raise TypeError("A single destination holds keys through an Identity instance")
585  
586          if self.type == Destination.GROUP:
587              self.prv_bytes = key
588              self.prv = Token(self.prv_bytes)
589  
590      def load_public_key(self, key):
591          if self.type != Destination.SINGLE:
592              raise TypeError("Only the \"single\" destination type can hold a public key")
593          else:
594              raise TypeError("A single destination holds keys through an Identity instance")
595  
596      def encrypt(self, plaintext):
597          """
598          Encrypts information for ``RNS.Destination.SINGLE`` or ``RNS.Destination.GROUP`` type destination.
599  
600          :param plaintext: A *bytes-like* containing the plaintext to be encrypted.
601          :raises: ``ValueError`` if destination does not hold a necessary key for encryption.
602          """
603          if self.type == Destination.PLAIN:
604              return plaintext
605  
606          if self.type == Destination.SINGLE and self.identity != None:
607              selected_ratchet = RNS.Identity.get_ratchet(self.hash)
608              if selected_ratchet:
609                  self.latest_ratchet_id = RNS.Identity._get_ratchet_id(selected_ratchet)
610              return self.identity.encrypt(plaintext, ratchet=selected_ratchet)
611  
612          if self.type == Destination.GROUP:
613              if hasattr(self, "prv") and self.prv != None:
614                  try:
615                      return self.prv.encrypt(plaintext)
616                  except Exception as e:
617                      RNS.log("The GROUP destination could not encrypt data", RNS.LOG_ERROR)
618                      RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
619              else:
620                  raise ValueError("No private key held by GROUP destination. Did you create or load one?")
621  
622      def decrypt(self, ciphertext):
623          """
624          Decrypts information for ``RNS.Destination.SINGLE`` or ``RNS.Destination.GROUP`` type destination.
625  
626          :param ciphertext: *Bytes* containing the ciphertext to be decrypted.
627          :raises: ``ValueError`` if destination does not hold a necessary key for decryption.
628          """
629          if self.type == Destination.PLAIN:
630              return ciphertext
631  
632          if self.type == Destination.SINGLE and self.identity != None:
633              if self.ratchets:
634                  decrypted = None
635                  try:
636                      decrypted = self.identity.decrypt(ciphertext, ratchets=self.ratchets, enforce_ratchets=self.__enforce_ratchets, ratchet_id_receiver=self)
637                  except:
638                      decrypted = None
639  
640                  if not decrypted:
641                      try:
642                          RNS.log(f"Decryption with ratchets failed on {self}, reloading ratchets from storage and retrying", RNS.LOG_ERROR)
643                          self._reload_ratchets(self.ratchets_path)
644                          decrypted = self.identity.decrypt(ciphertext, ratchets=self.ratchets, enforce_ratchets=self.__enforce_ratchets, ratchet_id_receiver=self)
645                      except Exception as e:
646                          RNS.log(f"Decryption still failing after ratchet reload. The contained exception was: {e}", RNS.LOG_ERROR)
647                          raise e
648  
649                      if decrypted: RNS.log("Decryption succeeded after ratchet reload", RNS.LOG_NOTICE)
650  
651                  return decrypted
652  
653              else:
654                  return self.identity.decrypt(ciphertext, ratchets=None, enforce_ratchets=self.__enforce_ratchets, ratchet_id_receiver=self)
655  
656          if self.type == Destination.GROUP:
657              if hasattr(self, "prv") and self.prv != None:
658                  try:
659                      return self.prv.decrypt(ciphertext)
660                  except Exception as e:
661                      RNS.log("The GROUP destination could not decrypt data", RNS.LOG_ERROR)
662                      RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
663              else:
664                  raise ValueError("No private key held by GROUP destination. Did you create or load one?")
665  
666      def sign(self, message):
667          """
668          Signs information for ``RNS.Destination.SINGLE`` type destination.
669  
670          :param message: *Bytes* containing the message to be signed.
671          :returns: A *bytes-like* containing the message signature, or *None* if the destination could not sign the message.
672          """
673          if self.type == Destination.SINGLE and self.identity != None:
674              return self.identity.sign(message)
675          else:
676              return None
677  
678      def set_default_app_data(self, app_data=None):
679          """
680          Sets the default app_data for the destination. If set, the default
681          app_data will be included in every announce sent by the destination,
682          unless other app_data is specified in the *announce* method.
683  
684          :param app_data: A *bytes-like* containing the default app_data, or a *callable* returning a *bytes-like* containing the app_data.
685          """
686          self.default_app_data = app_data
687  
688      def clear_default_app_data(self):
689          """
690          Clears default app_data previously set for the destination.
691          """
692          self.set_default_app_data(app_data=None)