Identity.py
1 # Reticulum License 2 # 3 # Copyright (c) 2016-2025 Mark Qvist 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # - The Software shall not be used in any kind of system which includes amongst 13 # its functions the ability to purposefully do harm to human beings. 14 # 15 # - The Software shall not be used, directly or indirectly, in the creation of 16 # an artificial intelligence, machine learning or language model training 17 # dataset, including but not limited to any use that contributes to the 18 # training or development of such a model or algorithm. 19 # 20 # - The above copyright notice and this permission notice shall be included in 21 # all copies or substantial portions of the Software. 22 # 23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 24 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 25 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 26 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 27 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 28 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 29 # SOFTWARE. 
import math
import os
import RNS
import time
import atexit
import hashlib
import threading

from .vendor import umsgpack as umsgpack

from RNS.Cryptography import X25519PrivateKey, X25519PublicKey, Ed25519PrivateKey, Ed25519PublicKey
from RNS.Cryptography import Token


class Identity:
    """
    This class is used to manage identities in Reticulum. It provides methods
    for encryption, decryption, signatures and verification, and is the basis
    for all encrypted communication over Reticulum networks.

    :param create_keys: Specifies whether new encryption and signing keys should be generated.
    """

    CURVE = "Curve25519"
    """
    The curve used for Elliptic Curve DH key exchanges
    """

    KEYSIZE     = 256*2
    """
    X25519 key size in bits. A complete key is the concatenation of a 256 bit encryption key, and a 256 bit signing key.
    """

    RATCHETSIZE = 256
    """
    X25519 ratchet key size in bits.
    """

    RATCHET_EXPIRY = 60*60*24*30
    """
    The expiry time for received ratchets in seconds, defaults to 30 days. Reticulum will always use the most recently
    announced ratchet, and remember it for up to ``RATCHET_EXPIRY`` since receiving it, after which it will be discarded.
    If a newer ratchet is announced in the meantime, it will replace the already known ratchet.
    """

    # Non-configurable constants
    TOKEN_OVERHEAD           = RNS.Cryptography.Token.TOKEN_OVERHEAD
    AES128_BLOCKSIZE         = 16          # In bytes
    HASHLENGTH               = 256         # In bits
    SIGLENGTH                = KEYSIZE     # In bits

    NAME_HASH_LENGTH     = 80
    TRUNCATED_HASHLENGTH = RNS.Reticulum.TRUNCATED_HASHLENGTH
    """
    Constant specifying the truncated hash length (in bits) used by Reticulum
    for addressable hashes and other purposes. Non-configurable.
    """

    DERIVED_KEY_LENGTH        = 512//8
    DERIVED_KEY_LENGTH_LEGACY = 256//8

    # Storage
    # known_destinations maps destination_hash -> [timestamp, packet_hash, public_key, app_data]
    known_destinations = {}
    # known_ratchets maps destination_hash -> ratchet bytes as received in an announce
    known_ratchets = {}

    # Serialises the background jobs that write ratchet files to disk
    ratchet_persist_lock = threading.Lock()

    # Set while save_known_destinations() is running, so that a second
    # caller waits instead of writing the table concurrently
    saving_known_destinations = False

    @staticmethod
    def remember(packet_hash, destination_hash, public_key, app_data = None):
        """
        Store the public key and app_data announced for a destination in the
        in-memory table of known destinations.

        :param packet_hash: Hash of the packet the information was learned from.
        :param destination_hash: Destination hash as *bytes*.
        :param public_key: The full public key as *bytes*, must be ``KEYSIZE//8`` bytes long.
        :param app_data: Optional app_data associated with the destination.
        :raises: *TypeError* if the public key does not have the expected length.
        """
        if len(public_key) != Identity.KEYSIZE//8:
            raise TypeError("Can't remember "+RNS.prettyhexrep(destination_hash)+", the public key size of "+str(len(public_key))+" is not valid.")

        Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data]

    @staticmethod
    def _identity_from_known(public_key, app_data):
        # Builds a public-key-only Identity instance for recall(), and
        # attaches the app_data that was stored along with the key.
        identity = Identity(create_keys=False)
        identity.load_public_key(public_key)
        identity.app_data = app_data
        return identity

    @staticmethod
    def recall(target_hash, from_identity_hash=False):
        """
        Recall identity for a destination or identity hash. By default, this function
        will return the identity associated with a given *destination* hash. As an
        example, if you know the ``lxmf.delivery`` destination hash of an endpoint,
        this function will return the associated underlying identity. You can also
        search for an identity from a known *identity hash*, by setting the
        ``from_identity_hash`` argument.

        :param target_hash: Destination or identity hash as *bytes*.
        :param from_identity_hash: Whether to search based on identity hash instead of destination hash as *bool*.
        :returns: An :ref:`RNS.Identity<api-identity>` instance that can be used to create an outgoing :ref:`RNS.Destination<api-destination>`, or *None* if the destination is unknown.
        """
        if from_identity_hash:
            # The table is keyed by destination hash, so an identity hash
            # lookup has to hash every stored public key until one matches.
            for identity_data in Identity.known_destinations.values():
                if target_hash == Identity.truncated_hash(identity_data[2]):
                    return Identity._identity_from_known(identity_data[2], identity_data[3])

            return None

        if target_hash in Identity.known_destinations:
            identity_data = Identity.known_destinations[target_hash]
            return Identity._identity_from_known(identity_data[2], identity_data[3])

        # Fall back to destinations registered with the local transport
        for registered_destination in RNS.Transport.destinations:
            if target_hash == registered_destination.hash:
                return Identity._identity_from_known(registered_destination.identity.get_public_key(), None)

        return None

    @staticmethod
    def recall_app_data(destination_hash):
        """
        Recall last heard app_data for a destination hash.

        :param destination_hash: Destination hash as *bytes*.
        :returns: *Bytes* containing app_data, or *None* if the destination is unknown.
        """
        if destination_hash in Identity.known_destinations:
            return Identity.known_destinations[destination_hash][3]

        return None

    @staticmethod
    def save_known_destinations():
        """
        Merge the known destinations already on disk into the in-memory
        table, and write the combined table back to storage.

        :returns: *False* if waiting for a previous save operation timed out, otherwise *None*.
        """
        # TODO: Improve the storage method so we don't have to
        # deserialize and serialize the entire table on every
        # save, but the only changes. It might be possible to
        # simply overwrite on exit now that every local client
        # disconnect triggers a data persist.

        # Wait for any save operation that is already in progress
        wait_interval = 0.2
        wait_timeout = 5
        wait_start = time.time()
        while Identity.saving_known_destinations:
            time.sleep(wait_interval)
            if time.time() > wait_start+wait_timeout:
                RNS.log("Could not save known destinations to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR)
                return False

        Identity.saving_known_destinations = True
        try:
            save_start = time.time()
            destinations_path = RNS.Reticulum.storagepath+"/known_destinations"

            storage_known_destinations = {}
            if os.path.isfile(destinations_path):
                try:
                    with open(destinations_path, "rb") as file:
                        storage_known_destinations = umsgpack.load(file)

                except Exception as e:
                    # Best effort: an unreadable file is simply overwritten below
                    RNS.log("Could not read existing known destinations from storage, the contained exception was: "+str(e), RNS.LOG_DEBUG)

            try:
                # Entries that only exist on disk are kept, in-memory entries win
                for destination_hash in storage_known_destinations:
                    if destination_hash not in Identity.known_destinations:
                        Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash]

            except Exception as e:
                RNS.log("Skipped recombining known destinations from disk, since an error occurred: "+str(e), RNS.LOG_WARNING)

            RNS.log("Saving "+str(len(Identity.known_destinations))+" known destinations to storage...", RNS.LOG_DEBUG)
            with open(destinations_path, "wb") as file:
                umsgpack.dump(Identity.known_destinations, file)

            save_time = time.time() - save_start
            if save_time < 1:
                time_str = str(round(save_time*1000,2))+"ms"
            else:
                time_str = str(round(save_time,2))+"s"

            RNS.log("Saved known destinations to storage in "+time_str, RNS.LOG_DEBUG)

        except Exception as e:
            RNS.log("Error while saving known destinations to disk, the contained exception was: "+str(e), RNS.LOG_ERROR)
            RNS.trace_exception(e)

        finally:
            # Always release the flag, so a failed save can't block later ones
            Identity.saving_known_destinations = False

    @staticmethod
    def load_known_destinations():
        """
        Replace the in-memory table of known destinations with the one stored
        on disk. Entries whose key does not have the length of a truncated
        hash are skipped.
        """
        destinations_path = RNS.Reticulum.storagepath+"/known_destinations"
        if os.path.isfile(destinations_path):
            try:
                with open(destinations_path, "rb") as file:
                    loaded_known_destinations = umsgpack.load(file)

                Identity.known_destinations = {}
                for known_destination in loaded_known_destinations:
                    if len(known_destination) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
                        Identity.known_destinations[known_destination] = loaded_known_destinations[known_destination]

                RNS.log("Loaded "+str(len(Identity.known_destinations))+" known destination from storage", RNS.LOG_VERBOSE)

            except Exception as e:
                RNS.log("Error loading known destinations from disk, file will be recreated on exit", RNS.LOG_ERROR)
                RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
        else:
            RNS.log("Destinations file does not exist, no known destinations loaded", RNS.LOG_VERBOSE)

    @staticmethod
    def full_hash(data):
        """
        Get a SHA-256 hash of passed data.

        :param data: Data to be hashed as *bytes*.
        :returns: SHA-256 hash as *bytes*.
        """
        return RNS.Cryptography.sha256(data)

    @staticmethod
    def truncated_hash(data):
        """
        Get a truncated SHA-256 hash of passed data.

        :param data: Data to be hashed as *bytes*.
        :returns: Truncated SHA-256 hash as *bytes*.
        """
        return Identity.full_hash(data)[:(Identity.TRUNCATED_HASHLENGTH//8)]

    @staticmethod
    def get_random_hash():
        """
        Get a random SHA-256 hash.

        :returns: Truncated SHA-256 hash of random data as *bytes*.
        """
        return Identity.truncated_hash(os.urandom(Identity.TRUNCATED_HASHLENGTH//8))

    @staticmethod
    def current_ratchet_id(destination_hash):
        """
        Get the ID of the currently used ratchet key for a given destination hash

        :param destination_hash: A destination hash as *bytes*.
        :returns: A ratchet ID as *bytes* or *None*.
        """
        ratchet = Identity.get_ratchet(destination_hash)
        if ratchet is None:
            return None

        return Identity._get_ratchet_id(ratchet)

    @staticmethod
    def _get_ratchet_id(ratchet_pub_bytes):
        # A ratchet ID is the SHA-256 hash of the ratchet public key,
        # truncated to NAME_HASH_LENGTH bits.
        return Identity.full_hash(ratchet_pub_bytes)[:Identity.NAME_HASH_LENGTH//8]

    @staticmethod
    def _ratchet_public_bytes(ratchet):
        # Derives the public key bytes from the private bytes of a ratchet
        return X25519PrivateKey.from_private_bytes(ratchet).public_key().public_bytes()

    @staticmethod
    def _generate_ratchet():
        # Generates a new X25519 ratchet key and returns its private bytes
        return X25519PrivateKey.generate().private_bytes()

    @staticmethod
    def _remember_ratchet(destination_hash, ratchet):
        """
        Remember a ratchet received for a destination. The ratchet is stored
        in memory, and unless this instance is connected to a shared instance,
        also written to the ratchet storage directory by a background thread.

        :param destination_hash: Destination hash as *bytes*.
        :param ratchet: The ratchet as *bytes*.
        """
        try:
            if destination_hash in Identity.known_ratchets and Identity.known_ratchets[destination_hash] == ratchet:
                # Already known, nothing to update or persist
                return

            RNS.log(f"Remembering ratchet {RNS.prettyhexrep(Identity._get_ratchet_id(ratchet))} for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_EXTREME)
            Identity.known_ratchets[destination_hash] = ratchet

            if not RNS.Transport.owner.is_connected_to_shared_instance:
                def persist_job():
                    # This runs on its own thread, so failures must be
                    # handled here. An exception raised in the thread
                    # never reaches the caller's exception handler.
                    try:
                        with Identity.ratchet_persist_lock:
                            hexhash = RNS.hexrep(destination_hash, delimit=False)
                            ratchet_data = {"ratchet": ratchet, "received": time.time()}

                            ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
                            os.makedirs(ratchetdir, exist_ok=True)

                            # Write to a temporary file and move it into
                            # place, so readers never see a partial file
                            outpath   = f"{ratchetdir}/{hexhash}.out"
                            finalpath = f"{ratchetdir}/{hexhash}"
                            with open(outpath, "wb") as ratchet_file:
                                ratchet_file.write(umsgpack.packb(ratchet_data))
                            os.replace(outpath, finalpath)

                    except Exception as e:
                        RNS.log(f"Could not persist ratchet for {RNS.prettyhexrep(destination_hash)} to storage.", RNS.LOG_ERROR)
                        RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
                        RNS.trace_exception(e)

                threading.Thread(target=persist_job, daemon=True).start()

        except Exception as e:
            RNS.log(f"Could not persist ratchet for {RNS.prettyhexrep(destination_hash)} to storage.", RNS.LOG_ERROR)
            RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
            RNS.trace_exception(e)

    @staticmethod
    def _clean_ratchets():
        """
        Remove expired and unreadable ratchet files from the ratchet
        storage directory.
        """
        RNS.log("Cleaning ratchets...", RNS.LOG_DEBUG)
        try:
            now = time.time()
            ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
            if os.path.isdir(ratchetdir):
                for filename in os.listdir(ratchetdir):
                    try:
                        expired = False
                        corrupted = False
                        with open(f"{ratchetdir}/{filename}", "rb") as rf:
                            try:
                                ratchet_data = umsgpack.unpackb(rf.read())
                                if now > ratchet_data["received"]+Identity.RATCHET_EXPIRY:
                                    expired = True

                            except Exception as e:
                                RNS.log(f"Corrupted ratchet data while reading {ratchetdir}/{filename}, removing file", RNS.LOG_ERROR)
                                corrupted = True

                        # The file is removed only after it has been closed
                        if expired or corrupted:
                            os.unlink(f"{ratchetdir}/{filename}")

                    except Exception as e:
                        RNS.log(f"An error occurred while cleaning ratchets, in the processing of {ratchetdir}/{filename}.", RNS.LOG_ERROR)
                        RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)

        except Exception as e:
            RNS.log(f"An error occurred while cleaning ratchets. The contained exception was: {e}", RNS.LOG_ERROR)

    @staticmethod
    def get_ratchet(destination_hash):
        """
        Get the currently known ratchet for a destination. If the ratchet is
        not held in memory, it is loaded from the ratchet storage directory,
        provided the stored ratchet has not expired and has a valid size.

        :param destination_hash: Destination hash as *bytes*.
        :returns: The ratchet as *bytes*, or *None* if no usable ratchet is known.
        """
        if destination_hash not in Identity.known_ratchets:
            ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
            hexhash = RNS.hexrep(destination_hash, delimit=False)
            ratchet_path = f"{ratchetdir}/{hexhash}"
            if os.path.isfile(ratchet_path):
                try:
                    with open(ratchet_path, "rb") as ratchet_file:
                        ratchet_data = umsgpack.unpackb(ratchet_file.read())

                    if time.time() < ratchet_data["received"]+Identity.RATCHET_EXPIRY and len(ratchet_data["ratchet"]) == Identity.RATCHETSIZE//8:
                        Identity.known_ratchets[destination_hash] = ratchet_data["ratchet"]
                    else:
                        return None

                except Exception as e:
                    RNS.log(f"An error occurred while loading ratchet data for {RNS.prettyhexrep(destination_hash)} from storage.", RNS.LOG_ERROR)
                    RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
                    return None

        if destination_hash in Identity.known_ratchets:
            return Identity.known_ratchets[destination_hash]

        RNS.log(f"Could not load ratchet for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
        return None

    @staticmethod
    def validate_announce(packet, only_validate_signature=False):
        """
        Validate an announce packet. The announce data is parsed as public
        key, name hash, 10 byte random hash, an optional ratchet (present
        when the packet context flag is set), signature and trailing app_data.
        If the signature is valid and the destination hash matches the
        announced name hash and identity, the destination is remembered,
        along with the announced ratchet, if any.

        :param packet: The announce packet to validate.
        :param only_validate_signature: If *True*, only the signature is checked, and nothing is remembered.
        :returns: *True* if the announce is valid, *False* if it is invalid or an error occurred, and *None* if the packet is not an announce.
        """
        try:
            if packet.packet_type != RNS.Packet.ANNOUNCE:
                return None

            keysize       = Identity.KEYSIZE//8
            ratchetsize   = Identity.RATCHETSIZE//8
            name_hash_len = Identity.NAME_HASH_LENGTH//8
            sig_len       = Identity.SIGLENGTH//8
            random_hash_len = 10
            destination_hash = packet.destination_hash

            # Get public key bytes from announce
            public_key = packet.data[:keysize]

            # The fields following the public key are laid out back to
            # back, so they are extracted with a running offset
            offset = keysize
            name_hash = packet.data[offset:offset+name_hash_len]
            offset += name_hash_len
            random_hash = packet.data[offset:offset+random_hash_len]
            offset += random_hash_len

            # If the packet context flag is set,
            # this announce contains a new ratchet
            if packet.context_flag == RNS.Packet.FLAG_SET:
                ratchet = packet.data[offset:offset+ratchetsize]
                offset += ratchetsize
            else:
                ratchet = b""

            signature = packet.data[offset:offset+sig_len]
            offset += sig_len

            app_data = b""
            if len(packet.data) > offset:
                app_data = packet.data[offset:]

            signed_data = destination_hash+public_key+name_hash+random_hash+ratchet+app_data

            # NOTE(review): this threshold does not include the ratchet size,
            # so an announce carrying a ratchet but no app_data is remembered
            # with app_data b"" rather than None. Kept as-is, confirm whether
            # this is intended.
            if not len(packet.data) > Identity.KEYSIZE//8+Identity.NAME_HASH_LENGTH//8+10+Identity.SIGLENGTH//8:
                app_data = None

            announced_identity = Identity(create_keys=False)
            announced_identity.load_public_key(public_key)

            if announced_identity.pub is None or not announced_identity.validate(signature, signed_data):
                RNS.log("Received invalid announce for "+RNS.prettyhexrep(destination_hash)+": Invalid signature.", RNS.LOG_DEBUG)
                return False

            if only_validate_signature:
                return True

            hash_material = name_hash+announced_identity.hash
            expected_hash = RNS.Identity.full_hash(hash_material)[:RNS.Reticulum.TRUNCATED_HASHLENGTH//8]

            if destination_hash != expected_hash:
                RNS.log("Received invalid announce for "+RNS.prettyhexrep(destination_hash)+": Destination mismatch.", RNS.LOG_DEBUG)
                return False

            # Check if we already have a public key for this destination
            # and make sure the public key is not different.
            if destination_hash in Identity.known_destinations:
                if public_key != Identity.known_destinations[destination_hash][2]:
                    # In reality, this should never occur, but in the odd case
                    # that someone manages a hash collision, we reject the announce.
                    RNS.log("Received announce with valid signature and destination hash, but announced public key does not match already known public key.", RNS.LOG_CRITICAL)
                    RNS.log("This may indicate an attempt to modify network paths, or a random hash collision. The announce was rejected.", RNS.LOG_CRITICAL)
                    return False

            RNS.Identity.remember(packet.get_hash(), destination_hash, public_key, app_data)

            # Build the optional signal quality suffix for the log line
            signal_parts = []
            if packet.rssi is not None:
                signal_parts.append("RSSI "+str(packet.rssi)+"dBm")
            if packet.snr is not None:
                signal_parts.append("SNR "+str(packet.snr)+"dB")
            signal_str = " ["+", ".join(signal_parts)+"]" if signal_parts else ""

            if hasattr(packet, "transport_id") and packet.transport_id is not None:
                RNS.log("Valid announce for "+RNS.prettyhexrep(destination_hash)+" "+str(packet.hops)+" hops away, received via "+RNS.prettyhexrep(packet.transport_id)+" on "+str(packet.receiving_interface)+signal_str, RNS.LOG_EXTREME)
            else:
                RNS.log("Valid announce for "+RNS.prettyhexrep(destination_hash)+" "+str(packet.hops)+" hops away, received on "+str(packet.receiving_interface)+signal_str, RNS.LOG_EXTREME)

            if ratchet:
                Identity._remember_ratchet(destination_hash, ratchet)

            return True

        except Exception as e:
            RNS.log("Error occurred while validating announce. The contained exception was: "+str(e), RNS.LOG_ERROR)
            return False

    @staticmethod
    def persist_data():
        # Known destinations are only written to disk when this instance
        # is not connected to a shared instance.
        if not RNS.Transport.owner.is_connected_to_shared_instance:
            Identity.save_known_destinations()

    @staticmethod
    def exit_handler():
        # Persists identity data on exit
        Identity.persist_data()

    @staticmethod
    def from_bytes(prv_bytes):
        """
        Create a new :ref:`RNS.Identity<api-identity>` instance from *bytes* of private key.
        Can be used to load previously created and saved identities into Reticulum.

        :param prv_bytes: The *bytes* of a saved private key. **HAZARD!** Never use this to generate a new key by feeding random data in prv_bytes.
        :returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the key was not loaded.
        :raises: The underlying exception, if the key material could not be loaded.
        """
        identity = Identity(create_keys=False)
        if identity.load_private_key(prv_bytes):
            return identity

        return None

    @staticmethod
    def from_file(path):
        """
        Create a new :ref:`RNS.Identity<api-identity>` instance from a file.
        Can be used to load previously created and saved identities into Reticulum.

        :param path: The full path to the saved :ref:`RNS.Identity<api-identity>` data
        :returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the loaded data was invalid.
        """
        identity = Identity(create_keys=False)
        if identity.load(path):
            return identity

        return None

    def to_file(self, path):
        """
        Saves the identity to a file. This will write the private key to disk,
        and anyone with access to this file will be able to decrypt all
        communication for the identity. Be very careful with this method.

        :param path: The full path specifying where to save the identity.
        :returns: True if the file was saved, otherwise False.
        """
        try:
            with open(path, "wb") as key_file:
                key_file.write(self.get_private_key())

            return True

        except Exception as e:
            RNS.log("Error while saving identity to "+str(path), RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            return False

    def __init__(self,create_keys=True):
        # Initialize keys to none
        self.prv           = None
        self.prv_bytes     = None
        self.sig_prv       = None
        self.sig_prv_bytes = None

        self.pub           = None
        self.pub_bytes     = None
        self.sig_pub       = None
        self.sig_pub_bytes = None

        self.hash          = None
        self.hexhash       = None

        if create_keys:
            self.create_keys()

    def create_keys(self):
        """
        Generate new X25519 encryption and Ed25519 signing keys for this
        instance, and update the identity hashes accordingly.
        """
        self.prv           = X25519PrivateKey.generate()
        self.prv_bytes     = self.prv.private_bytes()

        self.sig_prv       = Ed25519PrivateKey.generate()
        self.sig_prv_bytes = self.sig_prv.private_bytes()

        self.pub           = self.prv.public_key()
        self.pub_bytes     = self.pub.public_bytes()

        self.sig_pub       = self.sig_prv.public_key()
        self.sig_pub_bytes = self.sig_pub.public_bytes()

        self.update_hashes()

        RNS.log("Identity keys created for "+RNS.prettyhexrep(self.hash), RNS.LOG_VERBOSE)

    def get_private_key(self):
        """
        :returns: The private key as *bytes*
        """
        return self.prv_bytes+self.sig_prv_bytes

    def get_public_key(self):
        """
        :returns: The public key as *bytes*
        """
        return self.pub_bytes+self.sig_pub_bytes

    def load_private_key(self, prv_bytes):
        """
        Load a private key into the instance. The first half of the key
        material is used as the X25519 encryption key, and the remainder
        as the Ed25519 signing key.

        :param prv_bytes: The private key as *bytes*.
        :returns: True if the key was loaded.
        :raises: The underlying exception, if the key material could not be loaded.
        """
        try:
            self.prv_bytes     = prv_bytes[:Identity.KEYSIZE//8//2]
            self.prv           = X25519PrivateKey.from_private_bytes(self.prv_bytes)
            self.sig_prv_bytes = prv_bytes[Identity.KEYSIZE//8//2:]
            self.sig_prv       = Ed25519PrivateKey.from_private_bytes(self.sig_prv_bytes)

            self.pub           = self.prv.public_key()
            self.pub_bytes     = self.pub.public_bytes()

            self.sig_pub       = self.sig_prv.public_key()
            self.sig_pub_bytes = self.sig_pub.public_bytes()

            self.update_hashes()

            return True

        except Exception as e:
            # The failure is logged, and then propagated to the caller,
            # as this method has always done for invalid key material.
            RNS.log("Failed to load identity key", RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            raise

    def load_public_key(self, pub_bytes):
        """
        Load a public key into the instance. The first half of the key
        material is used as the X25519 encryption key, and the remainder
        as the Ed25519 signing key.

        :param pub_bytes: The public key as *bytes*.
        :returns: True if the key was loaded, otherwise False.
        """
        try:
            self.pub_bytes     = pub_bytes[:Identity.KEYSIZE//8//2]
            self.sig_pub_bytes = pub_bytes[Identity.KEYSIZE//8//2:]

            self.pub           = X25519PublicKey.from_public_bytes(self.pub_bytes)
            self.sig_pub       = Ed25519PublicKey.from_public_bytes(self.sig_pub_bytes)

            self.update_hashes()
            return True

        except Exception as e:
            RNS.log("Error while loading public key, the contained exception was: "+str(e), RNS.LOG_ERROR)
            return False

    def update_hashes(self):
        # The identity hash is the truncated hash of the full public key
        self.hash = Identity.truncated_hash(self.get_public_key())
        self.hexhash = self.hash.hex()

    def load(self, path):
        """
        Load a private key from a file into the instance.

        :param path: The full path to the saved identity data.
        :returns: True if the key was loaded, otherwise False.
        """
        try:
            with open(path, "rb") as key_file:
                prv_bytes = key_file.read()

            return self.load_private_key(prv_bytes)

        except Exception as e:
            RNS.log("Error while loading identity from "+str(path), RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            return False

    def get_salt(self):
        # The identity hash is used as salt for key derivation
        return self.hash

    def get_context(self):
        # No context is used for key derivation
        return None

    def encrypt(self, plaintext, ratchet=None):
        """
        Encrypts information for the identity.

        :param plaintext: The plaintext to be encrypted as *bytes*.
        :param ratchet: Optional ratchet public key as *bytes*. If supplied, the key exchange is performed against the ratchet instead of the public key of the identity.
        :returns: Ciphertext token as *bytes*.
        :raises: *KeyError* if the instance does not hold a public key.
        """
        if self.pub is None:
            raise KeyError("Encryption failed because identity does not hold a public key")

        ephemeral_key = X25519PrivateKey.generate()
        ephemeral_pub_bytes = ephemeral_key.public_key().public_bytes()

        if ratchet is not None:
            target_public_key = X25519PublicKey.from_public_bytes(ratchet)
        else:
            target_public_key = self.pub

        shared_key = ephemeral_key.exchange(target_public_key)

        derived_key = RNS.Cryptography.hkdf(
            length=Identity.DERIVED_KEY_LENGTH,
            derive_from=shared_key,
            salt=self.get_salt(),
            context=self.get_context(),
        )

        ciphertext = Token(derived_key).encrypt(plaintext)

        # The ephemeral public key is prepended, so the receiver
        # can perform the matching key exchange
        return ephemeral_pub_bytes+ciphertext

    def __decrypt(self, shared_key, ciphertext):
        # Derives the token key from the shared key and decrypts the ciphertext
        derived_key = RNS.Cryptography.hkdf(
            length=Identity.DERIVED_KEY_LENGTH,
            derive_from=shared_key,
            salt=self.get_salt(),
            context=self.get_context())

        return Token(derived_key).decrypt(ciphertext)

    def decrypt(self, ciphertext_token, ratchets=None, enforce_ratchets=False, ratchet_id_receiver=None):
        """
        Decrypts information for the identity.

        :param ciphertext_token: The ciphertext token to be decrypted as *bytes*.
        :param ratchets: Optional list of ratchet private keys as *bytes*, that are tried in order before the private key of the identity.
        :param enforce_ratchets: If *True*, decryption fails when none of the supplied ratchets could decrypt the token.
        :param ratchet_id_receiver: Optional object, whose ``latest_ratchet_id`` attribute is set to the ID of the ratchet that decrypted the token, or to *None* if no ratchet was used.
        :returns: Plaintext as *bytes*, or *None* if decryption fails.
        :raises: *KeyError* if the instance does not hold a private key.
        """
        if self.prv is None:
            raise KeyError("Decryption failed because identity does not hold a private key")

        peer_key_len = Identity.KEYSIZE//8//2
        if len(ciphertext_token) <= peer_key_len:
            RNS.log("Decryption failed because the token size was invalid.", RNS.LOG_DEBUG)
            return None

        plaintext = None
        try:
            peer_pub_bytes = ciphertext_token[:peer_key_len]
            peer_pub = X25519PublicKey.from_public_bytes(peer_pub_bytes)
            ciphertext = ciphertext_token[peer_key_len:]

            if ratchets:
                for ratchet in ratchets:
                    try:
                        ratchet_prv = X25519PrivateKey.from_private_bytes(ratchet)
                        ratchet_id = Identity._get_ratchet_id(ratchet_prv.public_key().public_bytes())
                        shared_key = ratchet_prv.exchange(peer_pub)
                        plaintext = self.__decrypt(shared_key, ciphertext)
                        if ratchet_id_receiver:
                            ratchet_id_receiver.latest_ratchet_id = ratchet_id

                        break

                    except Exception as e:
                        # This ratchet did not fit, try the next one
                        pass

            if enforce_ratchets and plaintext is None:
                RNS.log("Decryption with ratchet enforcement by "+RNS.prettyhexrep(self.hash)+" failed. Dropping packet.", RNS.LOG_DEBUG)
                if ratchet_id_receiver:
                    ratchet_id_receiver.latest_ratchet_id = None
                return None

            if plaintext is None:
                # No ratchet worked, fall back to the identity key
                shared_key = self.prv.exchange(peer_pub)
                plaintext = self.__decrypt(shared_key, ciphertext)
                if ratchet_id_receiver:
                    ratchet_id_receiver.latest_ratchet_id = None

        except Exception as e:
            RNS.log("Decryption by "+RNS.prettyhexrep(self.hash)+" failed: "+str(e), RNS.LOG_DEBUG)
            if ratchet_id_receiver:
                ratchet_id_receiver.latest_ratchet_id = None

        return plaintext

    def sign(self, message):
        """
        Signs information by the identity.

        :param message: The message to be signed as *bytes*.
        :returns: Signature as *bytes*.
        :raises: *KeyError* if the instance does not hold a private key.
        """
        if self.sig_prv is None:
            raise KeyError("Signing failed because identity does not hold a private key")

        try:
            return self.sig_prv.sign(message)

        except Exception as e:
            RNS.log("The identity "+str(self)+" could not sign the requested message. The contained exception was: "+str(e), RNS.LOG_ERROR)
            raise

    def validate(self, signature, message):
        """
        Validates the signature of a signed message.

        :param signature: The signature to be validated as *bytes*.
        :param message: The message to be validated as *bytes*.
        :returns: True if the signature is valid, otherwise False.
        :raises: *KeyError* if the instance does not hold a public key.
        """
        if self.pub is None:
            raise KeyError("Signature validation failed because identity does not hold a public key")

        try:
            self.sig_pub.verify(signature, message)
            return True

        except Exception as e:
            # Any failure during verification counts as an invalid signature
            return False

    def prove(self, packet, destination=None):
        """
        Sign the hash of a packet and send the signature as a proof.

        :param packet: The packet to prove.
        :param destination: Optional destination for the proof. If not supplied, a proof destination is generated from the packet.
        """
        signature = self.sign(packet.packet_hash)
        if RNS.Reticulum.should_use_implicit_proof():
            # Implicit proofs carry only the signature
            proof_data = signature
        else:
            proof_data = packet.packet_hash + signature

        if destination is None:
            destination = packet.generate_proof_destination()

        proof = RNS.Packet(destination, proof_data, RNS.Packet.PROOF, attached_interface = packet.receiving_interface)
        proof.send()

    def __str__(self):
        return RNS.prettyhexrep(self.hash)