/ RNS / Identity.py
Identity.py
  1  # Reticulum License
  2  #
  3  # Copyright (c) 2016-2025 Mark Qvist
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # - The Software shall not be used in any kind of system which includes amongst
 13  #   its functions the ability to purposefully do harm to human beings.
 14  #
 15  # - The Software shall not be used, directly or indirectly, in the creation of
 16  #   an artificial intelligence, machine learning or language model training
 17  #   dataset, including but not limited to any use that contributes to the
 18  #   training or development of such a model or algorithm.
 19  #
 20  # - The above copyright notice and this permission notice shall be included in
 21  #   all copies or substantial portions of the Software.
 22  #
 23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 24  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 25  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 26  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 27  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 28  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 29  # SOFTWARE.
 30  
 31  import math
 32  import os
 33  import RNS
 34  import time
 35  import atexit
 36  import hashlib
 37  import threading
 38  
 39  from .vendor import umsgpack as umsgpack
 40  
 41  from RNS.Cryptography import X25519PrivateKey, X25519PublicKey, Ed25519PrivateKey, Ed25519PublicKey
 42  from RNS.Cryptography import Token
 43  
 44  
 45  class Identity:
 46      """
 47      This class is used to manage identities in Reticulum. It provides methods
 48      for encryption, decryption, signatures and verification, and is the basis
 49      for all encrypted communication over Reticulum networks.
 50  
 51      :param create_keys: Specifies whether new encryption and signing keys should be generated.
 52      """
 53  
 54      CURVE = "Curve25519"
 55      """
 56      The curve used for Elliptic Curve DH key exchanges
 57      """
 58  
 59      KEYSIZE     = 256*2
 60      """
 61      X.25519 key size in bits. A complete key is the concatenation of a 256 bit encryption key, and a 256 bit signing key.
 62      """
 63  
 64      RATCHETSIZE = 256
 65      """
 66      X.25519 ratchet key size in bits.
 67      """
 68  
 69      RATCHET_EXPIRY = 60*60*24*30
 70      """
 71      The expiry time for received ratchets in seconds, defaults to 30 days. Reticulum will always use the most recently
 72      announced ratchet, and remember it for up to ``RATCHET_EXPIRY`` since receiving it, after which it will be discarded.
 73      If a newer ratchet is announced in the meantime, it will be replace the already known ratchet.
 74      """
 75  
 76      # Non-configurable constants
 77      TOKEN_OVERHEAD            = RNS.Cryptography.Token.TOKEN_OVERHEAD
 78      AES128_BLOCKSIZE          = 16          # In bytes
 79      HASHLENGTH                = 256         # In bits
 80      SIGLENGTH                 = KEYSIZE     # In bits
 81  
 82      NAME_HASH_LENGTH          = 80
 83      TRUNCATED_HASHLENGTH      = RNS.Reticulum.TRUNCATED_HASHLENGTH
 84      """
 85      Constant specifying the truncated hash length (in bits) used by Reticulum
 86      for addressable hashes and other purposes. Non-configurable.
 87      """
 88  
 89      DERIVED_KEY_LENGTH        = 512//8
 90      DERIVED_KEY_LENGTH_LEGACY = 256//8
 91  
 92      # Storage
 93      known_destinations = {}
 94      known_ratchets = {}
 95  
 96      ratchet_persist_lock = threading.Lock()
 97  
 98      @staticmethod
 99      def remember(packet_hash, destination_hash, public_key, app_data = None):
