/ Examples / Ratchets.py
Ratchets.py
  1  ##########################################################
  2  # This RNS example demonstrates a simple client/server   #
  3  # echo utility that uses ratchets to rotate encryption   #
  4  # keys every time an announce is sent.                   #
  5  ##########################################################
  6  
  7  import argparse
  8  import sys
  9  import RNS
 10  
 11  # Let's define an app name. We'll use this for all
 12  # destinations we create. Since this echo example
 13  # is part of a range of example utilities, we'll put
 14  # them all within the app namespace "example_utilities"
 15  APP_NAME = "example_utilities"
 16  
 17  
 18  ##########################################################
 19  #### Server Part #########################################
 20  ##########################################################
 21  
 22  # This initialisation is executed when the user chooses
 23  # to run as a server
 24  def server(configpath):
 25      global reticulum
 26  
 27      # We must first initialise Reticulum
 28      reticulum = RNS.Reticulum(configpath)
 29      
 30      # Randomly create a new identity for our echo server
 31      server_identity = RNS.Identity()
 32  
 33      # We create a destination that clients can query. We want
 34      # to be able to verify echo replies to our clients, so we
 35      # create a "single" destination that can receive encrypted
 36      # messages. This way the client can send a request and be
 37      # certain that no-one else than this destination was able
 38      # to read it. 
 39      echo_destination = RNS.Destination(
 40          server_identity,
 41          RNS.Destination.IN,
 42          RNS.Destination.SINGLE,
 43          APP_NAME,
 44          "ratchet",
 45          "echo",
 46          "request"
 47      )
 48  
 49      # Enable ratchets on the destination by providing a file
 50      # path to store ratchets. In this example, we will just
 51      # use a temporary file, but in real-world applications,
 52      # it's extremely important to keep this file secure, since
 53      # it contains encryption keys for the destination.
 54      destination_hexhash = RNS.hexrep(echo_destination.hash, delimit=False)
 55      echo_destination.enable_ratchets(f"/tmp/{destination_hexhash}.ratchets")
 56  
 57      # We configure the destination to automatically prove all
 58      # packets addressed to it. By doing this, RNS will automatically
 59      # generate a proof for each incoming packet and transmit it
 60      # back to the sender of that packet.
 61      echo_destination.set_proof_strategy(RNS.Destination.PROVE_ALL)
 62      
 63      # Tell the destination which function in our program to
 64      # run when a packet is received. We do this so we can
 65      # print a log message when the server receives a request
 66      echo_destination.set_packet_callback(server_callback)
 67  
 68      # Everything's ready!
 69      # Let's Wait for client requests or user input
 70      announceLoop(echo_destination)
 71  
 72  
 73  def announceLoop(destination):
 74      # Let the user know that everything is ready
 75      RNS.log(
 76          "Ratcheted echo server "+
 77          RNS.prettyhexrep(destination.hash)+
 78          " running, hit enter to manually send an announce (Ctrl-C to quit)"
 79      )
 80  
 81      # We enter a loop that runs until the users exits.
 82      # If the user hits enter, we will announce our server
 83      # destination on the network, which will let clients
 84      # know how to create messages directed towards it.
 85      while True:
 86          entered = input()
 87          destination.announce()
 88          RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash))
 89  
 90  
 91  def server_callback(message, packet):
 92      global reticulum
 93      
 94      # Tell the user that we received an echo request, and
 95      # that we are going to send a reply to the requester.
 96      # Sending the proof is handled automatically, since we
 97      # set up the destination to prove all incoming packets.
 98  
 99      reception_stats = ""
