/ tests / identity.py
identity.py
  1  import unittest
  2  
  3  import time
  4  import RNS
  5  import os
  6  
  7  signed_message = "e51a008b8b8ba855993d8892a40daad84a6fb69a7138e1b5f69b427fe03449826ab6ccb81f0d72b4725e8d55c814d3e8e151b495cf5b59702f197ec366d935ad04a98ca519d6964f96ea09910b020351d1cdff3befbad323a2a28a6ec7ced4d0d67f02c525f93b321d9b076d704408475bd2d123cd51916f7e49039246ac56add37ef87e32d7f9853ac44a7f77d26fedc83e4e67a45742b751c2599309f5eda6efa0dafd957f61af1f0e86c4d6c5052e0e5fa577db99846f2b7a0204c31cef4013ca51cb307506c9209fd18d0195a7c9ae628af1a1d9ee7a4cf30037ed190a9fdcaa4ce5bb7bea19803cb5b5cea8c21fdb98d8f73ff5aaad87f5f6c3b7bcfe8974e5b063cc1113d77b9e96bec1c9d10ed37b780c3f7349a34092bb3968daeced40eb0b5130c0d11595e30b9671896385d04289d067f671599386536eed8430a72e186fb95023d5ac5dd442443bfabfe13a84a38d060af73bf20f921f38a768672fdbcb1dfece7458166e2e15948d6b4fa81f42db48747d283c670f576a0b410b31a70d2594823d0e29135a488cb0408c9e5bc1e197ff99aef471924231ccc8e3eddc82dbcea4801f14c5fc7a389a26a52cc93cfe0770953ef595ff410b7033a6ed5c975dd922b3f48f9dffcfb412eeed5758f3aa51de7eb47cd2cb"
  8  sig_from_key_0 = "3020ef58f861591826a61c3d2d4a25b949cdb3094085ba6b1177a6f2a05f3cdd24d1095d6fdd078f0b2826e80b261c93c1ff97fbfd4857f25706d57dd073590c"
  9  
 10  encrypted_message = "71884a271ead43558fcf1e331c5aebcd43498f16da16f8056b0893ce6b15d521eaa4f31639cd34da1b57995944076c4f14f300f2d2612111d21a3429a9966ac1da68545c00c7887d8b26f6c1ab9defa020b9519849ca41b7904199882802b6542771df85144a79890289d3c02daef6c26652c5ce9de231a2"
 11  fixed_token = "e37705f9b432d3711acf028678b0b9d37fdf7e00a3b47c95251aad61447df2620b5b9978783c3d9f2fb762e68c8b57c554928fb70dd79c1033ce5865f91761aad3e992790f63456092cb69b7b045f539147f7ba10d480e300f193576ae2d75a7884809b76bd17e05a735383305c0aa5621395bbf51e8cc66c1c536f339f2bea600f08f8f9a76564b2522cd904b6c2b6e553ec3d4df718ae70434c734297b313539338d184d2c64a9c4ddbc9b9a4947d0b45f5a274f65ae9f6bb203562fd5cede6abd3c615b699156e08fa33b841647a0"
 12  
 13  fixed_keys = [
 14      ("f8953ffaf607627e615603ff1530c82c434cf87c07179dd7689ea776f30b964cfb7ba6164af00c5111a45e69e57d885e1285f8dbfe3a21e95ae17cf676b0f8b7", "650b5d76b6bec0390d1f8cfca5bd33f9"),
 15      ("d85d036245436a3c33d3228affae06721f8203bc364ee0ee7556368ac62add650ebf8f926abf628da9d92baaa12db89bd6516ee92ec29765f3afafcb8622d697", "1469e89450c361b253aefb0c606b6111"),
 16      ("8893e2bfd30fc08455997caf7abb7a6341716768dbbf9a91cc1455bd7eeaf74cdc10ec72a4d4179696040bac620ee97ebc861e2443e5270537ae766d91b58181", "e5fe93ee4acba095b3b9b6541515ed3e"),
 17      ("b82c7a4f047561d974de7e38538281d7f005d3663615f30d9663bad35a716063c931672cd452175d55bcdd70bb7aa35a9706872a97963dc52029938ea7341b39", "1333b911fa8ebb16726996adbe3c6262"),
 18      ("08bb35f92b06a0832991165a0d9b4fd91af7b7765ce4572aa6222070b11b767092b61b0fd18b3a59cae6deb9db6d4bfb1c7fcfe076cfd66eea7ddd5f877543b9", "d13712efc45ef87674fb5ac26c37c912"),
 19  ]
 20  
 21  class TestIdentity(unittest.TestCase):
 22  
 23      def test_0_create_from_bytes(self):
 24          for entry in fixed_keys:
 25              key, id_hash = entry
 26              i = RNS.Identity.from_bytes(bytes.fromhex(key))
 27              self.assertEqual(i.hash, bytes.fromhex(id_hash))
 28              self.assertEqual(i.get_private_key(), bytes.fromhex(key))
 29  
 30      def test_1_sign(self):
 31          print("")
 32  
 33          # Test known signature
 34          fid = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
 35          sig = fid.sign(signed_message.encode("utf-8"))
 36  
 37          self.assertEqual(sig, bytes.fromhex(sig_from_key_0))
 38  
 39          # Test signature time jitter
 40          id1 = RNS.Identity()
 41          id2 = RNS.Identity(create_keys=False)
 42          id2.load_public_key(id1.get_public_key())
 43  
 44          if RNS.Cryptography.backend() == "internal":
 45              rounds = 2000
 46          else:
 47              rounds = 20000
 48  
 49          times = []
 50          for i in range(1, rounds):
 51              msg = os.urandom(512)
 52              start = time.time()
 53              signature = id1.sign(msg)
 54              t = time.time() - start
 55              times.append(t)
 56  
 57          import statistics
 58          tmin  = min(times)*1000
 59          tmax  = max(times)*1000
 60          tmean  = (sum(times)/len(times))*1000
 61          tmed = statistics.median(times)*1000
 62          tmdev = tmax - tmin
 63          mpct = (tmax/tmed)*100
 64          print("Random messages:")
 65          print("  Signature timing min/avg/med/max/mdev: "+str(round(tmin, 3))+"/"+str(round(tmean, 3))+"/"+str(round(tmed, 3))+"/"+str(round(tmax, 3))+"/"+str(round(tmdev, 3)))
 66          print("  Max deviation from median: "+str(round(mpct, 1))+"%")
 67          print()
 68  
 69          id1 = RNS.Identity()
 70          id2 = RNS.Identity(create_keys=False)
 71          id2.load_public_key(id1.get_public_key())
 72  
 73          times = []
 74          for i in range(1, rounds):
 75              msg = bytes([0x00])*512
 76              start = time.time()
 77              signature = id1.sign(msg)
 78              t = time.time() - start
 79              times.append(t)
 80  
 81          tmin  = min(times)*1000
 82          tmax  = max(times)*1000
 83          tmean  = (sum(times)/len(times))*1000
 84          tmed = statistics.median(times)*1000
 85          tmdev = tmax - tmin
 86          mpct = (tmax/tmed)*100
 87          print("All 0xff messages:")
 88          print("  Signature timing min/avg/med/max/mdev: "+str(round(tmin, 3))+"/"+str(round(tmean, 3))+"/"+str(round(tmed, 3))+"/"+str(round(tmax, 3))+"/"+str(round(tmdev, 3)))
 89          print("  Max deviation from median: "+str(round(mpct, 1))+"%")
 90          print()
 91  
 92          id1 = RNS.Identity()
 93          id2 = RNS.Identity(create_keys=False)
 94          id2.load_public_key(id1.get_public_key())
 95  
 96          times = []
 97          for i in range(1, rounds):
 98              msg = bytes([0xff])*512
 99              start = time.time()
