# identity.py
"""Unit tests and rough timing/throughput benchmarks for ``RNS.Identity``.

The assertions check key loading, a known signature and a known encrypted
token. The printed figures are informational only and are never asserted on.
"""

import os
import statistics
import time
import unittest

import RNS

# Message (signed as the UTF-8 encoding of this hex text) and the signature
# that the first entry of ``fixed_keys`` is expected to produce for it.
signed_message = "e51a008b8b8ba855993d8892a40daad84a6fb69a7138e1b5f69b427fe03449826ab6ccb81f0d72b4725e8d55c814d3e8e151b495cf5b59702f197ec366d935ad04a98ca519d6964f96ea09910b020351d1cdff3befbad323a2a28a6ec7ced4d0d67f02c525f93b321d9b076d704408475bd2d123cd51916f7e49039246ac56add37ef87e32d7f9853ac44a7f77d26fedc83e4e67a45742b751c2599309f5eda6efa0dafd957f61af1f0e86c4d6c5052e0e5fa577db99846f2b7a0204c31cef4013ca51cb307506c9209fd18d0195a7c9ae628af1a1d9ee7a4cf30037ed190a9fdcaa4ce5bb7bea19803cb5b5cea8c21fdb98d8f73ff5aaad87f5f6c3b7bcfe8974e5b063cc1113d77b9e96bec1c9d10ed37b780c3f7349a34092bb3968daeced40eb0b5130c0d11595e30b9671896385d04289d067f671599386536eed8430a72e186fb95023d5ac5dd442443bfabfe13a84a38d060af73bf20f921f38a768672fdbcb1dfece7458166e2e15948d6b4fa81f42db48747d283c670f576a0b410b31a70d2594823d0e29135a488cb0408c9e5bc1e197ff99aef471924231ccc8e3eddc82dbcea4801f14c5fc7a389a26a52cc93cfe0770953ef595ff410b7033a6ed5c975dd922b3f48f9dffcfb412eeed5758f3aa51de7eb47cd2cb"
sig_from_key_0 = "3020ef58f861591826a61c3d2d4a25b949cdb3094085ba6b1177a6f2a05f3cdd24d1095d6fdd078f0b2826e80b261c93c1ff97fbfd4857f25706d57dd073590c"

# Expected plaintext (hex) and the token (hex) that must decrypt to it with
# the first entry of ``fixed_keys``.
encrypted_message = "71884a271ead43558fcf1e331c5aebcd43498f16da16f8056b0893ce6b15d521eaa4f31639cd34da1b57995944076c4f14f300f2d2612111d21a3429a9966ac1da68545c00c7887d8b26f6c1ab9defa020b9519849ca41b7904199882802b6542771df85144a79890289d3c02daef6c26652c5ce9de231a2"
fixed_token = "e37705f9b432d3711acf028678b0b9d37fdf7e00a3b47c95251aad61447df2620b5b9978783c3d9f2fb762e68c8b57c554928fb70dd79c1033ce5865f91761aad3e992790f63456092cb69b7b045f539147f7ba10d480e300f193576ae2d75a7884809b76bd17e05a735383305c0aa5621395bbf51e8cc66c1c536f339f2bea600f08f8f9a76564b2522cd904b6c2b6e553ec3d4df718ae70434c734297b313539338d184d2c64a9c4ddbc9b9a4947d0b45f5a274f65ae9f6bb203562fd5cede6abd3c615b699156e08fa33b841647a0"

