/ RNS / Identity.py
Identity.py
  1  # Reticulum License
  2  #
  3  # Copyright (c) 2016-2025 Mark Qvist
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # - The Software shall not be used in any kind of system which includes amongst
 13  #   its functions the ability to purposefully do harm to human beings.
 14  #
 15  # - The Software shall not be used, directly or indirectly, in the creation of
 16  #   an artificial intelligence, machine learning or language model training
 17  #   dataset, including but not limited to any use that contributes to the
 18  #   training or development of such a model or algorithm.
 19  #
 20  # - The above copyright notice and this permission notice shall be included in
 21  #   all copies or substantial portions of the Software.
 22  #
 23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 24  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 25  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 26  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 27  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 28  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 29  # SOFTWARE.
 30  
 31  import math
 32  import os
 33  import RNS
 34  import time
 35  import atexit
 36  import hashlib
 37  import threading
 38  
 39  from .vendor import umsgpack as umsgpack
 40  
 41  from RNS.Cryptography import X25519PrivateKey, X25519PublicKey, Ed25519PrivateKey, Ed25519PublicKey
 42  from RNS.Cryptography import Token
 43  
 44  
 45  class Identity:
 46      """
 47      This class is used to manage identities in Reticulum. It provides methods
 48      for encryption, decryption, signatures and verification, and is the basis
 49      for all encrypted communication over Reticulum networks.
 50  
 51      :param create_keys: Specifies whether new encryption and signing keys should be generated.
 52      """
 53  
 54      CURVE = "Curve25519"
 55      """
 56      The curve used for Elliptic Curve DH key exchanges
 57      """
 58  
 59      KEYSIZE     = 256*2
 60      """
 61      X.25519 key size in bits. A complete key is the concatenation of a 256 bit encryption key, and a 256 bit signing key.
 62      """
 63  
 64      RATCHETSIZE = 256
 65      """
 66      X.25519 ratchet key size in bits.
 67      """
 68  
 69      RATCHET_EXPIRY = 60*60*24*30
 70      """
 71      The expiry time for received ratchets in seconds, defaults to 30 days. Reticulum will always use the most recently
 72      announced ratchet, and remember it for up to ``RATCHET_EXPIRY`` since receiving it, after which it will be discarded.
    If a newer ratchet is announced in the meantime, it will replace the already known ratchet.
 74      """
 75  
 76      # Non-configurable constants
 77      TOKEN_OVERHEAD            = RNS.Cryptography.Token.TOKEN_OVERHEAD
 78      AES128_BLOCKSIZE          = 16          # In bytes
 79      HASHLENGTH                = 256         # In bits
 80      SIGLENGTH                 = KEYSIZE     # In bits
 81  
 82      NAME_HASH_LENGTH          = 80
 83      TRUNCATED_HASHLENGTH      = RNS.Reticulum.TRUNCATED_HASHLENGTH
 84      """
 85      Constant specifying the truncated hash length (in bits) used by Reticulum
 86      for addressable hashes and other purposes. Non-configurable.
 87      """
 88  
 89      DERIVED_KEY_LENGTH        = 512//8
 90      DERIVED_KEY_LENGTH_LEGACY = 256//8
 91  
 92      # Storage
 93      known_destinations = {}
 94      known_ratchets = {}
 95  
 96      ratchet_persist_lock = threading.Lock()
 97  
 98      @staticmethod
 99      def remember(packet_hash, destination_hash, public_key, app_data = None):
