# Ratchets.py
##########################################################
# This RNS example demonstrates a simple client/server   #
# echo utility that uses ratchets to rotate encryption   #
# keys every time an announce is sent.                   #
##########################################################

import argparse
import sys
import RNS

# Let's define an app name. We'll use this for all
# destinations we create. Since this echo example
# is part of a range of example utilities, we'll put
# them all within the app namespace "example_utilities"
APP_NAME = "example_utilities"

# The running Reticulum instance. It is assigned by server()
# or client() and read by the packet callbacks further down.
reticulum = None


def _reception_stats(rssi, snr):
    """Format optional RSSI/SNR readings as a log message suffix.

    Returns an empty string when neither value is available.
    RSSI is labelled in dBm and SNR in dB.
    """
    stats = ""
    if rssi is not None:
        stats += " [RSSI "+str(rssi)+" dBm]"

    if snr is not None:
        stats += " [SNR "+str(snr)+" dB]"

    return stats


##########################################################
#### Server Part #########################################
##########################################################

# This initialisation is executed when the user chooses
# to run as a server
def server(configpath):
    """Set up the ratcheted echo destination and serve requests.

    configpath is the path to an alternative Reticulum config
    directory, or None. The call only returns when the announce
    loop is interrupted (e.g. by Ctrl-C).
    """
    global reticulum

    # We must first initialise Reticulum
    reticulum = RNS.Reticulum(configpath)

    # Randomly create a new identity for our echo server
    server_identity = RNS.Identity()

    # We create a destination that clients can query. We want
    # to be able to verify echo replies to our clients, so we
    # create a "single" destination that can receive encrypted
    # messages. This way the client can send a request and be
    # certain that no-one else than this destination was able
    # to read it.
    echo_destination = RNS.Destination(
        server_identity,
        RNS.Destination.IN,
        RNS.Destination.SINGLE,
        APP_NAME,
        "ratchet",
        "echo",
        "request"
    )

    # Enable ratchets on the destination by providing a file
    # path to store ratchets. In this example, we will just
    # use a temporary file, but in real-world applications,
    # it's extremely important to keep this file secure, since
    # it contains encryption keys for the destination.
    destination_hexhash = RNS.hexrep(echo_destination.hash, delimit=False)
    echo_destination.enable_ratchets(f"/tmp/{destination_hexhash}.ratchets")

    # We configure the destination to automatically prove all
    # packets addressed to it. By doing this, RNS will automatically
    # generate a proof for each incoming packet and transmit it
    # back to the sender of that packet.
    echo_destination.set_proof_strategy(RNS.Destination.PROVE_ALL)

    # Tell the destination which function in our program to
    # run when a packet is received. We do this so we can
    # print a log message when the server receives a request
    echo_destination.set_packet_callback(server_callback)

    # Everything's ready!
    # Let's wait for client requests or user input
    announceLoop(echo_destination)


def announceLoop(destination):
    """Announce the given destination every time the user hits enter."""
    # Let the user know that everything is ready
    RNS.log(
        "Ratcheted echo server "+
        RNS.prettyhexrep(destination.hash)+
        " running, hit enter to manually send an announce (Ctrl-C to quit)"
    )

    # We enter a loop that runs until the user exits.
    # If the user hits enter, we will announce our server
    # destination on the network, which will let clients
    # know how to create messages directed towards it.
    while True:
        input()
        destination.announce()
        RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash))


def server_callback(message, packet):
    """Log an incoming echo request together with its reception stats.

    message is the received payload (unused here) and packet is the
    packet it arrived in.
    """
    global reticulum

    # Tell the user that we received an echo request, and
    # that we are going to send a reply to the requester.
    # Sending the proof is handled automatically, since we
    # set up the destination to prove all incoming packets.

    # When connected to a shared instance, the physical layer
    # readings are queried from it; otherwise they are read
    # directly from the packet.
    if reticulum.is_connected_to_shared_instance:
        reception_rssi = reticulum.get_packet_rssi(packet.packet_hash)
        reception_snr = reticulum.get_packet_snr(packet.packet_hash)
    else:
        reception_rssi = packet.rssi
        reception_snr = packet.snr

    reception_stats = _reception_stats(reception_rssi, reception_snr)
    RNS.log("Received packet from echo client, proof sent"+reception_stats)


##########################################################
#### Client Part #########################################
##########################################################