# (private key hex, expected identity hash hex) pairs.
fixed_keys = [
    ("f8953ffaf607627e615603ff1530c82c434cf87c07179dd7689ea776f30b964cfb7ba6164af00c5111a45e69e57d885e1285f8dbfe3a21e95ae17cf676b0f8b7", "650b5d76b6bec0390d1f8cfca5bd33f9"),
    ("d85d036245436a3c33d3228affae06721f8203bc364ee0ee7556368ac62add650ebf8f926abf628da9d92baaa12db89bd6516ee92ec29765f3afafcb8622d697", "1469e89450c361b253aefb0c606b6111"),
    ("8893e2bfd30fc08455997caf7abb7a6341716768dbbf9a91cc1455bd7eeaf74cdc10ec72a4d4179696040bac620ee97ebc861e2443e5270537ae766d91b58181", "e5fe93ee4acba095b3b9b6541515ed3e"),
    ("b82c7a4f047561d974de7e38538281d7f005d3663615f30d9663bad35a716063c931672cd452175d55bcdd70bb7aa35a9706872a97963dc52029938ea7341b39", "1333b911fa8ebb16726996adbe3c6262"),
    ("08bb35f92b06a0832991165a0d9b4fd91af7b7765ce4572aa6222070b11b767092b61b0fd18b3a59cae6deb9db6d4bfb1c7fcfe076cfd66eea7ddd5f877543b9", "d13712efc45ef87674fb5ac26c37c912"),
]