100          if len(public_key) != Identity.KEYSIZE//8:
101              raise TypeError("Can't remember "+RNS.prettyhexrep(destination_hash)+", the public key size of "+str(len(public_key))+" is not valid.", RNS.LOG_ERROR)
102          else:
103              Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data]
104  
105  
106      @staticmethod
107      def recall(target_hash, from_identity_hash=False):
108          """
109          Recall identity for a destination or identity hash. By default, this function
110          will return the identity associated with a given *destination* hash. As an
111          example, if you know the ``lxmf.delivery`` destination hash of an endpoint,
112          this function will return the associated underlying identity. You can also
113          search for an identity from a known *identity hash*, by setting the
114          ``from_identity_hash`` argument.
115  
116          :param target_hash: Destination or identity hash as *bytes*.
117          :param from_identity_hash: Whether to search based on identity hash instead of destination hash as *bool*.
118          :returns: An :ref:`RNS.Identity<api-identity>` instance that can be used to create an outgoing :ref:`RNS.Destination<api-destination>`, or *None* if the destination is unknown.
119          """
120          if from_identity_hash:
121              for destination_hash in Identity.known_destinations:
122                  if target_hash == Identity.truncated_hash(Identity.known_destinations[destination_hash][2]):
123                      identity_data = Identity.known_destinations[destination_hash]
124                      identity = Identity(create_keys=False)
125                      identity.load_public_key(identity_data[2])
126                      identity.app_data = identity_data[3]
127                      return identity
128  
129              return None
130  
131          else:
132              if target_hash in Identity.known_destinations:
133                  identity_data = Identity.known_destinations[target_hash]
134                  identity = Identity(create_keys=False)
135                  identity.load_public_key(identity_data[2])
136                  identity.app_data = identity_data[3]
137                  return identity
138              else:
139                  for registered_destination in RNS.Transport.destinations:
140                      if target_hash == registered_destination.hash:
141                          identity = Identity(create_keys=False)
142                          identity.load_public_key(registered_destination.identity.get_public_key())
143                          identity.app_data = None
144                          return identity
145  
146                  return None
147  
148      @staticmethod
149      def recall_app_data(destination_hash):
150          """
151          Recall last heard app_data for a destination hash.
152  
153          :param destination_hash: Destination hash as *bytes*.
154          :returns: *Bytes* containing app_data, or *None* if the destination is unknown.
155          """
156          if destination_hash in Identity.known_destinations:
157              app_data = Identity.known_destinations[destination_hash][3]
158              return app_data
159          else:
160              return None
161  
    @staticmethod
    def save_known_destinations():
        """
        Write the in-memory table of known destinations to the
        ``known_destinations`` file in the Reticulum storage path.

        Entries found in the existing file that are not present in memory
        are merged into the in-memory table before the whole table is
        written back. If another save is flagged as in progress, this call
        polls for up to 5 seconds for it to finish.

        :returns: *False* if waiting for a previous save operation timed out, otherwise *None*.
        """
        # TODO: Improve the storage method so we don't have to
        # deserialize and serialize the entire table on every
        # save, but the only changes. It might be possible to
        # simply overwrite on exit now that every local client
        # disconnect triggers a data persist.
        
        try:
            # The flag attribute only exists once a save has been attempted,
            # hence the hasattr check before polling it.
            if hasattr(Identity, "saving_known_destinations"):
                wait_interval = 0.2
                wait_timeout = 5
                wait_start = time.time()
                while Identity.saving_known_destinations:
                    time.sleep(wait_interval)
                    if time.time() > wait_start+wait_timeout:
                        RNS.log("Could not save known destinations to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR)
                        # Returns without touching the flag, which is still
                        # held by the save operation we were waiting for.
                        return False

            Identity.saving_known_destinations = True
            save_start = time.time()

            # Read back what is currently on disk. Any failure here is
            # ignored, and the save proceeds with the in-memory table only.
            storage_known_destinations = {}
            if os.path.isfile(RNS.Reticulum.storagepath+"/known_destinations"):
                try:
                    with open(RNS.Reticulum.storagepath+"/known_destinations","rb") as file:
                        storage_known_destinations = umsgpack.load(file)
 
                except:
                    pass

            # Merge on-disk entries that are missing from memory.
            # Entries already in memory take precedence.
            try:
                for destination_hash in storage_known_destinations:
                    if not destination_hash in Identity.known_destinations:
                        Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash]
            except Exception as e:
                RNS.log("Skipped recombining known destinations from disk, since an error occurred: "+str(e), RNS.LOG_WARNING)

            RNS.log("Saving "+str(len(Identity.known_destinations))+" known destinations to storage...", RNS.LOG_DEBUG)
            with open(RNS.Reticulum.storagepath+"/known_destinations","wb") as file:
                umsgpack.dump(Identity.known_destinations, file)
            

            # Report the duration in ms below one second, otherwise in seconds
            save_time = time.time() - save_start
            if save_time < 1:
                time_str = str(round(save_time*1000,2))+"ms"
            else:
                time_str = str(round(save_time,2))+"s"

            RNS.log("Saved known destinations to storage in "+time_str, RNS.LOG_DEBUG)

        except Exception as e:
            RNS.log("Error while saving known destinations to disk, the contained exception was: "+str(e), RNS.LOG_ERROR)
            RNS.trace_exception(e)

        # Reached on both success and handled failure, releasing the flag
        Identity.saving_known_destinations = False
