Identity.py
# Reticulum License
#
# Copyright (c) 2016-2025 Mark Qvist
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# - The Software shall not be used in any kind of system which includes amongst
#   its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
#   an artificial intelligence, machine learning or language model training
#   dataset, including but not limited to any use that contributes to the
#   training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
#   all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import math
import os
import RNS
import time
import atexit
import hashlib
import threading

from .vendor import umsgpack as umsgpack

from RNS.Cryptography import X25519PrivateKey, X25519PublicKey, Ed25519PrivateKey, Ed25519PublicKey
from RNS.Cryptography import Token


class Identity:
    """
    This class is used to manage identities in Reticulum. It provides methods
    for encryption, decryption, signatures and verification, and is the basis
    for all encrypted communication over Reticulum networks.

    :param create_keys: Specifies whether new encryption and signing keys should be generated.
    """

    CURVE = "Curve25519"
    """
    The curve used for Elliptic Curve DH key exchanges
    """

    KEYSIZE = 256*2
    """
    X.25519 key size in bits. A complete key is the concatenation of a 256 bit encryption key, and a 256 bit signing key.
    """

    RATCHETSIZE = 256
    """
    X.25519 ratchet key size in bits.
    """

    RATCHET_EXPIRY = 60*60*24*30
    """
    The expiry time for received ratchets in seconds, defaults to 30 days. Reticulum will always use the most recently
    announced ratchet, and remember it for up to ``RATCHET_EXPIRY`` since receiving it, after which it will be discarded.
    If a newer ratchet is announced in the meantime, it will replace the already known ratchet.
    """

    # Non-configurable constants
    TOKEN_OVERHEAD = RNS.Cryptography.Token.TOKEN_OVERHEAD
    AES128_BLOCKSIZE = 16           # In bytes
    HASHLENGTH = 256                # In bits
    SIGLENGTH = KEYSIZE             # In bits

    NAME_HASH_LENGTH = 80
    TRUNCATED_HASHLENGTH = RNS.Reticulum.TRUNCATED_HASHLENGTH
    """
    Constant specifying the truncated hash length (in bits) used by Reticulum
    for addressable hashes and other purposes. Non-configurable.
    """

    DERIVED_KEY_LENGTH = 512//8
    DERIVED_KEY_LENGTH_LEGACY = 256//8

    # Storage
    # known_destinations maps a destination hash to the list
    # [time_remembered, packet_hash, public_key, app_data].
    known_destinations = {}
    # known_ratchets maps a destination hash to the latest ratchet bytes.
    known_ratchets = {}

    # Serialises background writes of ratchet files to storage.
    ratchet_persist_lock = threading.Lock()

    @staticmethod
    def remember(packet_hash, destination_hash, public_key, app_data = None):
        """
        Store the public key and app_data announced for a destination in the
        in-memory table of known destinations.

        :param packet_hash: Hash of the packet that carried the announce.
        :param destination_hash: Destination hash as *bytes*.
        :param public_key: The announced public key as *bytes*, must be ``KEYSIZE//8`` bytes long.
        :param app_data: Optional app_data carried by the announce.
        :raises: *TypeError* if the public key does not have the expected size.
        """
        if len(public_key) != Identity.KEYSIZE//8:
            raise TypeError("Can't remember "+RNS.prettyhexrep(destination_hash)+", the public key size of "+str(len(public_key))+" is not valid.")

        Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data]

    @staticmethod
    def _public_identity(public_key, app_data=None):
        # Build a public-key-only Identity instance carrying the given app_data.
        identity = Identity(create_keys=False)
        identity.load_public_key(public_key)
        identity.app_data = app_data
        return identity

    @staticmethod
    def recall(target_hash, from_identity_hash=False):
        """
        Recall identity for a destination or identity hash. By default, this function
        will return the identity associated with a given *destination* hash. As an
        example, if you know the ``lxmf.delivery`` destination hash of an endpoint,
        this function will return the associated underlying identity. You can also
        search for an identity from a known *identity hash*, by setting the
        ``from_identity_hash`` argument.

        :param target_hash: Destination or identity hash as *bytes*.
        :param from_identity_hash: Whether to search based on identity hash instead of destination hash as *bool*.
        :returns: An :ref:`RNS.Identity<api-identity>` instance that can be used to create an outgoing :ref:`RNS.Destination<api-destination>`, or *None* if the destination is unknown.
        """
        if from_identity_hash:
            # The table is keyed by destination hash, so an identity hash
            # lookup has to hash every stored public key until one matches.
            for identity_data in Identity.known_destinations.values():
                if target_hash == Identity.truncated_hash(identity_data[2]):
                    return Identity._public_identity(identity_data[2], identity_data[3])

            return None

        if target_hash in Identity.known_destinations:
            identity_data = Identity.known_destinations[target_hash]
            return Identity._public_identity(identity_data[2], identity_data[3])

        # Fall back to destinations registered with the local transport instance
        for registered_destination in RNS.Transport.destinations:
            if target_hash == registered_destination.hash:
                return Identity._public_identity(registered_destination.identity.get_public_key())

        return None

    @staticmethod
    def recall_app_data(destination_hash):
        """
        Recall last heard app_data for a destination hash.

        :param destination_hash: Destination hash as *bytes*.
        :returns: *Bytes* containing app_data, or *None* if the destination is unknown.
        """
        if destination_hash in Identity.known_destinations:
            return Identity.known_destinations[destination_hash][3]

        return None

    @staticmethod
    def save_known_destinations():
        """
        Merge the known destinations already on disk into the in-memory table,
        and write the combined table back to storage.

        :returns: *False* if waiting for a previous save operation timed out, otherwise *None*.
        """
        # TODO: Improve the storage method so we don't have to
        # deserialize and serialize the entire table on every
        # save, but the only changes. It might be possible to
        # simply overwrite on exit now that every local client
        # disconnect triggers a data persist.

        # Wait for any save that is already in progress
        if hasattr(Identity, "saving_known_destinations"):
            wait_interval = 0.2
            wait_timeout = 5
            wait_start = time.time()
            while Identity.saving_known_destinations:
                time.sleep(wait_interval)
                if time.time() > wait_start+wait_timeout:
                    RNS.log("Could not save known destinations to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR)
                    return False

        Identity.saving_known_destinations = True
        try:
            save_start = time.time()
            destinations_path = RNS.Reticulum.storagepath+"/known_destinations"

            storage_known_destinations = {}
            if os.path.isfile(destinations_path):
                try:
                    with open(destinations_path, "rb") as file:
                        storage_known_destinations = umsgpack.load(file)

                except Exception as e:
                    # Best effort only, an unreadable file is simply overwritten below
                    RNS.log("Could not read existing known destinations from storage, the file will be overwritten. The contained exception was: "+str(e), RNS.LOG_WARNING)

            try:
                # Entries already held in memory take precedence over stored ones
                for destination_hash in storage_known_destinations:
                    if destination_hash not in Identity.known_destinations:
                        Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash]
            except Exception as e:
                RNS.log("Skipped recombining known destinations from disk, since an error occurred: "+str(e), RNS.LOG_WARNING)

            RNS.log("Saving "+str(len(Identity.known_destinations))+" known destinations to storage...", RNS.LOG_DEBUG)
            with open(destinations_path, "wb") as file:
                umsgpack.dump(Identity.known_destinations, file)

            save_time = time.time() - save_start
            if save_time < 1:
                time_str = str(round(save_time*1000,2))+"ms"
            else:
                time_str = str(round(save_time,2))+"s"

            RNS.log("Saved known destinations to storage in "+time_str, RNS.LOG_DEBUG)

        except Exception as e:
            RNS.log("Error while saving known destinations to disk, the contained exception was: "+str(e), RNS.LOG_ERROR)
            RNS.trace_exception(e)

        finally:
            # Always release the flag, otherwise every later save would
            # wait for the timeout and then give up.
            Identity.saving_known_destinations = False

    @staticmethod
    def load_known_destinations():
        """
        Replace the in-memory table of known destinations with the entries
        stored on disk. Entries whose key does not have the length of a
        truncated hash are skipped.
        """
        destinations_path = RNS.Reticulum.storagepath+"/known_destinations"
        if os.path.isfile(destinations_path):
            try:
                with open(destinations_path, "rb") as file:
                    loaded_known_destinations = umsgpack.load(file)

                Identity.known_destinations = {}
                for known_destination in loaded_known_destinations:
                    if len(known_destination) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
                        Identity.known_destinations[known_destination] = loaded_known_destinations[known_destination]

                RNS.log("Loaded "+str(len(Identity.known_destinations))+" known destination from storage", RNS.LOG_VERBOSE)

            except Exception as e:
                RNS.log("Error loading known destinations from disk, file will be recreated on exit", RNS.LOG_ERROR)
                RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
        else:
            RNS.log("Destinations file does not exist, no known destinations loaded", RNS.LOG_VERBOSE)

    @staticmethod
    def full_hash(data):
        """
        Get a SHA-256 hash of passed data.

        :param data: Data to be hashed as *bytes*.
        :returns: SHA-256 hash as *bytes*.
        """
        return RNS.Cryptography.sha256(data)

    @staticmethod
    def truncated_hash(data):
        """
        Get a truncated SHA-256 hash of passed data.

        :param data: Data to be hashed as *bytes*.
        :returns: Truncated SHA-256 hash as *bytes*.
        """
        return Identity.full_hash(data)[:(Identity.TRUNCATED_HASHLENGTH//8)]

    @staticmethod
    def get_random_hash():
        """
        Get a random SHA-256 hash.

        :returns: Truncated SHA-256 hash of random data as *bytes*.
        """
        return Identity.truncated_hash(os.urandom(Identity.TRUNCATED_HASHLENGTH//8))

    @staticmethod
    def current_ratchet_id(destination_hash):
        """
        Get the ID of the currently used ratchet key for a given destination hash

        :param destination_hash: A destination hash as *bytes*.
        :returns: A ratchet ID as *bytes* or *None*.
        """
        ratchet = Identity.get_ratchet(destination_hash)
        if ratchet is None:
            return None

        return Identity._get_ratchet_id(ratchet)

    @staticmethod
    def _get_ratchet_id(ratchet_pub_bytes):
        # A ratchet ID is the full hash of the ratchet public key,
        # truncated to NAME_HASH_LENGTH bits.
        return Identity.full_hash(ratchet_pub_bytes)[:Identity.NAME_HASH_LENGTH//8]

    @staticmethod
    def _ratchet_public_bytes(ratchet):
        # Derive the public key bytes from ratchet private key bytes.
        return X25519PrivateKey.from_private_bytes(ratchet).public_key().public_bytes()

    @staticmethod
    def _generate_ratchet():
        # Generate a new X25519 ratchet key and return its private bytes.
        return X25519PrivateKey.generate().private_bytes()

    @staticmethod
    def _remember_ratchet(destination_hash, ratchet):
        """
        Remember the ratchet announced for a destination, and persist it to
        storage in a background thread unless this instance is connected to a
        shared instance.

        :param destination_hash: Destination hash as *bytes*.
        :param ratchet: The announced ratchet public key as *bytes*.
        """
        try:
            ratchet_exists = (destination_hash in Identity.known_ratchets
                              and Identity.known_ratchets[destination_hash] == ratchet)

            if not ratchet_exists:
                RNS.log(f"Remembering ratchet {RNS.prettyhexrep(Identity._get_ratchet_id(ratchet))} for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_EXTREME)
                Identity.known_ratchets[destination_hash] = ratchet

                if not RNS.Transport.owner.is_connected_to_shared_instance:
                    def persist_job():
                        # This runs in its own thread, so failures have to be
                        # handled here; the enclosing try block has already
                        # been left by the time the job executes.
                        try:
                            with Identity.ratchet_persist_lock:
                                hexhash = RNS.hexrep(destination_hash, delimit=False)
                                ratchet_data = {"ratchet": ratchet, "received": time.time()}

                                ratchetdir = RNS.Reticulum.storagepath+"/ratchets"

                                if not os.path.isdir(ratchetdir):
                                    os.makedirs(ratchetdir)

                                # Write to a temporary file and move it into
                                # place, so readers never see partial data.
                                outpath = f"{ratchetdir}/{hexhash}.out"
                                finalpath = f"{ratchetdir}/{hexhash}"
                                with open(outpath, "wb") as ratchet_file:
                                    ratchet_file.write(umsgpack.packb(ratchet_data))
                                os.replace(outpath, finalpath)

                        except Exception as e:
                            RNS.log(f"Could not persist ratchet for {RNS.prettyhexrep(destination_hash)} to storage.", RNS.LOG_ERROR)
                            RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)

                    threading.Thread(target=persist_job, daemon=True).start()

        except Exception as e:
            RNS.log(f"Could not persist ratchet for {RNS.prettyhexrep(destination_hash)} to storage.", RNS.LOG_ERROR)
            RNS.log(f"The contained exception was: {e}")
            RNS.trace_exception(e)

    @staticmethod
    def _clean_ratchets():
        """
        Remove expired and unreadable ratchet files from the ratchet
        storage directory.
        """
        RNS.log("Cleaning ratchets...", RNS.LOG_DEBUG)
        try:
            now = time.time()
            ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
            if os.path.isdir(ratchetdir):
                for filename in os.listdir(ratchetdir):
                    try:
                        expired = False
                        corrupted = False
                        with open(f"{ratchetdir}/{filename}", "rb") as rf:
                            try:
                                ratchet_data = umsgpack.unpackb(rf.read())
                                if now > ratchet_data["received"]+Identity.RATCHET_EXPIRY:
                                    expired = True

                            except Exception as e:
                                RNS.log(f"Corrupted ratchet data while reading {ratchetdir}/{filename}, removing file", RNS.LOG_ERROR)
                                corrupted = True

                        # Unlink only after the file handle has been closed
                        if expired or corrupted:
                            os.unlink(f"{ratchetdir}/{filename}")

                    except Exception as e:
                        RNS.log(f"An error occurred while cleaning ratchets, in the processing of {ratchetdir}/{filename}.", RNS.LOG_ERROR)
                        RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)

        except Exception as e:
            RNS.log(f"An error occurred while cleaning ratchets. The contained exception was: {e}", RNS.LOG_ERROR)

    @staticmethod
    def get_ratchet(destination_hash):
        """
        Get the currently known ratchet for a destination, loading it from
        storage if it is not already held in memory.

        :param destination_hash: Destination hash as *bytes*.
        :returns: The ratchet as *bytes*, or *None* if no valid, unexpired ratchet is known.
        """
        if destination_hash not in Identity.known_ratchets:
            ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
            hexhash = RNS.hexrep(destination_hash, delimit=False)
            ratchet_path = f"{ratchetdir}/{hexhash}"
            if os.path.isfile(ratchet_path):
                try:
                    with open(ratchet_path, "rb") as ratchet_file:
                        ratchet_data = umsgpack.unpackb(ratchet_file.read())

                    not_expired = time.time() < ratchet_data["received"]+Identity.RATCHET_EXPIRY
                    if not_expired and len(ratchet_data["ratchet"]) == Identity.RATCHETSIZE//8:
                        Identity.known_ratchets[destination_hash] = ratchet_data["ratchet"]
                    else:
                        return None

                except Exception as e:
                    RNS.log(f"An error occurred while loading ratchet data for {RNS.prettyhexrep(destination_hash)} from storage.", RNS.LOG_ERROR)
                    RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
                    return None

        if destination_hash in Identity.known_ratchets:
            return Identity.known_ratchets[destination_hash]

        RNS.log(f"Could not load ratchet for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
        return None

    @staticmethod
    def validate_announce(packet, only_validate_signature=False):
        """
        Validate an announce packet, and remember the announced identity
        (and ratchet, if included) when the announce is valid.

        The announce data is laid out as public key, name hash, random hash,
        an optional ratchet (present when the packet context flag is set),
        signature and optional app_data.

        :param packet: The received announce packet.
        :param only_validate_signature: If set, only the signature is checked and nothing is remembered.
        :returns: *True* if the announce is valid, *False* if it is invalid or an error occurred, and *None* if the packet is not an announce.
        """
        try:
            if packet.packet_type != RNS.Packet.ANNOUNCE:
                return None

            keysize = Identity.KEYSIZE//8
            ratchetsize = Identity.RATCHETSIZE//8
            name_hash_len = Identity.NAME_HASH_LENGTH//8
            sig_len = Identity.SIGLENGTH//8
            random_hash_len = 10
            destination_hash = packet.destination_hash

            # Fields shared by both announce layouts
            name_hash_start = keysize
            random_hash_start = name_hash_start+name_hash_len
            random_hash_end = random_hash_start+random_hash_len

            public_key = packet.data[:keysize]
            name_hash = packet.data[name_hash_start:random_hash_start]
            random_hash = packet.data[random_hash_start:random_hash_end]

            # If the packet context flag is set,
            # this announce contains a new ratchet
            if packet.context_flag == RNS.Packet.FLAG_SET:
                ratchet = packet.data[random_hash_end:random_hash_end+ratchetsize]
                signature_start = random_hash_end+ratchetsize

            # If the packet context flag is not set,
            # this announce does not contain a ratchet
            else:
                ratchet = b""
                signature_start = random_hash_end

            signature_end = signature_start+sig_len
            signature = packet.data[signature_start:signature_end]

            app_data = b""
            if len(packet.data) > signature_end:
                app_data = packet.data[signature_end:]

            signed_data = destination_hash+public_key+name_hash+random_hash+ratchet+app_data

            # An announce without app_data is remembered with app_data set
            # to None. This is decided on the extracted app_data, so that
            # announces with and without a ratchet are treated alike.
            if not app_data:
                app_data = None

            announced_identity = Identity(create_keys=False)
            announced_identity.load_public_key(public_key)

            if len(RNS.Transport.blackholed_identities) > 0:
                if announced_identity.hash in RNS.Transport.blackholed_identities:
                    RNS.log(f"Invalidated and dropped announce from blackholed identity {RNS.prettyhexrep(announced_identity.hash)}", RNS.LOG_EXTREME)
                    return False

            if announced_identity.pub is None or not announced_identity.validate(signature, signed_data):
                RNS.log("Received invalid announce for "+RNS.prettyhexrep(destination_hash)+": Invalid signature.", RNS.LOG_DEBUG)
                return False

            if only_validate_signature:
                return True

            hash_material = name_hash+announced_identity.hash
            expected_hash = Identity.full_hash(hash_material)[:RNS.Reticulum.TRUNCATED_HASHLENGTH//8]

            if destination_hash != expected_hash:
                RNS.log("Received invalid announce for "+RNS.prettyhexrep(destination_hash)+": Destination mismatch.", RNS.LOG_DEBUG)
                return False

            # Check if we already have a public key for this destination
            # and make sure the public key is not different.
            if destination_hash in Identity.known_destinations:
                if public_key != Identity.known_destinations[destination_hash][2]:
                    # In reality, this should never occur, but in the odd case
                    # that someone manages a hash collision, we reject the announce.
                    RNS.log("Received announce with valid signature and destination hash, but announced public key does not match already known public key.", RNS.LOG_CRITICAL)
                    RNS.log("This may indicate an attempt to modify network paths, or a random hash collision. The announce was rejected.", RNS.LOG_CRITICAL)
                    return False

            Identity.remember(packet.get_hash(), destination_hash, public_key, app_data)

            # Describe signal quality for the log entry, if it was reported
            signal_parts = []
            if packet.rssi is not None:
                signal_parts.append("RSSI "+str(packet.rssi)+"dBm")
            if packet.snr is not None:
                signal_parts.append("SNR "+str(packet.snr)+"dB")
            signal_str = " ["+", ".join(signal_parts)+"]" if signal_parts else ""

            if hasattr(packet, "transport_id") and packet.transport_id is not None:
                RNS.log("Valid announce for "+RNS.prettyhexrep(destination_hash)+" "+str(packet.hops)+" hops away, received via "+RNS.prettyhexrep(packet.transport_id)+" on "+str(packet.receiving_interface)+signal_str, RNS.LOG_EXTREME)
            else:
                RNS.log("Valid announce for "+RNS.prettyhexrep(destination_hash)+" "+str(packet.hops)+" hops away, received on "+str(packet.receiving_interface)+signal_str, RNS.LOG_EXTREME)

            if ratchet:
                Identity._remember_ratchet(destination_hash, ratchet)

            return True

        except Exception as e:
            RNS.log("Error occurred while validating announce. The contained exception was: "+str(e), RNS.LOG_ERROR)
            return False

    @staticmethod
    def persist_data():
        # Known destinations are only written by instances that are not
        # connected to a shared instance.
        if not RNS.Transport.owner.is_connected_to_shared_instance:
            Identity.save_known_destinations()

    @staticmethod
    def exit_handler():
        # Persist identity data on exit.
        Identity.persist_data()

    @staticmethod
    def from_bytes(prv_bytes):
        """
        Create a new :ref:`RNS.Identity<api-identity>` instance from *bytes* of private key.
        Can be used to load previously created and saved identities into Reticulum.

        :param prv_bytes: The *bytes* of private a saved private key. **HAZARD!** Never use this to generate a new key by feeding random data in prv_bytes.
        :returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the *bytes* data was invalid.
        :raises: Any exception raised by :meth:`load_private_key` while parsing the key material.
        """
        identity = Identity(create_keys=False)
        if identity.load_private_key(prv_bytes):
            return identity

        return None

    @staticmethod
    def from_file(path):
        """
        Create a new :ref:`RNS.Identity<api-identity>` instance from a file.
        Can be used to load previously created and saved identities into Reticulum.

        :param path: The full path to the saved :ref:`RNS.Identity<api-identity>` data
        :returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the loaded data was invalid.
        """
        identity = Identity(create_keys=False)
        if identity.load(path):
            return identity

        return None

    def to_file(self, path):
        """
        Saves the identity to a file. This will write the private key to disk,
        and anyone with access to this file will be able to decrypt all
        communication for the identity. Be very careful with this method.

        :param path: The full path specifying where to save the identity.
        :returns: True if the file was saved, otherwise False.
        """
        try:
            with open(path, "wb") as key_file:
                key_file.write(self.get_private_key())
            return True

        except Exception as e:
            RNS.log("Error while saving identity to "+str(path), RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e))
            return False

    def __init__(self,create_keys=True):
        # Initialize keys to none
        self.prv = None
        self.prv_bytes = None
        self.sig_prv = None
        self.sig_prv_bytes = None

        self.pub = None
        self.pub_bytes = None
        self.sig_pub = None
        self.sig_pub_bytes = None

        self.hash = None
        self.hexhash = None

        # Set by recall(), initialised here so the
        # attribute exists on every instance.
        self.app_data = None

        if create_keys:
            self.create_keys()

    def create_keys(self):
        """
        Generate new X25519 encryption and Ed25519 signing keys for this
        instance, and update its hashes accordingly.
        """
        self.prv = X25519PrivateKey.generate()
        self.prv_bytes = self.prv.private_bytes()

        self.sig_prv = Ed25519PrivateKey.generate()
        self.sig_prv_bytes = self.sig_prv.private_bytes()

        self.pub = self.prv.public_key()
        self.pub_bytes = self.pub.public_bytes()

        self.sig_pub = self.sig_prv.public_key()
        self.sig_pub_bytes = self.sig_pub.public_bytes()

        self.update_hashes()

        RNS.log("Identity keys created for "+RNS.prettyhexrep(self.hash), RNS.LOG_VERBOSE)

    def get_private_key(self):
        """
        :returns: The private key as *bytes*
        """
        return self.prv_bytes+self.sig_prv_bytes

    def get_public_key(self):
        """
        :returns: The public key as *bytes*
        """
        return self.pub_bytes+self.sig_pub_bytes

    def load_private_key(self, prv_bytes):
        """
        Load a private key into the instance.

        :param prv_bytes: The private key as *bytes*. The first half is the X25519 encryption key, the second half the Ed25519 signing key.
        :returns: True if the key was loaded.
        :raises: The underlying exception if the key material could not be loaded.
        """
        half_key_len = Identity.KEYSIZE//8//2

        # Failures propagate to the caller, since existing callers
        # may rely on the exception being raised.
        self.prv_bytes = prv_bytes[:half_key_len]
        self.prv = X25519PrivateKey.from_private_bytes(self.prv_bytes)
        self.sig_prv_bytes = prv_bytes[half_key_len:]
        self.sig_prv = Ed25519PrivateKey.from_private_bytes(self.sig_prv_bytes)

        self.pub = self.prv.public_key()
        self.pub_bytes = self.pub.public_bytes()

        self.sig_pub = self.sig_prv.public_key()
        self.sig_pub_bytes = self.sig_pub.public_bytes()

        self.update_hashes()

        return True

    def load_public_key(self, pub_bytes):
        """
        Load a public key into the instance.

        :param pub_bytes: The public key as *bytes*. The first half is the X25519 encryption key, the second half the Ed25519 signing key.
        :returns: True if the key was loaded, otherwise False.
        """
        try:
            half_key_len = Identity.KEYSIZE//8//2
            self.pub_bytes = pub_bytes[:half_key_len]
            self.sig_pub_bytes = pub_bytes[half_key_len:]

            self.pub = X25519PublicKey.from_public_bytes(self.pub_bytes)
            self.sig_pub = Ed25519PublicKey.from_public_bytes(self.sig_pub_bytes)

            self.update_hashes()
            return True

        except Exception as e:
            RNS.log("Error while loading public key, the contained exception was: "+str(e), RNS.LOG_ERROR)
            return False

    def update_hashes(self):
        # The identity hash is the truncated hash of the full public key.
        self.hash = Identity.truncated_hash(self.get_public_key())
        self.hexhash = self.hash.hex()

    def load(self, path):
        """
        Load a private key from a file into the instance.

        :param path: The full path of the file holding the private key.
        :returns: True if the key was loaded, otherwise False.
        """
        try:
            with open(path, "rb") as key_file:
                prv_bytes = key_file.read()

            return self.load_private_key(prv_bytes)

        except Exception as e:
            RNS.log("Error while loading identity from "+str(path), RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            return False

    def get_salt(self):
        # The identity hash is used as the salt for key derivation.
        return self.hash

    def get_context(self):
        # No context is used for key derivation.
        return None

    def encrypt(self, plaintext, ratchet=None):
        """
        Encrypts information for the identity.

        :param plaintext: The plaintext to be encrypted as *bytes*.
        :param ratchet: Optional ratchet public key as *bytes*. If given, it is used for the key exchange instead of the public key of the identity.
        :returns: Ciphertext token as *bytes*.
        :raises: *KeyError* if the instance does not hold a public key.
        """
        if self.pub is None:
            raise KeyError("Encryption failed because identity does not hold a public key")

        ephemeral_key = X25519PrivateKey.generate()
        ephemeral_pub_bytes = ephemeral_key.public_key().public_bytes()

        if ratchet is not None:
            target_public_key = X25519PublicKey.from_public_bytes(ratchet)
        else:
            target_public_key = self.pub

        shared_key = ephemeral_key.exchange(target_public_key)

        derived_key = RNS.Cryptography.hkdf(
            length=Identity.DERIVED_KEY_LENGTH,
            derive_from=shared_key,
            salt=self.get_salt(),
            context=self.get_context(),
        )

        ciphertext = Token(derived_key).encrypt(plaintext)

        # The ephemeral public key is prepended, so the
        # receiver can perform the matching key exchange.
        return ephemeral_pub_bytes+ciphertext

    def __decrypt(self, shared_key, ciphertext):
        # Derive the token key from the shared key and decrypt the ciphertext.
        derived_key = RNS.Cryptography.hkdf(
            length=Identity.DERIVED_KEY_LENGTH,
            derive_from=shared_key,
            salt=self.get_salt(),
            context=self.get_context())

        return Token(derived_key).decrypt(ciphertext)

    def decrypt(self, ciphertext_token, ratchets=None, enforce_ratchets=False, ratchet_id_receiver=None):
        """
        Decrypts information for the identity.

        :param ciphertext_token: The ciphertext token to be decrypted as *bytes*.
        :param ratchets: Optional list of ratchet private keys as *bytes*, tried in order before the private key of the identity.
        :param enforce_ratchets: If set, decryption fails when none of the supplied ratchets can decrypt the token.
        :param ratchet_id_receiver: Optional object on which ``latest_ratchet_id`` is set to the ID of the ratchet that decrypted the token, or to *None* if no ratchet was used.
        :returns: Plaintext as *bytes*, or *None* if decryption fails.
        :raises: *KeyError* if the instance does not hold a private key.
        """
        if self.prv is None:
            raise KeyError("Decryption failed because identity does not hold a private key")

        half_key_len = Identity.KEYSIZE//8//2
        if not len(ciphertext_token) > half_key_len:
            RNS.log("Decryption failed because the token size was invalid.", RNS.LOG_DEBUG)
            return None

        plaintext = None
        try:
            peer_pub_bytes = ciphertext_token[:half_key_len]
            peer_pub = X25519PublicKey.from_public_bytes(peer_pub_bytes)
            ciphertext = ciphertext_token[half_key_len:]

            if ratchets:
                for ratchet in ratchets:
                    try:
                        ratchet_prv = X25519PrivateKey.from_private_bytes(ratchet)
                        ratchet_id = Identity._get_ratchet_id(ratchet_prv.public_key().public_bytes())
                        shared_key = ratchet_prv.exchange(peer_pub)
                        plaintext = self.__decrypt(shared_key, ciphertext)
                        if ratchet_id_receiver:
                            ratchet_id_receiver.latest_ratchet_id = ratchet_id

                        break

                    except Exception as e:
                        # A failure only means this ratchet did not
                        # match the token, so the next one is tried.
                        pass

            if enforce_ratchets and plaintext is None:
                RNS.log("Decryption with ratchet enforcement by "+RNS.prettyhexrep(self.hash)+" failed. Dropping packet.", RNS.LOG_DEBUG)
                if ratchet_id_receiver:
                    ratchet_id_receiver.latest_ratchet_id = None
                return None

            if plaintext is None:
                # No ratchet matched, fall back to the identity key
                shared_key = self.prv.exchange(peer_pub)
                plaintext = self.__decrypt(shared_key, ciphertext)

                if ratchet_id_receiver:
                    ratchet_id_receiver.latest_ratchet_id = None

        except Exception as e:
            RNS.log("Decryption by "+RNS.prettyhexrep(self.hash)+" failed: "+str(e), RNS.LOG_DEBUG)
            if ratchet_id_receiver:
                ratchet_id_receiver.latest_ratchet_id = None

        return plaintext

    def sign(self, message):
        """
        Signs information by the identity.

        :param message: The message to be signed as *bytes*.
        :returns: Signature as *bytes*.
        :raises: *KeyError* if the instance does not hold a private key.
        """
        if self.sig_prv is None:
            raise KeyError("Signing failed because identity does not hold a private key")

        try:
            return self.sig_prv.sign(message)
        except Exception as e:
            RNS.log("The identity "+str(self)+" could not sign the requested message. The contained exception was: "+str(e), RNS.LOG_ERROR)
            raise e

    def validate(self, signature, message):
        """
        Validates the signature of a signed message.

        :param signature: The signature to be validated as *bytes*.
        :param message: The message to be validated as *bytes*.
        :returns: True if the signature is valid, otherwise False.
        :raises: *KeyError* if the instance does not hold a public key.
        """
        if self.pub is None:
            raise KeyError("Signature validation failed because identity does not hold a public key")

        try:
            self.sig_pub.verify(signature, message)
            return True
        except Exception as e:
            return False

    def prove(self, packet, destination=None):
        """
        Sign the hash of a packet and send the signature as a proof.

        :param packet: The packet to prove.
        :param destination: Optional destination for the proof. If not given, the proof destination generated by the packet is used.
        """
        signature = self.sign(packet.packet_hash)
        if RNS.Reticulum.should_use_implicit_proof():
            # Implicit proofs carry only the signature
            proof_data = signature
        else:
            proof_data = packet.packet_hash + signature

        if destination is None:
            destination = packet.generate_proof_destination()

        proof = RNS.Packet(destination, proof_data, RNS.Packet.PROOF, attached_interface = packet.receiving_interface)
        proof.send()

    def __str__(self):
        return RNS.prettyhexrep(self.hash)