class TestIdentity(unittest.TestCase):
    """Checks ``RNS.Identity`` key loading, signing and encryption."""

    def test_0_create_from_bytes(self):
        """Every fixed key must load to its expected hash and round-trip."""
        for key, id_hash in fixed_keys:
            i = RNS.Identity.from_bytes(bytes.fromhex(key))
            self.assertEqual(i.hash, bytes.fromhex(id_hash))
            self.assertEqual(i.get_private_key(), bytes.fromhex(key))

    def test_1_sign(self):
        """Verify a known signature, then print signing timing figures."""
        print("")

        # Test known signature
        fid = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
        sig = fid.sign(signed_message.encode("utf-8"))

        self.assertEqual(sig, bytes.fromhex(sig_from_key_0))

        # Test signature time jitter
        if RNS.Cryptography.backend() == "internal":
            rounds = 2000
        else:
            rounds = 20000

        # Each heading names the payload that is actually signed in that run.
        jitter_cases = (
            ("Random messages:", lambda: os.urandom(512)),
            ("All 0x00 messages:", lambda: bytes([0x00])*512),
            ("All 0xff messages:", lambda: bytes([0xff])*512),
        )
        for heading, make_message in jitter_cases:
            samples = self._time_signatures(rounds, make_message)
            print(heading)
            self._report_timing("Signature", samples)

        # Each throughput figure is computed from its own loop only.
        half_mtu = RNS.Reticulum.MTU//2
        rate = self._sign_validate_rate(
            [i % half_mtu + half_mtu for i in range(1, 500)]
        )
        print("Sign/validate chunks < MTU: "+self.size_str(rate, "b")+"ps")

        rate = self._sign_validate_rate([16*1024 for _ in range(1, 500)])
        print("Sign/validate 16KB chunks: "+self.size_str(rate, "b")+"ps")

    def test_2_encrypt(self):
        """Decrypt a known token, then round-trip random payloads."""
        print("")

        # Test loading identity from bytes
        fid = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
        self.assertEqual(fid.hash, bytes.fromhex(fixed_keys[0][1]))

        # Test decryption of known AES-256 token
        print("Testing decryption of known token")
        plaintext = fid.decrypt(bytes.fromhex(fixed_token))
        self.assertEqual(plaintext, bytes.fromhex(encrypted_message))

        # Test encrypt and decrypt of random chunks
        print("Testing random small chunk encrypt/decrypt")
        half_mtu = RNS.Reticulum.MTU//2
        small_lengths = [i % half_mtu + half_mtu for i in range(1, 500)]
        self._report_crypt("chunks < MTU", *self._bench_crypt(small_lengths))

        # Test encrypt and decrypt of large chunks
        print("Testing large chunk encrypt/decrypt")
        mlen = 8*1000*1000
        if RNS.Cryptography.backend() == "internal":
            lb = 1
        else:
            lb = 8

        # NOTE(review): range(1, lb) yields lb-1 rounds, so the internal
        # backend runs no large-chunk rounds at all -- confirm this is the
        # intended way of skipping it. The round count is left unchanged.
        large_lengths = [mlen for _ in range(1, lb)]
        if large_lengths:
            self._report_crypt(
                self.size_str(mlen)+" chunks", *self._bench_crypt(large_lengths)
            )
        else:
            print(" No large chunk rounds configured for this backend, skipping")
            print()

    def _time_signatures(self, rounds, make_message):
        """Return per-call signing durations in seconds.

        A fresh identity signs ``rounds - 1`` messages, each produced by
        calling ``make_message()`` outside the timed region.
        """
        signer = RNS.Identity()
        samples = []
        for _ in range(1, rounds):
            msg = make_message()
            start = time.perf_counter()
            signer.sign(msg)
            samples.append(time.perf_counter() - start)
        return samples

    def _sign_validate_rate(self, lengths):
        """Sign and validate one random message per length; return bytes/s.

        Only the sign and validate calls are timed. Key generation, payload
        generation and the assertion are excluded.
        """
        total_bytes = 0
        elapsed = 0
        for mlen in lengths:
            msg = os.urandom(mlen)
            total_bytes += mlen
            id1 = RNS.Identity()
            id2 = RNS.Identity(create_keys=False)
            id2.load_public_key(id1.get_public_key())

            start = time.perf_counter()
            signature = id1.sign(msg)
            valid = id2.validate(signature, msg)
            elapsed += time.perf_counter() - start

            self.assertEqual(True, valid)
        return total_bytes/elapsed

    def _bench_crypt(self, lengths):
        """Encrypt and decrypt one random message per length.

        Returns ``(total_bytes, encrypt_times, decrypt_times)`` with the
        durations in seconds. The round-trip equality check runs after the
        clock is stopped so that it does not count as decryption time.
        """
        total_bytes = 0
        e_times = []
        d_times = []
        for mlen in lengths:
            msg = os.urandom(mlen)
            total_bytes += mlen
            id1 = RNS.Identity()
            id2 = RNS.Identity(create_keys=False)
            id2.load_public_key(id1.get_public_key())

            e_start = time.perf_counter()
            token = id2.encrypt(msg)
            e_times.append(time.perf_counter() - e_start)

            d_start = time.perf_counter()
            decrypted = id1.decrypt(token)
            d_times.append(time.perf_counter() - d_start)

            self.assertEqual(msg, decrypted)
        return total_bytes, e_times, d_times

    def _report_crypt(self, description, total_bytes, e_times, d_times):
        """Print throughput and timing for one encrypt/decrypt benchmark."""
        print(" Encrypt "+description+": "+self.size_str(total_bytes/sum(e_times), "b")+"ps")
        self._report_timing("Encryption", e_times)

        print(" Decrypt "+description+": "+self.size_str(total_bytes/sum(d_times), "b")+"ps")
        self._report_timing("Decryption", d_times)

    def _report_timing(self, label, samples):
        """Print min/avg/med/max/spread of ``samples`` (seconds) in ms."""
        tmin = min(samples)*1000
        tmax = max(samples)*1000
        tmean = (sum(samples)/len(samples))*1000
        tmed = statistics.median(samples)*1000
        tmdev = tmax - tmin
        # Largest sample as a percentage of the median; guarded so that a
        # zero median cannot abort the test with ZeroDivisionError.
        mpct = (tmax/tmed)*100 if tmed else float("inf")

        figures = "/".join(
            str(round(value, 3)) for value in (tmin, tmean, tmed, tmax, tmdev)
        )
        print(" "+label+" timing min/avg/med/max/mdev: "+figures)
        print(" Max deviation from median: "+str(round(mpct, 1))+"%")
        print()

    def size_str(self, num, suffix='B'):
        """Format ``num`` with a decimal (1000-based) unit prefix.

        ``num`` is taken as a byte count. With ``suffix='b'`` it is first
        multiplied by 8 so that the result is expressed in bits.
        """
        units = ['','K','M','G','T','P','E','Z']
        last_unit = 'Y'

        if suffix == 'b':
            num *= 8

        for unit in units:
            if abs(num) < 1000.0:
                if unit == "":
                    return "%.0f %s%s" % (num, unit, suffix)
                else:
                    return "%.2f %s%s" % (num, unit, suffix)
            num /= 1000.0

        return "%.2f%s%s" % (num, last_unit, suffix)


if __name__ == '__main__':
    unittest.main(verbosity=2)