Token.py
1 # Reticulum License 2 # 3 # Copyright (c) 2016-2025 Mark Qvist 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # - The Software shall not be used in any kind of system which includes amongst 13 # its functions the ability to purposefully do harm to human beings. 14 # 15 # - The Software shall not be used, directly or indirectly, in the creation of 16 # an artificial intelligence, machine learning or language model training 17 # dataset, including but not limited to any use that contributes to the 18 # training or development of such a model or algorithm. 19 # 20 # - The above copyright notice and this permission notice shall be included in 21 # all copies or substantial portions of the Software. 22 # 23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 24 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 25 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 26 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 27 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 28 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 29 # SOFTWARE. 30 31 import os 32 import time 33 34 from RNS.Cryptography import HMAC 35 from RNS.Cryptography import PKCS7 36 from RNS.Cryptography import AES 37 from RNS.Cryptography.AES import AES_128_CBC 38 from RNS.Cryptography.AES import AES_256_CBC 39 40 class Token(): 41 """ 42 This class provides a slightly modified implementation of the Fernet spec 43 found at: https://github.com/fernet/spec/blob/master/Spec.md 44 45 According to the spec, a Fernet token includes a one byte VERSION and 46 eight byte TIMESTAMP field at the start of each token. These fields are 47 not relevant to Reticulum. They are therefore stripped from this 48 implementation, since they incur overhead and leak initiator metadata. 49 """ 50 TOKEN_OVERHEAD = 48 # Bytes 51 52 @staticmethod 53 def generate_key(mode=AES_256_CBC): 54 if mode == AES_128_CBC: return os.urandom(32) 55 elif mode == AES_256_CBC: return os.urandom(64) 56 else: raise TypeError(f"Invalid token mode: {mode}") 57 58 def __init__(self, key=None, mode=AES): 59 if key == None: raise ValueError("Token key cannot be None") 60 61 if mode == AES: 62 if len(key) == 32: 63 self.mode = AES_128_CBC 64 self._signing_key = key[:16] 65 self._encryption_key = key[16:] 66 67 elif len(key) == 64: 68 self.mode = AES_256_CBC 69 self._signing_key = key[:32] 70 self._encryption_key = key[32:] 71 72 else: raise ValueError("Token key must be 128 or 256 bits, not "+str(len(key)*8)) 73 74 else: raise TypeError(f"Invalid token mode: {mode}") 75 76 77 def verify_hmac(self, token): 78 if len(token) <= 32: raise ValueError("Cannot verify HMAC on token of only "+str(len(token))+" bytes") 79 else: 80 received_hmac = token[-32:] 81 expected_hmac = HMAC.new(self._signing_key, token[:-32]).digest() 82 83 if received_hmac == expected_hmac: return True 84 else: return False 85 86 87 def encrypt(self, data = None): 88 if not isinstance(data, bytes): raise TypeError("Token plaintext input must be bytes") 89 iv = os.urandom(16) 90 91 ciphertext = self.mode.encrypt( 92 plaintext = PKCS7.pad(data), 93 key = self._encryption_key, 94 iv = iv) 95 96 signed_parts = iv+ciphertext 97 return signed_parts + HMAC.new(self._signing_key, signed_parts).digest() 98 99 100 def decrypt(self, token = None): 101 if not isinstance(token, bytes): raise TypeError("Token must be bytes") 102 if not self.verify_hmac(token): raise ValueError("Token HMAC was invalid") 103 104 iv = token[:16] 105 ciphertext = token[16:-32] 106 107 try: 108 return PKCS7.unpad( 109 self.mode.decrypt( 110 ciphertext = ciphertext, 111 key = self._encryption_key, 112 iv = iv)) 113 114 except Exception as e: raise ValueError(f"Could not decrypt token: {e}")