100              signature = id1.sign(msg)
101              t = time.time() - start
102              times.append(t)
103  
104          tmin  = min(times)*1000
105          tmax  = max(times)*1000
106          tmean  = (sum(times)/len(times))*1000
107          tmed = statistics.median(times)*1000
108          tmdev = tmax - tmin
109          mpct = (tmax/tmed)*100
110          print("All 0x00 messages:")
111          print("  Signature timing min/avg/med/max/mdev: "+str(round(tmin, 3))+"/"+str(round(tmean, 3))+"/"+str(round(tmed, 3))+"/"+str(round(tmax, 3))+"/"+str(round(tmdev, 3)))
112          print("  Max deviation from median: "+str(round(mpct, 1))+"%")
113          print()
114  
115          b = 0
116          t = 0
117          for i in range(1, 500):
118              mlen = i % (RNS.Reticulum.MTU//2) + (RNS.Reticulum.MTU//2)
119              msg = os.urandom(mlen)
120              b += mlen
121              id1 = RNS.Identity()
122              id2 = RNS.Identity(create_keys=False)
123              id2.load_public_key(id1.get_public_key())
124  
125              start = time.time()
126              signature = id1.sign(msg)
127              self.assertEqual(True, id2.validate(signature, msg))
128              t += time.time() - start
129  
130          print("Sign/validate chunks < MTU: "+self.size_str(b/t, "b")+"ps")
131  
132          for i in range(1, 500):
133              mlen = 16*1024
134              msg = os.urandom(mlen)
135              b += mlen
136              id1 = RNS.Identity()
137              id2 = RNS.Identity(create_keys=False)
138              id2.load_public_key(id1.get_public_key())
139  
140              start = time.time()
141              signature = id1.sign(msg)
142              self.assertEqual(True, id2.validate(signature, msg))
143              t += time.time() - start
144  
145          print("Sign/validate 16KB chunks: "+self.size_str(b/t, "b")+"ps")
146  
147  
148      def test_2_encrypt(self):
149          print("")
150  
151          # Test loading identity from bytes
152          fid = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
153          self.assertEqual(fid.hash, bytes.fromhex(fixed_keys[0][1]))
154  
155          # Test decryption of known AES-256 token
156          print("Testing decryption of known token")
157          plaintext = fid.decrypt(bytes.fromhex(fixed_token))
158          self.assertEqual(plaintext, bytes.fromhex(encrypted_message))
159  
160          # Test encrypt and decrypt of random chunks
161          print("Testing random small chunk encrypt/decrypt")
162          b = 0
163          e_t = 0
164          d_t = 0
165          e_times = []
166          d_times = []
167          for i in range(1, 500):
168              mlen = i % (RNS.Reticulum.MTU//2) + (RNS.Reticulum.MTU//2)
169              msg = os.urandom(mlen)
170              b += mlen
171              id1 = RNS.Identity()
172              id2 = RNS.Identity(create_keys=False)
173              id2.load_public_key(id1.get_public_key())
174  
175              e_start = time.time()
176              token = id2.encrypt(msg)
177              e_now = time.time()
178              e_t += e_now - e_start
179              e_times.append(e_now - e_start)
180  
181              d_start = time.time()
182              decrypted = id1.decrypt(token)
183              self.assertEqual(msg, decrypted)
184              d_now = time.time()
185              d_t += d_now - d_start
186              d_times.append(d_now - d_start)
187  
188          import statistics
189          e_tmin  = min(e_times)*1000; d_tmin  = min(d_times)*1000
190          e_tmax  = max(e_times)*1000; d_tmax  = max(d_times)*1000
191          e_tmean  = (sum(e_times)/len(e_times))*1000; d_tmean  = (sum(d_times)/len(d_times))*1000
192          e_tmed = statistics.median(e_times)*1000; d_tmed = statistics.median(d_times)*1000
193          e_tmdev = e_tmax - e_tmin; d_tmdev = d_tmax - d_tmin
194          e_mpct = (e_tmax/e_tmed)*100; d_mpct = (d_tmax/d_tmed)*100
195  
196          print("  Encrypt chunks < MTU: "+self.size_str(b/e_t, "b")+"ps")
197          print("    Encryption timing min/avg/med/max/mdev: "+str(round(e_tmin, 3))+"/"+str(round(e_tmean, 3))+"/"+str(round(e_tmed, 3))+"/"+str(round(e_tmax, 3))+"/"+str(round(e_tmdev, 3)))
198          print("    Max deviation from median: "+str(round(e_mpct, 1))+"%")
199          print()
200  
201          print("  Decrypt chunks < MTU: "+self.size_str(b/d_t, "b")+"ps")
202          print("    Decryption timing min/avg/med/max/mdev: "+str(round(d_tmin, 3))+"/"+str(round(d_tmean, 3))+"/"+str(round(d_tmed, 3))+"/"+str(round(d_tmax, 3))+"/"+str(round(d_tmdev, 3)))
203          print("    Max deviation from median: "+str(round(d_mpct, 1))+"%")
204          print()
205  
206          # Test encrypt and decrypt of large chunks
207          print("Testing large chunk encrypt/decrypt")
208          mlen = 8*1000*1000
209          if RNS.Cryptography.backend() == "internal":
210              lb = 1
211          else:
212              lb = 8
213          
214          for i in range(1, lb):
215              msg = os.urandom(mlen)
216              b += mlen
217              id1 = RNS.Identity()
218              id2 = RNS.Identity(create_keys=False)
219              id2.load_public_key(id1.get_public_key())
220  
221              e_start = time.time()
222              token = id2.encrypt(msg)
223              e_now = time.time()
224              e_t += e_now - e_start
225              e_times.append(e_now - e_start)
226  
227              d_start = time.time()
228              self.assertEqual(msg, id1.decrypt(token))
229              d_now = time.time()
230              d_t += d_now - d_start
231              d_times.append(d_now - d_start)
232  
233          e_tmin  = min(e_times)*1000; d_tmin  = min(d_times)*1000
234          e_tmax  = max(e_times)*1000; d_tmax  = max(d_times)*1000
235          e_tmean  = (sum(e_times)/len(e_times))*1000; d_tmean  = (sum(d_times)/len(d_times))*1000
236          e_tmed = statistics.median(e_times)*1000; d_tmed = statistics.median(d_times)*1000
237          e_tmdev = e_tmax - e_tmin; d_tmdev = d_tmax - d_tmin
238          e_mpct = (e_tmax/e_tmed)*100; d_mpct = (d_tmax/d_tmed)*100
239  
240          print("  Encrypt "+self.size_str(mlen)+" chunks: "+self.size_str(b/e_t, "b")+"ps")
241          print("    Encryption timing min/avg/med/max/mdev: "+str(round(e_tmin, 3))+"/"+str(round(e_tmean, 3))+"/"+str(round(e_tmed, 3))+"/"+str(round(e_tmax, 3))+"/"+str(round(e_tmdev, 3)))
242          print("    Max deviation from median: "+str(round(e_mpct, 1))+"%")
243          print()
244  
245          print("  Decrypt "+self.size_str(mlen)+" chunks: "+self.size_str(b/d_t, "b")+"ps")
246          print("    Decryption timing min/avg/med/max/mdev: "+str(round(d_tmin, 3))+"/"+str(round(d_tmean, 3))+"/"+str(round(d_tmed, 3))+"/"+str(round(d_tmax, 3))+"/"+str(round(d_tmdev, 3)))
247          print("    Max deviation from median: "+str(round(d_mpct, 1))+"%")
248          print()
249  
250      def size_str(self, num, suffix='B'):
251          units = ['','K','M','G','T','P','E','Z']
252          last_unit = 'Y'
253  
254          if suffix == 'b':
255              num *= 8
256              units = ['','K','M','G','T','P','E','Z']
257              last_unit = 'Y'
258  
259          for unit in units:
260              if abs(num) < 1000.0:
261                  if unit == "":
262                      return "%.0f %s%s" % (num, unit, suffix)
263                  else:
264                      return "%.2f %s%s" % (num, unit, suffix)
265              num /= 1000.0
266  
267          return "%.2f%s%s" % (num, last_unit, suffix)
268  
269  if __name__ == '__main__':
270      unittest.main(verbosity=2)