rnprobe.py
1 #!/usr/bin/env python3 2 3 # Reticulum License 4 # 5 # Copyright (c) 2016-2025 Mark Qvist 6 # 7 # Permission is hereby granted, free of charge, to any person obtaining a copy 8 # of this software and associated documentation files (the "Software"), to deal 9 # in the Software without restriction, including without limitation the rights 10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 11 # copies of the Software, and to permit persons to whom the Software is 12 # furnished to do so, subject to the following conditions: 13 # 14 # - The Software shall not be used in any kind of system which includes amongst 15 # its functions the ability to purposefully do harm to human beings. 16 # 17 # - The Software shall not be used, directly or indirectly, in the creation of 18 # an artificial intelligence, machine learning or language model training 19 # dataset, including but not limited to any use that contributes to the 20 # training or development of such a model or algorithm. 21 # 22 # - The above copyright notice and this permission notice shall be included in 23 # all copies or substantial portions of the Software. 24 # 25 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 26 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 27 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 28 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 29 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 30 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 31 # SOFTWARE. 32 33 import RNS 34 import os 35 import sys 36 import time 37 import argparse 38 39 from RNS._version import __version__ 40 41 DEFAULT_PROBE_SIZE = 16 42 DEFAULT_TIMEOUT = 12 43 44 def program_setup(configdir, destination_hexhash, size=None, full_name = None, verbosity = 0, timeout=None, wait=0, probes=1): 45 if size == None: size = DEFAULT_PROBE_SIZE 46 if full_name == None: 47 print("The full destination name including application name aspects must be specified for the destination") 48 exit() 49 50 try: 51 app_name, aspects = RNS.Destination.app_and_aspects_from_name(full_name) 52 53 except Exception as e: 54 print(str(e)) 55 exit() 56 57 try: 58 dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 59 if len(destination_hexhash) != dest_len: 60 raise ValueError("Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2)) 61 try: 62 destination_hash = bytes.fromhex(destination_hexhash) 63 except Exception as e: 64 raise ValueError("Invalid destination entered. Check your input.") 65 except Exception as e: 66 print(str(e)) 67 exit() 68 69 if verbosity > 0: 70 more_output = True 71 verbosity -= 1 72 else: 73 more_output = False 74 verbosity -= 1 75 76 77 reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity) 78 79 if not RNS.Transport.has_path(destination_hash): 80 RNS.Transport.request_path(destination_hash) 81 print("Path to "+RNS.prettyhexrep(destination_hash)+" requested ", end=" ") 82 sys.stdout.flush() 83 84 _timeout = time.time() + (timeout or DEFAULT_TIMEOUT+reticulum.get_first_hop_timeout(destination_hash)) 85 i = 0 86 syms = "⢄⢂⢁⡁⡈⡐⡠" 87 while not RNS.Transport.has_path(destination_hash) and not time.time() > _timeout: 88 time.sleep(0.1) 89 print(("\b\b"+syms[i]+" "), end="") 90 sys.stdout.flush() 91 i = (i+1)%len(syms) 92 93 if time.time() > _timeout: 94 print("\r \rPath request timed out") 95 exit(1) 96 97 server_identity = RNS.Identity.recall(destination_hash) 98 99 request_destination = RNS.Destination( 100 server_identity, 101 RNS.Destination.OUT, 102 RNS.Destination.SINGLE, 103 app_name, 104 *aspects 105 ) 106 107 sent = 0 108 replies = 0 109 while probes: 110 111 if sent > 0: 112 time.sleep(wait) 113 114 try: 115 probe = RNS.Packet(request_destination, os.urandom(size)) 116 probe.pack() 117 except OSError: 118 print("Error: Probe packet size of "+str(len(probe.raw))+" bytes exceed MTU of "+str(RNS.Reticulum.MTU)+" bytes") 119 exit(3) 120 121 receipt = probe.send() 122 sent += 1 123 124 if more_output: 125 nhd = reticulum.get_next_hop(destination_hash) 126 via_str = " via "+RNS.prettyhexrep(nhd) if nhd != None else "" 127 if_str = " on "+str(reticulum.get_next_hop_if_name(destination_hash)) if reticulum.get_next_hop_if_name(destination_hash) != "None" else "" 128 more = via_str+if_str 129 else: 130 more = "" 131 132 print("\rSent probe "+str(sent)+" ("+str(size)+" bytes) to "+RNS.prettyhexrep(destination_hash)+more+" ", end=" ") 133 134 _timeout = time.time() + (timeout or DEFAULT_TIMEOUT+reticulum.get_first_hop_timeout(destination_hash)) 135 i = 0 136 while receipt.status == RNS.PacketReceipt.SENT and not time.time() > _timeout: 137 time.sleep(0.1) 138 print(("\b\b"+syms[i]+" "), end="") 139 sys.stdout.flush() 140 i = (i+1)%len(syms) 141 142 if time.time() > _timeout: 143 print("\r \rProbe timed out") 144 145 else: 146 print("\b\b ") 147 sys.stdout.flush() 148 149 if receipt.status == RNS.PacketReceipt.DELIVERED: 150 replies += 1 151 hops = RNS.Transport.hops_to(destination_hash) 152 if hops != 1: 153 ms = "s" 154 else: 155 ms = "" 156 157 rtt = receipt.get_rtt() 158 if (rtt >= 1): 159 rtt = round(rtt, 3) 160 rttstring = str(rtt)+" seconds" 161 else: 162 rtt = round(rtt*1000, 3) 163 rttstring = str(rtt)+" milliseconds" 164 165 reception_stats = "" 166 if reticulum.is_connected_to_shared_instance: 167 reception_rssi = reticulum.get_packet_rssi(receipt.proof_packet.packet_hash) 168 reception_snr = reticulum.get_packet_snr(receipt.proof_packet.packet_hash) 169 reception_q = reticulum.get_packet_q(receipt.proof_packet.packet_hash) 170 171 if reception_rssi != None: 172 reception_stats += " [RSSI "+str(reception_rssi)+" dBm]" 173 174 if reception_snr != None: 175 reception_stats += " [SNR "+str(reception_snr)+" dB]" 176 177 if reception_q != None: 178 reception_stats += " [Link Quality "+str(reception_q)+"%]" 179 180 else: 181 if receipt.proof_packet != None: 182 if receipt.proof_packet.rssi != None: 183 reception_stats += " [RSSI "+str(receipt.proof_packet.rssi)+" dBm]" 184 185 if receipt.proof_packet.snr != None: 186 reception_stats += " [SNR "+str(receipt.proof_packet.snr)+" dB]" 187 188 print( 189 "Valid reply from "+ 190 RNS.prettyhexrep(receipt.destination.hash)+ 191 "\nRound-trip time is "+rttstring+ 192 " over "+str(hops)+" hop"+ms+ 193 reception_stats+"\n" 194 ) 195 196 else: 197 print("\r \rProbe timed out") 198 199 probes -= 1 200 201 loss = round((1-(replies/sent))*100, 2) 202 print(f"Sent {sent}, received {replies}, packet loss {loss}%") 203 if loss > 0: 204 exit(2) 205 else: 206 exit(0) 207 208 209 def main(): 210 try: 211 parser = argparse.ArgumentParser(description="Reticulum Probe Utility") 212 213 parser.add_argument("--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str) 214 parser.add_argument("-s", "--size", action="store", default=None, help="size of probe packet payload in bytes", type=int) 215 parser.add_argument("-n", "--probes", action="store", default=1, help="number of probes to send", type=int) 216 parser.add_argument("-t", "--timeout", metavar="seconds", action="store", default=None, help="timeout before giving up", type=float) 217 parser.add_argument("-w", "--wait", metavar="seconds", action="store", default=0, help="time between each probe", type=float) 218 parser.add_argument("--version", action="version", version="rnprobe {version}".format(version=__version__)) 219 parser.add_argument("full_name", nargs="?", default=None, help="full destination name in dotted notation", type=str) 220 parser.add_argument("destination_hash", nargs="?", default=None, help="hexadecimal hash of the destination", type=str) 221 222 parser.add_argument('-v', '--verbose', action='count', default=0) 223 224 args = parser.parse_args() 225 226 if args.config: 227 configarg = args.config 228 else: 229 configarg = None 230 231 if not args.destination_hash: 232 print("") 233 parser.print_help() 234 print("") 235 else: 236 program_setup( 237 configdir = configarg, 238 destination_hexhash = args.destination_hash, 239 size = args.size, 240 full_name = args.full_name, 241 verbosity = args.verbose, 242 probes = args.probes, 243 wait = args.wait, 244 timeout = args.timeout, 245 ) 246 247 except KeyboardInterrupt: 248 print("") 249 exit() 250 251 if __name__ == "__main__": 252 main()