# This initialisation is executed when the user chooses
# to run as a client
def client(destination_hexhash, configpath, timeout=None):
    """Send an echo request to the server each time the user hits enter.

    destination_hexhash is the hexadecimal hash of the server
    destination, configpath is the path to an alternative Reticulum
    config directory (or None), and timeout is an optional reply
    timeout in seconds. Exits the process if the hash is malformed.
    """
    global reticulum

    # We need a binary representation of the destination
    # hash that was entered on the command line
    try:
        dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
        if len(destination_hexhash) != dest_len:
            raise ValueError(
                "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2)
            )

        destination_hash = bytes.fromhex(destination_hexhash)
    except ValueError as e:
        RNS.log("Invalid destination entered. Check your input!")
        RNS.log(str(e)+"\n")
        sys.exit(0)

    # We must first initialise Reticulum
    reticulum = RNS.Reticulum(configpath)

    # We override the loglevel to provide feedback when
    # an announce is received
    if RNS.loglevel < RNS.LOG_INFO:
        RNS.loglevel = RNS.LOG_INFO

    # Tell the user that the client is ready!
    RNS.log(
        "Echo client ready, hit enter to send echo request to "+
        destination_hexhash+
        " (Ctrl-C to quit)"
    )

    # We enter a loop that runs until the user exits.
    # If the user hits enter, we will try to send an
    # echo request to the destination specified on the
    # command line.
    while True:
        input()

        # Let's first check if RNS knows a path to the destination.
        if not RNS.Transport.has_path(destination_hash):
            # If we do not know this destination, tell the
            # user to wait for an announce to arrive.
            RNS.log("Destination is not yet known. Requesting path...")
            RNS.log("Hit enter to manually retry once an announce is received.")
            RNS.Transport.request_path(destination_hash)
            continue

        # To address the server, we need to know its public
        # key, so we check if Reticulum knows this destination.
        # This is done by calling the "recall" method of the
        # Identity module. If the destination is known, it will
        # return an Identity instance that can be used in
        # outgoing destinations.
        server_identity = RNS.Identity.recall(destination_hash)

        # We got the correct identity instance from the
        # recall method, so let's create an outgoing
        # destination. We use the naming convention:
        # example_utilities.ratchet.echo.request
        # This matches the naming we specified in the
        # server part of the code.
        request_destination = RNS.Destination(
            server_identity,
            RNS.Destination.OUT,
            RNS.Destination.SINGLE,
            APP_NAME,
            "ratchet",
            "echo",
            "request"
        )

        # The destination is ready, so let's create a packet.
        # We set the destination to the request_destination
        # that was just created, and the only data we add
        # is a random hash.
        echo_request = RNS.Packet(request_destination, RNS.Identity.get_random_hash())

        # Send the packet! If the packet is successfully
        # sent, it will return a PacketReceipt instance.
        packet_receipt = echo_request.send()

        # NOTE(review): assumes send() returns a falsy value when
        # the packet could not be sent - confirm against the RNS
        # API reference. Without this guard, the receipt calls
        # below would fail on a non-receipt value.
        if not packet_receipt:
            RNS.log("Could not send echo request, hit enter to try again")
            continue

        # If the user specified a timeout, we set this
        # timeout on the packet receipt, and configure
        # a callback function, that will get called if
        # the packet times out.
        if timeout is not None:
            packet_receipt.set_timeout(timeout)
            packet_receipt.set_timeout_callback(packet_timed_out)

        # We can then set a delivery callback on the receipt.
        # This will get automatically called when a proof for
        # this specific packet is received from the destination.
        packet_receipt.set_delivery_callback(packet_delivered)

        # Tell the user that the echo request was sent
        RNS.log("Sent echo request to "+RNS.prettyhexrep(request_destination.hash))


# This function is called when our reply destination
# receives a proof packet.
def packet_delivered(receipt):
    """Log the round-trip time and reception stats of a proven packet.

    Nothing is logged unless the receipt status is DELIVERED.
    """
    global reticulum

    if receipt.status != RNS.PacketReceipt.DELIVERED:
        return

    # Report sub-second round-trip times in milliseconds
    rtt = receipt.get_rtt()
    if rtt >= 1:
        rttstring = str(round(rtt, 3))+" seconds"
    else:
        rttstring = str(round(rtt*1000, 3))+" milliseconds"

    # The readings come from the proof packet, so they are only
    # looked up when the receipt actually carries one.
    reception_rssi = None
    reception_snr = None
    proof_packet = receipt.proof_packet
    if proof_packet is not None:
        if reticulum.is_connected_to_shared_instance:
            reception_rssi = reticulum.get_packet_rssi(proof_packet.packet_hash)
            reception_snr = reticulum.get_packet_snr(proof_packet.packet_hash)
        else:
            reception_rssi = proof_packet.rssi
            reception_snr = proof_packet.snr

    RNS.log(
        "Valid reply received from "+
        RNS.prettyhexrep(receipt.destination.hash)+
        ", round-trip time is "+rttstring+
        _reception_stats(reception_rssi, reception_snr)
    )


# This function is called if a packet times out.
def packet_timed_out(receipt):
    """Log that the packet behind the given receipt timed out."""
    if receipt.status == RNS.PacketReceipt.FAILED:
        RNS.log("Packet "+RNS.prettyhexrep(receipt.hash)+" timed out")


##########################################################
#### Program Startup #####################################
##########################################################

# This part of the program gets run at startup,
# and parses input from the user, and then starts
# the desired program mode.
def main():
    """Parse the command line and start the server or the client."""
    parser = argparse.ArgumentParser(description="Simple ratcheted echo server and client utility")

    parser.add_argument(
        "-s",
        "--server",
        action="store_true",
        help="wait for incoming packets from clients"
    )

    parser.add_argument(
        "-t",
        "--timeout",
        action="store",
        metavar="s",
        default=None,
        help="set a reply timeout in seconds",
        type=float
    )

    parser.add_argument("--config",
        action="store",
        default=None,
        help="path to alternative Reticulum config directory",
        type=str
    )

    parser.add_argument(
        "destination",
        nargs="?",
        default=None,
        help="hexadecimal hash of the server destination",
        type=str
    )

    args = parser.parse_args()

    # An empty --config value is treated like no value at all
    configarg = args.config if args.config else None

    if args.server:
        server(configarg)
        return

    # argparse already converted the timeout to a float. A
    # timeout of 0 is treated the same as no timeout.
    timeoutarg = args.timeout if args.timeout else None

    if args.destination is None:
        print("")
        parser.print_help()
        print("")
    else:
        client(args.destination, configarg, timeout=timeoutarg)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("")
        sys.exit(0)