100      if reticulum.is_connected_to_shared_instance:
101          reception_rssi = reticulum.get_packet_rssi(packet.packet_hash)
102          reception_snr  = reticulum.get_packet_snr(packet.packet_hash)
103  
104          if reception_rssi != None:
105              reception_stats += " [RSSI "+str(reception_rssi)+" dBm]"
106          
107          if reception_snr != None:
108              reception_stats += " [SNR "+str(reception_snr)+" dBm]"
109  
110      else:
111          if packet.rssi != None:
112              reception_stats += " [RSSI "+str(packet.rssi)+" dBm]"
113          
114          if packet.snr != None:
115              reception_stats += " [SNR "+str(packet.snr)+" dB]"
116  
117      RNS.log("Received packet from echo client, proof sent"+reception_stats)
118  
119  
120  ##########################################################
121  #### Client Part #########################################
122  ##########################################################
123  
124  # This initialisation is executed when the user chooses
125  # to run as a client
def client(destination_hexhash, configpath, timeout=None):
    """Run the interactive echo client.

    Validates the destination hash, initialises Reticulum and then
    loops forever: every time the user hits enter, an echo request is
    sent to the destination if it is known, otherwise a path request
    is issued.

    :param destination_hexhash: Hexadecimal hash of the server
        destination, as entered on the command line.
    :param configpath: Value passed unchanged to RNS.Reticulum(); the
        command line describes it as the path to an alternative
        Reticulum config directory. May be None.
    :param timeout: Optional reply timeout in seconds. When not None,
        it is set on every packet receipt and packet_timed_out is
        registered as the timeout callback.
    :raises SystemExit: With exit status 1 if the destination hash
        has the wrong length or is not valid hexadecimal.
    """
    global reticulum

    # We need a binary representation of the destination
    # hash that was entered on the command line
    try:
        dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
        if len(destination_hexhash) != dest_len:
            raise ValueError(
                "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2)
            )

        destination_hash = bytes.fromhex(destination_hexhash)
    except (ValueError, TypeError) as e:
        RNS.log("Invalid destination entered. Check your input!")
        RNS.log(str(e)+"\n")
        # Invalid input is an error, so we exit with a
        # non-zero status
        sys.exit(1)

    # We must first initialise Reticulum
    reticulum = RNS.Reticulum(configpath)

    # We override the loglevel to provide feedback when
    # an announce is received
    if RNS.loglevel < RNS.LOG_INFO:
        RNS.loglevel = RNS.LOG_INFO

    # Tell the user that the client is ready!
    RNS.log(
        "Echo client ready, hit enter to send echo request to "+
        destination_hexhash+
        " (Ctrl-C to quit)"
    )

    # We enter a loop that runs until the user exits.
    # If the user hits enter, we will try to send an
    # echo request to the destination specified on the
    # command line.
    while True:
        input()

        # Let's first check if RNS knows a path to the destination.
        # To address the server, we also need to know its public
        # key, so we ask the Identity module to "recall" the
        # identity belonging to this destination.
        server_identity = None
        if RNS.Transport.has_path(destination_hash):
            server_identity = RNS.Identity.recall(destination_hash)

        if server_identity is None:
            # If we do not know this destination, tell the
            # user to wait for an announce to arrive.
            RNS.log("Destination is not yet known. Requesting path...")
            RNS.log("Hit enter to manually retry once an announce is received.")
            RNS.Transport.request_path(destination_hash)
            continue

        # We got the identity instance from the recall
        # method, so let's create an outgoing destination.
        # We use the naming convention:
        # example_utilities.ratchet.echo.request
        # This matches the naming we specified in the
        # server part of the code.
        request_destination = RNS.Destination(
            server_identity,
            RNS.Destination.OUT,
            RNS.Destination.SINGLE,
            APP_NAME,
            "ratchet",
            "echo",
            "request"
        )

        # The destination is ready, so let's create a packet.
        # We set the destination to the request_destination
        # that was just created, and the only data we add
        # is a random hash.
        echo_request = RNS.Packet(request_destination, RNS.Identity.get_random_hash())

        # Send the packet! If the packet is successfully
        # sent, it will return a PacketReceipt instance.
        packet_receipt = echo_request.send()

        # Without a receipt there is nothing to attach the
        # callbacks to, so we report the failure and wait
        # for the user to try again.
        if not packet_receipt:
            RNS.log("Could not send echo request to "+RNS.prettyhexrep(request_destination.hash))
            continue

        # If the user specified a timeout, we set this
        # timeout on the packet receipt, and configure
        # a callback function, that will get called if
        # the packet times out.
        if timeout is not None:
            packet_receipt.set_timeout(timeout)
            packet_receipt.set_timeout_callback(packet_timed_out)

        # We can then set a delivery callback on the receipt.
        # This will get automatically called when a proof for
        # this specific packet is received from the destination.
        packet_receipt.set_delivery_callback(packet_delivered)

        # Tell the user that the echo request was sent
        RNS.log("Sent echo request to "+RNS.prettyhexrep(request_destination.hash))