218  
219      @staticmethod
220      def load_known_destinations():
221          if os.path.isfile(RNS.Reticulum.storagepath+"/known_destinations"):
222              try:
223                  with open(RNS.Reticulum.storagepath+"/known_destinations","rb") as file:
224                      loaded_known_destinations = umsgpack.load(file)
225  
226                  Identity.known_destinations = {}
227                  for known_destination in loaded_known_destinations:
228                      if len(known_destination) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
229                          Identity.known_destinations[known_destination] = loaded_known_destinations[known_destination]
230  
231                  RNS.log("Loaded "+str(len(Identity.known_destinations))+" known destination from storage", RNS.LOG_VERBOSE)
232  
233              except Exception as e:
234                  RNS.log("Error loading known destinations from disk, file will be recreated on exit", RNS.LOG_ERROR)
235          else:
236              RNS.log("Destinations file does not exist, no known destinations loaded", RNS.LOG_VERBOSE)
237  
238      @staticmethod
239      def full_hash(data):
240          """
241          Get a SHA-256 hash of passed data.
242  
243          :param data: Data to be hashed as *bytes*.
244          :returns: SHA-256 hash as *bytes*.
245          """
246          return RNS.Cryptography.sha256(data)
247  
248      @staticmethod
249      def truncated_hash(data):
250          """
251          Get a truncated SHA-256 hash of passed data.
252  
253          :param data: Data to be hashed as *bytes*.
254          :returns: Truncated SHA-256 hash as *bytes*.
255          """
256          return Identity.full_hash(data)[:(Identity.TRUNCATED_HASHLENGTH//8)]
257  
258      @staticmethod
259      def get_random_hash():
260          """
261          Get a random SHA-256 hash.
262  
263          :param data: Data to be hashed as *bytes*.
264          :returns: Truncated SHA-256 hash of random data as *bytes*.
265          """
266          return Identity.truncated_hash(os.urandom(Identity.TRUNCATED_HASHLENGTH//8))
267  
268      @staticmethod
269      def current_ratchet_id(destination_hash):
270          """
271          Get the ID of the currently used ratchet key for a given destination hash
272  
273          :param destination_hash: A destination hash as *bytes*.
274          :returns: A ratchet ID as *bytes* or *None*.
275          """
276          ratchet = Identity.get_ratchet(destination_hash)
277          if ratchet == None:
278              return None
279          else:
280              return Identity._get_ratchet_id(ratchet)
281  
282      @staticmethod
283      def _get_ratchet_id(ratchet_pub_bytes):
284          return Identity.full_hash(ratchet_pub_bytes)[:Identity.NAME_HASH_LENGTH//8]
285  
286      @staticmethod
287      def _ratchet_public_bytes(ratchet):
288          return X25519PrivateKey.from_private_bytes(ratchet).public_key().public_bytes()
289  
290      @staticmethod
291      def _generate_ratchet():
292          ratchet_prv = X25519PrivateKey.generate()
293          ratchet_pub = ratchet_prv.public_key()
294          return ratchet_prv.private_bytes()
295  
296      @staticmethod
297      def _remember_ratchet(destination_hash, ratchet):
298          try:
299              if destination_hash in Identity.known_ratchets and Identity.known_ratchets[destination_hash] == ratchet:
300                  ratchet_exists = True
301              else:
302                  ratchet_exists = False
303  
304              if not ratchet_exists:
305                  RNS.log(f"Remembering ratchet {RNS.prettyhexrep(Identity._get_ratchet_id(ratchet))} for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_EXTREME)
306                  Identity.known_ratchets[destination_hash] = ratchet
307                  if not RNS.Transport.owner.is_connected_to_shared_instance:
308                      def persist_job():
309                          with Identity.ratchet_persist_lock:
310                              hexhash = RNS.hexrep(destination_hash, delimit=False)
311                              ratchet_data = {"ratchet": ratchet, "received": time.time()}
312  
313                              ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
314                              
315                              if not os.path.isdir(ratchetdir):
316                                  os.makedirs(ratchetdir)
317  
318                              outpath   = f"{ratchetdir}/{hexhash}.out"
319                              finalpath = f"{ratchetdir}/{hexhash}"
320                              with open(outpath, "wb") as ratchet_file:
321                                  ratchet_file.write(umsgpack.packb(ratchet_data))
322                              os.replace(outpath, finalpath)
323  
324                      
325                      threading.Thread(target=persist_job, daemon=True).start()
326  
327          except Exception as e:
328              RNS.log(f"Could not persist ratchet for {RNS.prettyhexrep(destination_hash)} to storage.", RNS.LOG_ERROR)
329              RNS.log(f"The contained exception was: {e}")
330              RNS.trace_exception(e)
331  
332      @staticmethod
333      def _clean_ratchets():
334          RNS.log("Cleaning ratchets...", RNS.LOG_DEBUG)
335          try:
336              now = time.time()
337              ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
338              if os.path.isdir(ratchetdir):
339                  for filename in os.listdir(ratchetdir):
340                      try:
341                          expired = False
342                          corrupted = False
343                          with open(f"{ratchetdir}/{filename}", "rb") as rf:
344                              # TODO: Remove individual ratchet file if corrupt
345                              try:
346                                  ratchet_data = umsgpack.unpackb(rf.read())
347                                  if now > ratchet_data["received"]+Identity.RATCHET_EXPIRY:
348                                      expired = True
349  
350                              except Exception as e:
351                                  RNS.log(f"Corrupted ratchet data while reading {ratchetdir}/{filename}, removing file", RNS.LOG_ERROR)
352                                  corrupted = True
353  
354                          if expired or corrupted:
355                              os.unlink(f"{ratchetdir}/{filename}")
356  
357                      except Exception as e:
358                          RNS.log(f"An error occurred while cleaning ratchets, in the processing of {ratchetdir}/{filename}.", RNS.LOG_ERROR)
359                          RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
360  
361          except Exception as e:
362              RNS.log(f"An error occurred while cleaning ratchets. The contained exception was: {e}", RNS.LOG_ERROR)
363  
364      @staticmethod
365      def get_ratchet(destination_hash):
366          if not destination_hash in Identity.known_ratchets:
367              ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
368              hexhash = RNS.hexrep(destination_hash, delimit=False)
369              ratchet_path = f"{ratchetdir}/{hexhash}"
370              if os.path.isfile(ratchet_path):
371                  try:
372                      with open(ratchet_path, "rb") as ratchet_file:
373                          ratchet_data = umsgpack.unpackb(ratchet_file.read())
374                          if time.time() < ratchet_data["received"]+Identity.RATCHET_EXPIRY and len(ratchet_data["ratchet"]) == Identity.RATCHETSIZE//8:
375                              Identity.known_ratchets[destination_hash] = ratchet_data["ratchet"]
376                          else:
377                              return None
378                  
379                  except Exception as e:
380                      RNS.log(f"An error occurred while loading ratchet data for {RNS.prettyhexrep(destination_hash)} from storage.", RNS.LOG_ERROR)
381                      RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
382                      return None
383  
384          if destination_hash in Identity.known_ratchets:
385              return Identity.known_ratchets[destination_hash]
386          else:
387              RNS.log(f"Could not load ratchet for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
388              return None
389  
    @staticmethod
    def validate_announce(packet, only_validate_signature=False):
        """
        Validate an announce packet and, if it is valid, remember the
        announced public key, app_data and ratchet.

        The announce data is read as: public key, name hash, a 10 byte
        random hash, a ratchet (only if the packet context flag is set),
        the signature, and finally any remaining bytes as app_data. The
        signature must be valid for the announced public key, and the
        packet destination hash must match the hash derived from the name
        hash and the announced identity hash.

        :param packet: The received packet to validate.
        :param only_validate_signature: If set, return *True* as soon as the signature has been validated, without checking the destination hash or remembering anything.
        :returns: *True* if the announce is valid, *False* if it is invalid or an error occurred. Returns *None* if the packet is not an announce packet.
        """
        try:
            if packet.packet_type == RNS.Packet.ANNOUNCE:
                # All field sizes are converted from bits to bytes
                keysize       = Identity.KEYSIZE//8
                ratchetsize   = Identity.RATCHETSIZE//8
                name_hash_len = Identity.NAME_HASH_LENGTH//8
                sig_len       = Identity.SIGLENGTH//8
                destination_hash = packet.destination_hash

                # Get public key bytes from announce
                public_key = packet.data[:keysize]

                # If the packet context flag is set,
                # this announce contains a new ratchet
                if packet.context_flag == RNS.Packet.FLAG_SET:
                    name_hash   = packet.data[keysize:keysize+name_hash_len ]
                    random_hash = packet.data[keysize+name_hash_len:keysize+name_hash_len+10]
                    ratchet     = packet.data[keysize+name_hash_len+10:keysize+name_hash_len+10+ratchetsize]
                    signature   = packet.data[keysize+name_hash_len+10+ratchetsize:keysize+name_hash_len+10+ratchetsize+sig_len]
                    app_data    = b""
                    if len(packet.data) > keysize+name_hash_len+10+sig_len+ratchetsize:
                        app_data = packet.data[keysize+name_hash_len+10+sig_len+ratchetsize:]

                # If the packet context flag is not set,
                # this announce does not contain a ratchet
                else:
                    ratchet     = b""
                    name_hash   = packet.data[keysize:keysize+name_hash_len]
                    random_hash = packet.data[keysize+name_hash_len:keysize+name_hash_len+10]
                    signature   = packet.data[keysize+name_hash_len+10:keysize+name_hash_len+10+sig_len]
                    app_data    = b""
                    if len(packet.data) > keysize+name_hash_len+10+sig_len:
                        app_data = packet.data[keysize+name_hash_len+10+sig_len:]

                # The signature covers everything except the signature itself,
                # with the destination hash prepended. app_data is still b""
                # here when the announce carried none.
                signed_data = destination_hash+public_key+name_hash+random_hash+ratchet+app_data

                # An announce without app_data is remembered with None rather
                # than empty bytes.
                # NOTE(review): this threshold does not include ratchetsize, so
                # a ratcheted announce without app_data keeps app_data as b""
                # instead of None - confirm whether that is intended.
                if not len(packet.data) > Identity.KEYSIZE//8+Identity.NAME_HASH_LENGTH//8+10+Identity.SIGLENGTH//8:
                    app_data = None

                announced_identity = Identity(create_keys=False)
                announced_identity.load_public_key(public_key)

                # pub is still None if the public key could not be loaded
                if announced_identity.pub != None and announced_identity.validate(signature, signed_data):
                    if only_validate_signature:
                        del announced_identity
                        return True

                    # The destination hash must be derivable from the name
                    # hash and the hash of the announced identity
                    hash_material = name_hash+announced_identity.hash
                    expected_hash = RNS.Identity.full_hash(hash_material)[:RNS.Reticulum.TRUNCATED_HASHLENGTH//8]

                    if destination_hash == expected_hash:
                        # Check if we already have a public key for this destination
                        # and make sure the public key is not different.
                        if destination_hash in Identity.known_destinations:
                            if public_key != Identity.known_destinations[destination_hash][2]:
                                # In reality, this should never occur, but in the odd case
                                # that someone manages a hash collision, we reject the announce.
                                RNS.log("Received announce with valid signature and destination hash, but announced public key does not match already known public key.", RNS.LOG_CRITICAL)
                                RNS.log("This may indicate an attempt to modify network paths, or a random hash collision. The announce was rejected.", RNS.LOG_CRITICAL)
                                return False

                        RNS.Identity.remember(packet.get_hash(), destination_hash, public_key, app_data)
                        del announced_identity

                        # Build an optional " [RSSI ..dBm, SNR ..dB]" suffix
                        # for the log entry from the available signal data
                        if packet.rssi != None or packet.snr != None:
                            signal_str = " ["
                            if packet.rssi != None:
                                signal_str += "RSSI "+str(packet.rssi)+"dBm"
                                if packet.snr != None:
                                    signal_str += ", "
                            if packet.snr != None:
                                signal_str += "SNR "+str(packet.snr)+"dB"
                            signal_str += "]"
                        else:
                            signal_str = ""

                        if hasattr(packet, "transport_id") and packet.transport_id != None:
                            RNS.log("Valid announce for "+RNS.prettyhexrep(destination_hash)+" "+str(packet.hops)+" hops away, received via "+RNS.prettyhexrep(packet.transport_id)+" on "+str(packet.receiving_interface)+signal_str, RNS.LOG_EXTREME)
                        else:
                            RNS.log("Valid announce for "+RNS.prettyhexrep(destination_hash)+" "+str(packet.hops)+" hops away, received on "+str(packet.receiving_interface)+signal_str, RNS.LOG_EXTREME)

                        # Only announces that carried a ratchet have a
                        # non-empty ratchet value here
                        if ratchet:
                            Identity._remember_ratchet(destination_hash, ratchet)

                        return True

                    else:
                        RNS.log("Received invalid announce for "+RNS.prettyhexrep(destination_hash)+": Destination mismatch.", RNS.LOG_DEBUG)
                        return False

                else:
                    RNS.log("Received invalid announce for "+RNS.prettyhexrep(destination_hash)+": Invalid signature.", RNS.LOG_DEBUG)
                    del announced_identity
                    return False
        
        except Exception as e:
            RNS.log("Error occurred while validating announce. The contained exception was: "+str(e), RNS.LOG_ERROR)
            return False
489  
490      @staticmethod
491      def persist_data():
492          if not RNS.Transport.owner.is_connected_to_shared_instance:
493              Identity.save_known_destinations()
494  
    @staticmethod
    def exit_handler():
        """
        Persist identity data by calling :meth:`persist_data`.

        NOTE(review): presumably invoked at program exit; the registration
        is not visible in this part of the file - confirm against the caller.
        """
        Identity.persist_data()
498  
499  
500      @staticmethod
501      def from_bytes(prv_bytes):
502          """
503          Create a new :ref:`RNS.Identity<api-identity>` instance from *bytes* of private key.
504          Can be used to load previously created and saved identities into Reticulum.
505  
506          :param prv_bytes: The *bytes* of private a saved private key. **HAZARD!** Never use this to generate a new key by feeding random data in prv_bytes.
507          :returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the *bytes* data was invalid.
508          """
509          identity = Identity(create_keys=False)
510          if identity.load_private_key(prv_bytes):
511              return identity
512          else:
513              return None
514  
515  
516      @staticmethod
517      def from_file(path):
518          """
519          Create a new :ref:`RNS.Identity<api-identity>` instance from a file.
520          Can be used to load previously created and saved identities into Reticulum.
521  
522          :param path: The full path to the saved :ref:`RNS.Identity<api-identity>` data
523          :returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the loaded data was invalid.
524          """
525          identity = Identity(create_keys=False)
526          if identity.load(path):
527              return identity
528          else:
529              return None
530  
531      def to_file(self, path):
532          """
533          Saves the identity to a file. This will write the private key to disk,
534          and anyone with access to this file will be able to decrypt all
535          communication for the identity. Be very careful with this method.
536  
537          :param path: The full path specifying where to save the identity.
538          :returns: True if the file was saved, otherwise False.
539          """
540          try:
541              with open(path, "wb") as key_file:
542                  key_file.write(self.get_private_key())
543                  return True
544              return False
545          except Exception as e:
546              RNS.log("Error while saving identity to "+str(path), RNS.LOG_ERROR)
547              RNS.log("The contained exception was: "+str(e))
548  
549      def __init__(self,create_keys=True):
550          # Initialize keys to none
551          self.prv           = None
552          self.prv_bytes     = None
553          self.sig_prv       = None
554          self.sig_prv_bytes = None
555  
556          self.pub           = None
557          self.pub_bytes     = None
558          self.sig_pub       = None
559          self.sig_pub_bytes = None
560  
561          self.hash          = None
562          self.hexhash       = None
563  
564          if create_keys:
565              self.create_keys()
566  
567      def create_keys(self):
568          self.prv           = X25519PrivateKey.generate()
569          self.prv_bytes     = self.prv.private_bytes()
570  
571          self.sig_prv       = Ed25519PrivateKey.generate()
572          self.sig_prv_bytes = self.sig_prv.private_bytes()
573  
574          self.pub           = self.prv.public_key()
575          self.pub_bytes     = self.pub.public_bytes()
576  
577          self.sig_pub       = self.sig_prv.public_key()
578          self.sig_pub_bytes = self.sig_pub.public_bytes()
579  
580          self.update_hashes()
581  
582          RNS.log("Identity keys created for "+RNS.prettyhexrep(self.hash), RNS.LOG_VERBOSE)
583  
584      def get_private_key(self):
585          """
586          :returns: The private key as *bytes*
587          """
588          return self.prv_bytes+self.sig_prv_bytes
589  
590      def get_public_key(self):
591          """
592          :returns: The public key as *bytes*
593          """
594          return self.pub_bytes+self.sig_pub_bytes
595  
596      def load_private_key(self, prv_bytes):
597          """
598          Load a private key into the instance.
599  
600          :param prv_bytes: The private key as *bytes*.
601          :returns: True if the key was loaded, otherwise False.
602          """
603          try:
604              self.prv_bytes     = prv_bytes[:Identity.KEYSIZE//8//2]
605              self.prv           = X25519PrivateKey.from_private_bytes(self.prv_bytes)
606              self.sig_prv_bytes = prv_bytes[Identity.KEYSIZE//8//2:]
607              self.sig_prv       = Ed25519PrivateKey.from_private_bytes(self.sig_prv_bytes)
608              
609              self.pub           = self.prv.public_key()
610              self.pub_bytes     = self.pub.public_bytes()
611  
612              self.sig_pub       = self.sig_prv.public_key()
613              self.sig_pub_bytes = self.sig_pub.public_bytes()
614  
615              self.update_hashes()
616  
617              return True
618  
619          except Exception as e:
620              raise e
621              RNS.log("Failed to load identity key", RNS.LOG_ERROR)
622              RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
623              return False
624  
625      def load_public_key(self, pub_bytes):
626          """
627          Load a public key into the instance.
628  
629          :param pub_bytes: The public key as *bytes*.
630          :returns: True if the key was loaded, otherwise False.
631          """
632          try:
633              self.pub_bytes     = pub_bytes[:Identity.KEYSIZE//8//2]
634              self.sig_pub_bytes = pub_bytes[Identity.KEYSIZE//8//2:]
635  
636              self.pub           = X25519PublicKey.from_public_bytes(self.pub_bytes)
637              self.sig_pub       = Ed25519PublicKey.from_public_bytes(self.sig_pub_bytes)
638  
639              self.update_hashes()
640          except Exception as e:
641              RNS.log("Error while loading public key, the contained exception was: "+str(e), RNS.LOG_ERROR)
642  
643      def update_hashes(self):
644          self.hash = Identity.truncated_hash(self.get_public_key())
645          self.hexhash = self.hash.hex()
646  
647      def load(self, path):
648          try:
649              with open(path, "rb") as key_file:
650                  prv_bytes = key_file.read()
651                  return self.load_private_key(prv_bytes)
652              return False
653          except Exception as e:
654              RNS.log("Error while loading identity from "+str(path), RNS.LOG_ERROR)
655              RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
656  
657      def get_salt(self):
658          return self.hash
659  
660      def get_context(self):
661          return None
662  
663      def encrypt(self, plaintext, ratchet=None):
664          """
665          Encrypts information for the identity.
666  
667          :param plaintext: The plaintext to be encrypted as *bytes*.
668          :returns: Ciphertext token as *bytes*.
669          :raises: *KeyError* if the instance does not hold a public key.
670          """
671          if self.pub != None:
672              ephemeral_key = X25519PrivateKey.generate()
673              ephemeral_pub_bytes = ephemeral_key.public_key().public_bytes()
674  
675              if ratchet != None:
676                  target_public_key = X25519PublicKey.from_public_bytes(ratchet)
677              else:
678                  target_public_key = self.pub
679  
680              shared_key = ephemeral_key.exchange(target_public_key)
681              
682              derived_key = RNS.Cryptography.hkdf(
683                  length=Identity.DERIVED_KEY_LENGTH,
684                  derive_from=shared_key,
685                  salt=self.get_salt(),
686                  context=self.get_context(),
687              )
688  
689              token = Token(derived_key)
690              ciphertext = token.encrypt(plaintext)
691              token = ephemeral_pub_bytes+ciphertext
692  
693              return token
694          else:
695              raise KeyError("Encryption failed because identity does not hold a public key")
696  
697      def __decrypt(self, shared_key, ciphertext):
698          derived_key = RNS.Cryptography.hkdf(
699              length=Identity.DERIVED_KEY_LENGTH,
700              derive_from=shared_key,
701              salt=self.get_salt(),
702              context=self.get_context())
703  
704          token = Token(derived_key)
705          plaintext = token.decrypt(ciphertext)
706          return plaintext
707  
708      def decrypt(self, ciphertext_token, ratchets=None, enforce_ratchets=False, ratchet_id_receiver=None):
709          """
710          Decrypts information for the identity.
711  
712          :param ciphertext: The ciphertext to be decrypted as *bytes*.
713          :returns: Plaintext as *bytes*, or *None* if decryption fails.
714          :raises: *KeyError* if the instance does not hold a private key.
715          """
716  
717          if self.prv != None:
718              if len(ciphertext_token) > Identity.KEYSIZE//8//2:
719                  plaintext = None
720                  try:
721                      peer_pub_bytes = ciphertext_token[:Identity.KEYSIZE//8//2]
722                      peer_pub = X25519PublicKey.from_public_bytes(peer_pub_bytes)
723                      ciphertext = ciphertext_token[Identity.KEYSIZE//8//2:]
724  
725                      if ratchets:
726                          for ratchet in ratchets:
727                              try:
728                                  ratchet_prv = X25519PrivateKey.from_private_bytes(ratchet)
729                                  ratchet_id = Identity._get_ratchet_id(ratchet_prv.public_key().public_bytes())
730                                  shared_key = ratchet_prv.exchange(peer_pub)
731                                  plaintext = self.__decrypt(shared_key, ciphertext)
732                                  if ratchet_id_receiver:
733                                      ratchet_id_receiver.latest_ratchet_id = ratchet_id
734                                  
735                                  break
736                              
737                              except Exception as e:
738                                  pass
739  
740                      if enforce_ratchets and plaintext == None:
741                          RNS.log("Decryption with ratchet enforcement by "+RNS.prettyhexrep(self.hash)+" failed. Dropping packet.", RNS.LOG_DEBUG)
742                          if ratchet_id_receiver:
743                              ratchet_id_receiver.latest_ratchet_id = None
744                          return None
745  
746                      if plaintext == None:
747                          shared_key = self.prv.exchange(peer_pub)
748                          plaintext = self.__decrypt(shared_key, ciphertext)
749  
750                          if ratchet_id_receiver:
751                              ratchet_id_receiver.latest_ratchet_id = None
752  
753                  except Exception as e:
754                      RNS.log("Decryption by "+RNS.prettyhexrep(self.hash)+" failed: "+str(e), RNS.LOG_DEBUG)
755                      if ratchet_id_receiver:
756                          ratchet_id_receiver.latest_ratchet_id = None
757                      
758                  return plaintext
759              
760              else:
761                  RNS.log("Decryption failed because the token size was invalid.", RNS.LOG_DEBUG)
762                  return None
763          else:
764              raise KeyError("Decryption failed because identity does not hold a private key")
765  
766  
767      def sign(self, message):
768          """
769          Signs information by the identity.
770  
771          :param message: The message to be signed as *bytes*.
772          :returns: Signature as *bytes*.
773          :raises: *KeyError* if the instance does not hold a private key.
774          """
775          if self.sig_prv != None:
776              try:
777                  return self.sig_prv.sign(message)    
778              except Exception as e:
779                  RNS.log("The identity "+str(self)+" could not sign the requested message. The contained exception was: "+str(e), RNS.LOG_ERROR)
780                  raise e
781          else:
782              raise KeyError("Signing failed because identity does not hold a private key")
783  
784      def validate(self, signature, message):
785          """
786          Validates the signature of a signed message.
787  
788          :param signature: The signature to be validated as *bytes*.
789          :param message: The message to be validated as *bytes*.
790          :returns: True if the signature is valid, otherwise False.
791          :raises: *KeyError* if the instance does not hold a public key.
792          """
793          if self.pub != None:
794              try:
795                  self.sig_pub.verify(signature, message)
796                  return True
797              except Exception as e:
798                  return False
799          else:
800              raise KeyError("Signature validation failed because identity does not hold a public key")
801  
802      def prove(self, packet, destination=None):
803          signature = self.sign(packet.packet_hash)
804          if RNS.Reticulum.should_use_implicit_proof():
805              proof_data = signature
806          else:
807              proof_data = packet.packet_hash + signature
808          
809          if destination == None:
810              destination = packet.generate_proof_destination()
811  
812          proof = RNS.Packet(destination, proof_data, RNS.Packet.PROOF, attached_interface = packet.receiving_interface)
813          proof.send()
814  
815      def __str__(self):
816          return RNS.prettyhexrep(self.hash)