100          if len(public_key) != Identity.KEYSIZE//8:
101              raise TypeError("Can't remember "+RNS.prettyhexrep(destination_hash)+", the public key size of "+str(len(public_key))+" is not valid.", RNS.LOG_ERROR)
102          else:
103              Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data]
104  
105  
106      @staticmethod
107      def recall(target_hash, from_identity_hash=False):
108          """
109          Recall identity for a destination or identity hash. By default, this function
110          will return the identity associated with a given *destination* hash. As an
111          example, if you know the ``lxmf.delivery`` destination hash of an endpoint,
112          this function will return the associated underlying identity. You can also
113          search for an identity from a known *identity hash*, by setting the
114          ``from_identity_hash`` argument.
115  
116          :param target_hash: Destination or identity hash as *bytes*.
117          :param from_identity_hash: Whether to search based on identity hash instead of destination hash as *bool*.
118          :returns: An :ref:`RNS.Identity<api-identity>` instance that can be used to create an outgoing :ref:`RNS.Destination<api-destination>`, or *None* if the destination is unknown.
119          """
120          if from_identity_hash:
121              for destination_hash in Identity.known_destinations:
122                  if target_hash == Identity.truncated_hash(Identity.known_destinations[destination_hash][2]):
123                      identity_data = Identity.known_destinations[destination_hash]
124                      identity = Identity(create_keys=False)
125                      identity.load_public_key(identity_data[2])
126                      identity.app_data = identity_data[3]
127                      return identity
128  
129              return None
130  
131          else:
132              if target_hash in Identity.known_destinations:
133                  identity_data = Identity.known_destinations[target_hash]
134                  identity = Identity(create_keys=False)
135                  identity.load_public_key(identity_data[2])
136                  identity.app_data = identity_data[3]
137                  return identity
138              else:
139                  for registered_destination in RNS.Transport.destinations:
140                      if target_hash == registered_destination.hash:
141                          identity = Identity(create_keys=False)
142                          identity.load_public_key(registered_destination.identity.get_public_key())
143                          identity.app_data = None
144                          return identity
145  
146                  return None
147  
148      @staticmethod
149      def recall_app_data(destination_hash):
150          """
151          Recall last heard app_data for a destination hash.
152  
153          :param destination_hash: Destination hash as *bytes*.
154          :returns: *Bytes* containing app_data, or *None* if the destination is unknown.
155          """
156          if destination_hash in Identity.known_destinations:
157              app_data = Identity.known_destinations[destination_hash][3]
158              return app_data
159          else:
160              return None
161  
162      @staticmethod
163      def save_known_destinations():
164          # TODO: Improve the storage method so we don't have to
165          # deserialize and serialize the entire table on every
166          # save, but the only changes. It might be possible to
167          # simply overwrite on exit now that every local client
168          # disconnect triggers a data persist.
169          
170          try:
171              if hasattr(Identity, "saving_known_destinations"):
172                  wait_interval = 0.2
173                  wait_timeout = 5
174                  wait_start = time.time()
175                  while Identity.saving_known_destinations:
176                      time.sleep(wait_interval)
177                      if time.time() > wait_start+wait_timeout:
178                          RNS.log("Could not save known destinations to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR)
179                          return False
180  
181              Identity.saving_known_destinations = True
182              save_start = time.time()
183  
184              storage_known_destinations = {}
185              if os.path.isfile(RNS.Reticulum.storagepath+"/known_destinations"):
186                  try:
187                      with open(RNS.Reticulum.storagepath+"/known_destinations","rb") as file:
188                          storage_known_destinations = umsgpack.load(file)
189   
190                  except:
191                      pass
192  
193              try:
194                  for destination_hash in storage_known_destinations:
195                      if not destination_hash in Identity.known_destinations:
196                          Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash]
197              except Exception as e:
198                  RNS.log("Skipped recombining known destinations from disk, since an error occurred: "+str(e), RNS.LOG_WARNING)
199  
200              RNS.log("Saving "+str(len(Identity.known_destinations))+" known destinations to storage...", RNS.LOG_DEBUG)
201              with open(RNS.Reticulum.storagepath+"/known_destinations","wb") as file:
202                  umsgpack.dump(Identity.known_destinations, file)
203              
204  
205              save_time = time.time() - save_start
206              if save_time < 1:
207                  time_str = str(round(save_time*1000,2))+"ms"
208              else:
209                  time_str = str(round(save_time,2))+"s"
210  
211              RNS.log("Saved known destinations to storage in "+time_str, RNS.LOG_DEBUG)
212  
213          except Exception as e:
214              RNS.log("Error while saving known destinations to disk, the contained exception was: "+str(e), RNS.LOG_ERROR)
215              RNS.trace_exception(e)
216  
217          Identity.saving_known_destinations = False
218  
219      @staticmethod
220      def load_known_destinations():
221          if os.path.isfile(RNS.Reticulum.storagepath+"/known_destinations"):
222              try:
223                  with open(RNS.Reticulum.storagepath+"/known_destinations","rb") as file:
224                      loaded_known_destinations = umsgpack.load(file)
225  
226                  Identity.known_destinations = {}
227                  for known_destination in loaded_known_destinations:
228                      if len(known_destination) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
229                          Identity.known_destinations[known_destination] = loaded_known_destinations[known_destination]
230  
231                  RNS.log("Loaded "+str(len(Identity.known_destinations))+" known destination from storage", RNS.LOG_VERBOSE)
232  
233              except Exception as e:
234                  RNS.log("Error loading known destinations from disk, file will be recreated on exit", RNS.LOG_ERROR)
235          else:
236              RNS.log("Destinations file does not exist, no known destinations loaded", RNS.LOG_VERBOSE)
237  
238      @staticmethod
239      def full_hash(data):
240          """
241          Get a SHA-256 hash of passed data.
242  
243          :param data: Data to be hashed as *bytes*.
244          :returns: SHA-256 hash as *bytes*.
245          """
246          return RNS.Cryptography.sha256(data)
247  
248      @staticmethod
249      def truncated_hash(data):
250          """
251          Get a truncated SHA-256 hash of passed data.
252  
253          :param data: Data to be hashed as *bytes*.
254          :returns: Truncated SHA-256 hash as *bytes*.
255          """
256          return Identity.full_hash(data)[:(Identity.TRUNCATED_HASHLENGTH//8)]
257  
258      @staticmethod
259      def get_random_hash():
260          """
261          Get a random SHA-256 hash.
262  
263          :param data: Data to be hashed as *bytes*.
264          :returns: Truncated SHA-256 hash of random data as *bytes*.
265          """
266          return Identity.truncated_hash(os.urandom(Identity.TRUNCATED_HASHLENGTH//8))
267  
268      @staticmethod
269      def current_ratchet_id(destination_hash):
270          """
271          Get the ID of the currently used ratchet key for a given destination hash
272  
273          :param destination_hash: A destination hash as *bytes*.
274          :returns: A ratchet ID as *bytes* or *None*.
275          """
276          ratchet = Identity.get_ratchet(destination_hash)
277          if ratchet == None:
278              return None
279          else:
280              return Identity._get_ratchet_id(ratchet)
281  
282      @staticmethod
283      def _get_ratchet_id(ratchet_pub_bytes):
284          return Identity.full_hash(ratchet_pub_bytes)[:Identity.NAME_HASH_LENGTH//8]
285  
286      @staticmethod
287      def _ratchet_public_bytes(ratchet):
288          return X25519PrivateKey.from_private_bytes(ratchet).public_key().public_bytes()
289  
290      @staticmethod
291      def _generate_ratchet():
292          ratchet_prv = X25519PrivateKey.generate()
293          ratchet_pub = ratchet_prv.public_key()
294          return ratchet_prv.private_bytes()
295  
296      @staticmethod
297      def _remember_ratchet(destination_hash, ratchet):
298          try:
299              if destination_hash in Identity.known_ratchets and Identity.known_ratchets[destination_hash] == ratchet:
300                  ratchet_exists = True
301              else:
302                  ratchet_exists = False
303  
304              if not ratchet_exists:
305                  RNS.log(f"Remembering ratchet {RNS.prettyhexrep(Identity._get_ratchet_id(ratchet))} for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_EXTREME)
306                  Identity.known_ratchets[destination_hash] = ratchet
307                  if not RNS.Transport.owner.is_connected_to_shared_instance:
308                      def persist_job():
309                          with Identity.ratchet_persist_lock:
310                              hexhash = RNS.hexrep(destination_hash, delimit=False)
311                              ratchet_data = {"ratchet": ratchet, "received": time.time()}
312  
313                              ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
314                              
315                              if not os.path.isdir(ratchetdir):
316                                  os.makedirs(ratchetdir)
317  
318                              outpath   = f"{ratchetdir}/{hexhash}.out"
319                              finalpath = f"{ratchetdir}/{hexhash}"
320                              with open(outpath, "wb") as ratchet_file:
321                                  ratchet_file.write(umsgpack.packb(ratchet_data))
322                              os.replace(outpath, finalpath)
323  
324                      
325                      threading.Thread(target=persist_job, daemon=True).start()
326  
327          except Exception as e:
328              RNS.log(f"Could not persist ratchet for {RNS.prettyhexrep(destination_hash)} to storage.", RNS.LOG_ERROR)
329              RNS.log(f"The contained exception was: {e}")
330              RNS.trace_exception(e)
331  
332      @staticmethod
333      def _clean_ratchets():
334          RNS.log("Cleaning ratchets...", RNS.LOG_DEBUG)
335          try:
336              now = time.time()
337              ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
338              if os.path.isdir(ratchetdir):
339                  for filename in os.listdir(ratchetdir):
340                      try:
341                          expired = False
342                          corrupted = False
343                          with open(f"{ratchetdir}/{filename}", "rb") as rf:
344                              # TODO: Remove individual ratchet file if corrupt
345                              try:
346                                  ratchet_data = umsgpack.unpackb(rf.read())
347                                  if now > ratchet_data["received"]+Identity.RATCHET_EXPIRY:
348                                      expired = True
349  
350                              except Exception as e:
351                                  RNS.log(f"Corrupted ratchet data while reading {ratchetdir}/{filename}, removing file", RNS.LOG_ERROR)
352                                  corrupted = True
353  
354                          if expired or corrupted:
355                              os.unlink(f"{ratchetdir}/{filename}")
356  
357                      except Exception as e:
358                          RNS.log(f"An error occurred while cleaning ratchets, in the processing of {ratchetdir}/{filename}.", RNS.LOG_ERROR)
359                          RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
360  
361          except Exception as e:
362              RNS.log(f"An error occurred while cleaning ratchets. The contained exception was: {e}", RNS.LOG_ERROR)
363  
364      @staticmethod
365      def get_ratchet(destination_hash):
366          if not destination_hash in Identity.known_ratchets:
367              ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
368              hexhash = RNS.hexrep(destination_hash, delimit=False)
369              ratchet_path = f"{ratchetdir}/{hexhash}"
370              if os.path.isfile(ratchet_path):
371                  try:
372                      with open(ratchet_path, "rb") as ratchet_file:
373                          ratchet_data = umsgpack.unpackb(ratchet_file.read())
374                          if time.time() < ratchet_data["received"]+Identity.RATCHET_EXPIRY and len(ratchet_data["ratchet"]) == Identity.RATCHETSIZE//8:
375                              Identity.known_ratchets[destination_hash] = ratchet_data["ratchet"]
376                          else:
377                              return None
378                  
379                  except Exception as e:
380                      RNS.log(f"An error occurred while loading ratchet data for {RNS.prettyhexrep(destination_hash)} from storage.", RNS.LOG_ERROR)
381                      RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
382                      return None
383  
384          if destination_hash in Identity.known_ratchets:
385              return Identity.known_ratchets[destination_hash]
386          else:
387              RNS.log(f"Could not load ratchet for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
388              return None
389  
390      @staticmethod
391      def validate_announce(packet, only_validate_signature=False):
392          try:
393              if packet.packet_type == RNS.Packet.ANNOUNCE:
394                  keysize       = Identity.KEYSIZE//8
395                  ratchetsize   = Identity.RATCHETSIZE//8
396                  name_hash_len = Identity.NAME_HASH_LENGTH//8
397                  sig_len       = Identity.SIGLENGTH//8
398                  destination_hash = packet.destination_hash
399  
400                  # Get public key bytes from announce
401                  public_key = packet.data[:keysize]
402  
403                  # If the packet context flag is set,
404                  # this announce contains a new ratchet
405                  if packet.context_flag == RNS.Packet.FLAG_SET:
406                      name_hash   = packet.data[keysize:keysize+name_hash_len ]
407                      random_hash = packet.data[keysize+name_hash_len:keysize+name_hash_len+10]
408                      ratchet     = packet.data[keysize+name_hash_len+10:keysize+name_hash_len+10+ratchetsize]
409                      signature   = packet.data[keysize+name_hash_len+10+ratchetsize:keysize+name_hash_len+10+ratchetsize+sig_len]
410                      app_data    = b""
411                      if len(packet.data) > keysize+name_hash_len+10+sig_len+ratchetsize:
412                          app_data = packet.data[keysize+name_hash_len+10+sig_len+ratchetsize:]
413  
414                  # If the packet context flag is not set,
415                  # this announce does not contain a ratchet
416                  else:
417                      ratchet     = b""
418                      name_hash   = packet.data[keysize:keysize+name_hash_len]
419                      random_hash = packet.data[keysize+name_hash_len:keysize+name_hash_len+10]
420                      signature   = packet.data[keysize+name_hash_len+10:keysize+name_hash_len+10+sig_len]
421                      app_data    = b""
422                      if len(packet.data) > keysize+name_hash_len+10+sig_len:
423                          app_data = packet.data[keysize+name_hash_len+10+sig_len:]
424  
425                  signed_data = destination_hash+public_key+name_hash+random_hash+ratchet+app_data
426  
427                  if not len(packet.data) > Identity.KEYSIZE//8+Identity.NAME_HASH_LENGTH//8+10+Identity.SIGLENGTH//8:
428                      app_data = None
429  
430                  announced_identity = Identity(create_keys=False)
431                  announced_identity.load_public_key(public_key)
432  
433                  if len(RNS.Transport.blackholed_identities) > 0:
434                      if announced_identity.hash in RNS.Transport.blackholed_identities:
435                          RNS.log(f"Invalidated and dropped announce from blackholed identity {RNS.prettyhexrep(announced_identity.hash)}", RNS.LOG_EXTREME)
436                          return False
437  
438                  if announced_identity.pub != None and announced_identity.validate(signature, signed_data):
439                      if only_validate_signature:
440                          del announced_identity
441                          return True
442  
443                      hash_material = name_hash+announced_identity.hash
444                      expected_hash = RNS.Identity.full_hash(hash_material)[:RNS.Reticulum.TRUNCATED_HASHLENGTH//8]
445  
446                      if destination_hash == expected_hash:
447                          # Check if we already have a public key for this destination
448                          # and make sure the public key is not different.
449                          if destination_hash in Identity.known_destinations:
450                              if public_key != Identity.known_destinations[destination_hash][2]:
451                                  # In reality, this should never occur, but in the odd case
452                                  # that someone manages a hash collision, we reject the announce.
453                                  RNS.log("Received announce with valid signature and destination hash, but announced public key does not match already known public key.", RNS.LOG_CRITICAL)
454                                  RNS.log("This may indicate an attempt to modify network paths, or a random hash collision. The announce was rejected.", RNS.LOG_CRITICAL)
455                                  return False
456  
457                          RNS.Identity.remember(packet.get_hash(), destination_hash, public_key, app_data)
458                          del announced_identity
459  
460                          if packet.rssi != None or packet.snr != None:
461                              signal_str = " ["
462                              if packet.rssi != None:
463                                  signal_str += "RSSI "+str(packet.rssi)+"dBm"
464                                  if packet.snr != None:
465                                      signal_str += ", "
466                              if packet.snr != None:
467                                  signal_str += "SNR "+str(packet.snr)+"dB"
468                              signal_str += "]"
469                          else:
470                              signal_str = ""
471  
472                          if hasattr(packet, "transport_id") and packet.transport_id != None:
473                              RNS.log("Valid announce for "+RNS.prettyhexrep(destination_hash)+" "+str(packet.hops)+" hops away, received via "+RNS.prettyhexrep(packet.transport_id)+" on "+str(packet.receiving_interface)+signal_str, RNS.LOG_EXTREME)
474                          else:
475                              RNS.log("Valid announce for "+RNS.prettyhexrep(destination_hash)+" "+str(packet.hops)+" hops away, received on "+str(packet.receiving_interface)+signal_str, RNS.LOG_EXTREME)
476  
477                          if ratchet:
478                              Identity._remember_ratchet(destination_hash, ratchet)
479  
480                          return True
481  
482                      else:
483                          RNS.log("Received invalid announce for "+RNS.prettyhexrep(destination_hash)+": Destination mismatch.", RNS.LOG_DEBUG)
484                          return False
485  
486                  else:
487                      RNS.log("Received invalid announce for "+RNS.prettyhexrep(destination_hash)+": Invalid signature.", RNS.LOG_DEBUG)
488                      del announced_identity
489                      return False
490          
491          except Exception as e:
492              RNS.log("Error occurred while validating announce. The contained exception was: "+str(e), RNS.LOG_ERROR)
493              return False
494  
495      @staticmethod
496      def persist_data():
497          if not RNS.Transport.owner.is_connected_to_shared_instance:
498              Identity.save_known_destinations()
499  
500      @staticmethod
501      def exit_handler():
502          Identity.persist_data()
503  
504  
505      @staticmethod
506      def from_bytes(prv_bytes):
507          """
508          Create a new :ref:`RNS.Identity<api-identity>` instance from *bytes* of private key.
509          Can be used to load previously created and saved identities into Reticulum.
510  
511          :param prv_bytes: The *bytes* of private a saved private key. **HAZARD!** Never use this to generate a new key by feeding random data in prv_bytes.
512          :returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the *bytes* data was invalid.
513          """
514          identity = Identity(create_keys=False)
515          if identity.load_private_key(prv_bytes):
516              return identity
517          else:
518              return None
519  
520  
521      @staticmethod
522      def from_file(path):
523          """
524          Create a new :ref:`RNS.Identity<api-identity>` instance from a file.
525          Can be used to load previously created and saved identities into Reticulum.
526  
527          :param path: The full path to the saved :ref:`RNS.Identity<api-identity>` data
528          :returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the loaded data was invalid.
529          """
530          identity = Identity(create_keys=False)
531          if identity.load(path):
532              return identity
533          else:
534              return None
535  
536      def to_file(self, path):
537          """
538          Saves the identity to a file. This will write the private key to disk,
539          and anyone with access to this file will be able to decrypt all
540          communication for the identity. Be very careful with this method.
541  
542          :param path: The full path specifying where to save the identity.
543          :returns: True if the file was saved, otherwise False.
544          """
545          try:
546              with open(path, "wb") as key_file:
547                  key_file.write(self.get_private_key())
548                  return True
549              return False
550          except Exception as e:
551              RNS.log("Error while saving identity to "+str(path), RNS.LOG_ERROR)
552              RNS.log("The contained exception was: "+str(e))
553  
554      def __init__(self,create_keys=True):
555          # Initialize keys to none
556          self.prv           = None
557          self.prv_bytes     = None
558          self.sig_prv       = None
559          self.sig_prv_bytes = None
560  
561          self.pub           = None
562          self.pub_bytes     = None
563          self.sig_pub       = None
564          self.sig_pub_bytes = None
565  
566          self.hash          = None
567          self.hexhash       = None
568  
569          if create_keys:
570              self.create_keys()
571  
572      def create_keys(self):
573          self.prv           = X25519PrivateKey.generate()
574          self.prv_bytes     = self.prv.private_bytes()
575  
576          self.sig_prv       = Ed25519PrivateKey.generate()
577          self.sig_prv_bytes = self.sig_prv.private_bytes()
578  
579          self.pub           = self.prv.public_key()
580          self.pub_bytes     = self.pub.public_bytes()
581  
582          self.sig_pub       = self.sig_prv.public_key()
583          self.sig_pub_bytes = self.sig_pub.public_bytes()
584  
585          self.update_hashes()
586  
587          RNS.log("Identity keys created for "+RNS.prettyhexrep(self.hash), RNS.LOG_VERBOSE)
588  
589      def get_private_key(self):
590          """
591          :returns: The private key as *bytes*
592          """
593          return self.prv_bytes+self.sig_prv_bytes
594  
595      def get_public_key(self):
596          """
597          :returns: The public key as *bytes*
598          """
599          return self.pub_bytes+self.sig_pub_bytes
600  
601      def load_private_key(self, prv_bytes):
602          """
603          Load a private key into the instance.
604  
605          :param prv_bytes: The private key as *bytes*.
606          :returns: True if the key was loaded, otherwise False.
607          """
608          try:
609              self.prv_bytes     = prv_bytes[:Identity.KEYSIZE//8//2]
610              self.prv           = X25519PrivateKey.from_private_bytes(self.prv_bytes)
611              self.sig_prv_bytes = prv_bytes[Identity.KEYSIZE//8//2:]
612              self.sig_prv       = Ed25519PrivateKey.from_private_bytes(self.sig_prv_bytes)
613              
614              self.pub           = self.prv.public_key()
615              self.pub_bytes     = self.pub.public_bytes()
616  
617              self.sig_pub       = self.sig_prv.public_key()
618              self.sig_pub_bytes = self.sig_pub.public_bytes()
619  
620              self.update_hashes()
621  
622              return True
623  
624          except Exception as e:
625              raise e
626              RNS.log("Failed to load identity key", RNS.LOG_ERROR)
627              RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
628              return False
629  
630      def load_public_key(self, pub_bytes):
631          """
632          Load a public key into the instance.
633  
634          :param pub_bytes: The public key as *bytes*.
635          :returns: True if the key was loaded, otherwise False.
636          """
637          try:
638              self.pub_bytes     = pub_bytes[:Identity.KEYSIZE//8//2]
639              self.sig_pub_bytes = pub_bytes[Identity.KEYSIZE//8//2:]
640  
641              self.pub           = X25519PublicKey.from_public_bytes(self.pub_bytes)
642              self.sig_pub       = Ed25519PublicKey.from_public_bytes(self.sig_pub_bytes)
643  
644              self.update_hashes()
645          except Exception as e:
646              RNS.log("Error while loading public key, the contained exception was: "+str(e), RNS.LOG_ERROR)
647  
648      def update_hashes(self):
649          self.hash = Identity.truncated_hash(self.get_public_key())
650          self.hexhash = self.hash.hex()
651  
652      def load(self, path):
653          try:
654              with open(path, "rb") as key_file:
655                  prv_bytes = key_file.read()
656                  return self.load_private_key(prv_bytes)
657              return False
658          except Exception as e:
659              RNS.log("Error while loading identity from "+str(path), RNS.LOG_ERROR)
660              RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
661  
662      def get_salt(self):
663          return self.hash
664  
665      def get_context(self):
666          return None
667  
668      def encrypt(self, plaintext, ratchet=None):
669          """
670          Encrypts information for the identity.
671  
672          :param plaintext: The plaintext to be encrypted as *bytes*.
673          :returns: Ciphertext token as *bytes*.
674          :raises: *KeyError* if the instance does not hold a public key.
675          """
676          if self.pub != None:
677              ephemeral_key = X25519PrivateKey.generate()
678              ephemeral_pub_bytes = ephemeral_key.public_key().public_bytes()
679  
680              if ratchet != None:
681                  target_public_key = X25519PublicKey.from_public_bytes(ratchet)
682              else:
683                  target_public_key = self.pub
684  
685              shared_key = ephemeral_key.exchange(target_public_key)
686              
687              derived_key = RNS.Cryptography.hkdf(
688                  length=Identity.DERIVED_KEY_LENGTH,
689                  derive_from=shared_key,
690                  salt=self.get_salt(),
691                  context=self.get_context(),
692              )
693  
694              token = Token(derived_key)
695              ciphertext = token.encrypt(plaintext)
696              token = ephemeral_pub_bytes+ciphertext
697  
698              return token
699          else:
700              raise KeyError("Encryption failed because identity does not hold a public key")
701  
702      def __decrypt(self, shared_key, ciphertext):
703          derived_key = RNS.Cryptography.hkdf(
704              length=Identity.DERIVED_KEY_LENGTH,
705              derive_from=shared_key,
706              salt=self.get_salt(),
707              context=self.get_context())
708  
709          token = Token(derived_key)
710          plaintext = token.decrypt(ciphertext)
711          return plaintext
712  
713      def decrypt(self, ciphertext_token, ratchets=None, enforce_ratchets=False, ratchet_id_receiver=None):
714          """
715          Decrypts information for the identity.
716  
717          :param ciphertext: The ciphertext to be decrypted as *bytes*.
718          :returns: Plaintext as *bytes*, or *None* if decryption fails.
719          :raises: *KeyError* if the instance does not hold a private key.
720          """
721  
722          if self.prv != None:
723              if len(ciphertext_token) > Identity.KEYSIZE//8//2:
724                  plaintext = None
725                  try:
726                      peer_pub_bytes = ciphertext_token[:Identity.KEYSIZE//8//2]
727                      peer_pub = X25519PublicKey.from_public_bytes(peer_pub_bytes)
728                      ciphertext = ciphertext_token[Identity.KEYSIZE//8//2:]
729  
730                      if ratchets:
731                          for ratchet in ratchets:
732                              try:
733                                  ratchet_prv = X25519PrivateKey.from_private_bytes(ratchet)
734                                  ratchet_id = Identity._get_ratchet_id(ratchet_prv.public_key().public_bytes())
735                                  shared_key = ratchet_prv.exchange(peer_pub)
736                                  plaintext = self.__decrypt(shared_key, ciphertext)
737                                  if ratchet_id_receiver:
738                                      ratchet_id_receiver.latest_ratchet_id = ratchet_id
739                                  
740                                  break
741                              
742                              except Exception as e:
743                                  pass
744  
745                      if enforce_ratchets and plaintext == None:
746                          RNS.log("Decryption with ratchet enforcement by "+RNS.prettyhexrep(self.hash)+" failed. Dropping packet.", RNS.LOG_DEBUG)
747                          if ratchet_id_receiver:
748                              ratchet_id_receiver.latest_ratchet_id = None
749                          return None
750  
751                      if plaintext == None:
752                          shared_key = self.prv.exchange(peer_pub)
753                          plaintext = self.__decrypt(shared_key, ciphertext)
754  
755                          if ratchet_id_receiver:
756                              ratchet_id_receiver.latest_ratchet_id = None
757  
758                  except Exception as e:
759                      RNS.log("Decryption by "+RNS.prettyhexrep(self.hash)+" failed: "+str(e), RNS.LOG_DEBUG)
760                      if ratchet_id_receiver:
761                          ratchet_id_receiver.latest_ratchet_id = None
762                      
763                  return plaintext
764              
765              else:
766                  RNS.log("Decryption failed because the token size was invalid.", RNS.LOG_DEBUG)
767                  return None
768          else:
769              raise KeyError("Decryption failed because identity does not hold a private key")
770  
771  
772      def sign(self, message):
773          """
774          Signs information by the identity.
775  
776          :param message: The message to be signed as *bytes*.
777          :returns: Signature as *bytes*.
778          :raises: *KeyError* if the instance does not hold a private key.
779          """
780          if self.sig_prv != None:
781              try:
782                  return self.sig_prv.sign(message)    
783              except Exception as e:
784                  RNS.log("The identity "+str(self)+" could not sign the requested message. The contained exception was: "+str(e), RNS.LOG_ERROR)
785                  raise e
786          else:
787              raise KeyError("Signing failed because identity does not hold a private key")
788  
789      def validate(self, signature, message):
790          """
791          Validates the signature of a signed message.
792  
793          :param signature: The signature to be validated as *bytes*.
794          :param message: The message to be validated as *bytes*.
795          :returns: True if the signature is valid, otherwise False.
796          :raises: *KeyError* if the instance does not hold a public key.
797          """
798          if self.pub != None:
799              try:
800                  self.sig_pub.verify(signature, message)
801                  return True
802              except Exception as e:
803                  return False
804          else:
805              raise KeyError("Signature validation failed because identity does not hold a public key")
806  
807      def prove(self, packet, destination=None):
808          signature = self.sign(packet.packet_hash)
809          if RNS.Reticulum.should_use_implicit_proof():
810              proof_data = signature
811          else:
812              proof_data = packet.packet_hash + signature
813          
814          if destination == None:
815              destination = packet.generate_proof_destination()
816  
817          proof = RNS.Packet(destination, proof_data, RNS.Packet.PROOF, attached_interface = packet.receiving_interface)
818          proof.send()
819  
820      def __str__(self):
821          return RNS.prettyhexrep(self.hash)