225  
226  # This function is called when our reply destination
227  # receives a proof packet.
def packet_delivered(receipt):
    """Log the round-trip time and signal stats of a delivered packet.

    Registered as the delivery callback on packet receipts. Nothing is
    logged unless the receipt status is DELIVERED.

    :param receipt: The packet receipt the proof belongs to. Its
        status, get_rtt(), destination and proof_packet are read.
    """
    if receipt.status != RNS.PacketReceipt.DELIVERED:
        return

    # Round-trip times below one second are shown in milliseconds
    rtt = receipt.get_rtt()
    if rtt >= 1:
        rttstring = str(round(rtt, 3))+" seconds"
    else:
        rttstring = str(round(rtt*1000, 3))+" milliseconds"

    # Reception statistics are only available if the receipt
    # holds the proof packet. When connected to a shared
    # instance they are queried from it by packet hash,
    # otherwise they are read from the proof packet itself.
    reception_rssi = None
    reception_snr = None
    proof_packet = receipt.proof_packet
    if proof_packet is not None:
        if reticulum.is_connected_to_shared_instance:
            reception_rssi = reticulum.get_packet_rssi(proof_packet.packet_hash)
            reception_snr = reticulum.get_packet_snr(proof_packet.packet_hash)
        else:
            reception_rssi = proof_packet.rssi
            reception_snr = proof_packet.snr

    reception_stats = ""
    if reception_rssi is not None:
        reception_stats += " [RSSI "+str(reception_rssi)+" dBm]"

    if reception_snr is not None:
        reception_stats += " [SNR "+str(reception_snr)+" dB]"

    RNS.log(
        "Valid reply received from "+
        RNS.prettyhexrep(receipt.destination.hash)+
        ", round-trip time is "+rttstring+
        reception_stats
    )
265  
266  # This function is called if a packet times out.
def packet_timed_out(receipt):
    """Log a timeout message for a packet receipt that has failed.

    Registered as the timeout callback on packet receipts. Nothing is
    logged unless the receipt status is FAILED.

    :param receipt: The packet receipt whose timeout fired; its
        status and hash are read.
    """
    if receipt.status != RNS.PacketReceipt.FAILED:
        return

    RNS.log(f"Packet {RNS.prettyhexrep(receipt.hash)} timed out")
270  
271  
272  ##########################################################
273  #### Program Startup #####################################
274  ##########################################################
275  
276  # This part of the program gets run at startup,
277  # and parses input from the user, and then starts
278  # the desired program mode.
if __name__ == "__main__":
    try:
        # Describe the command line interface
        parser = argparse.ArgumentParser(
            description="Simple ratcheted echo server and client utility"
        )
        parser.add_argument(
            "-s", "--server",
            action="store_true",
            help="wait for incoming packets from clients"
        )
        parser.add_argument(
            "-t", "--timeout",
            action="store",
            metavar="s",
            default=None,
            help="set a reply timeout in seconds",
            type=float
        )
        parser.add_argument(
            "--config",
            action="store",
            default=None,
            help="path to alternative Reticulum config directory",
            type=str
        )
        parser.add_argument(
            "destination",
            nargs="?",
            default=None,
            help="hexadecimal hash of the server destination",
            type=str
        )

        args = parser.parse_args()

        # An empty config path is treated like no config path
        # at all, in both server and client mode.
        configarg = args.config if args.config else None

        if args.server:
            server(configarg)
        elif args.destination is None:
            # Client mode needs a destination to talk to, so
            # without one we can only show the usage help.
            print("")
            parser.print_help()
            print("")
        else:
            # A missing timeout and a timeout of zero both
            # result in no timeout being configured.
            timeoutarg = float(args.timeout) if args.timeout else None
            client(args.destination, configarg, timeout=timeoutarg)
    except KeyboardInterrupt:
        # Ctrl-C is the regular way to quit, so exit cleanly
        print("")
        sys.exit(0)