/ RNS / Utilities / rnprobe.py
rnprobe.py
  1  #!/usr/bin/env python3
  2  
  3  # Reticulum License
  4  #
  5  # Copyright (c) 2016-2025 Mark Qvist
  6  #
  7  # Permission is hereby granted, free of charge, to any person obtaining a copy
  8  # of this software and associated documentation files (the "Software"), to deal
  9  # in the Software without restriction, including without limitation the rights
 10  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 11  # copies of the Software, and to permit persons to whom the Software is
 12  # furnished to do so, subject to the following conditions:
 13  #
 14  # - The Software shall not be used in any kind of system which includes amongst
 15  #   its functions the ability to purposefully do harm to human beings.
 16  #
 17  # - The Software shall not be used, directly or indirectly, in the creation of
 18  #   an artificial intelligence, machine learning or language model training
 19  #   dataset, including but not limited to any use that contributes to the
 20  #   training or development of such a model or algorithm.
 21  #
 22  # - The above copyright notice and this permission notice shall be included in
 23  #   all copies or substantial portions of the Software.
 24  #
 25  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 26  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 27  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 28  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 29  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 30  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 31  # SOFTWARE.
 32  
 33  import RNS
 34  import os
 35  import sys
 36  import time
 37  import argparse
 38  
 39  from RNS._version import __version__
 40  
 41  DEFAULT_PROBE_SIZE = 16
 42  DEFAULT_TIMEOUT = 12
 43  
 44  def program_setup(configdir, destination_hexhash, size=None, full_name = None, verbosity = 0, timeout=None, wait=0, probes=1):
 45      if size == None: size = DEFAULT_PROBE_SIZE
 46      if full_name == None:
 47          print("The full destination name including application name aspects must be specified for the destination")
 48          exit()
 49      
 50      try:
 51          app_name, aspects = RNS.Destination.app_and_aspects_from_name(full_name)
 52  
 53      except Exception as e:
 54          print(str(e))
 55          exit()
 56  
 57      try:
 58          dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
 59          if len(destination_hexhash) != dest_len:
 60              raise ValueError("Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2))
 61          try:
 62              destination_hash = bytes.fromhex(destination_hexhash)
 63          except Exception as e:
 64              raise ValueError("Invalid destination entered. Check your input.")
 65      except Exception as e:
 66          print(str(e))
 67          exit()
 68  
 69      if verbosity > 0:
 70          more_output = True
 71          verbosity -= 1
 72      else:
 73          more_output = False
 74          verbosity -= 1
 75  
 76  
 77      reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity)
 78  
 79      if not RNS.Transport.has_path(destination_hash):
 80          RNS.Transport.request_path(destination_hash)
 81          print("Path to "+RNS.prettyhexrep(destination_hash)+" requested  ", end=" ")
 82          sys.stdout.flush()
 83  
 84      _timeout = time.time() + (timeout or DEFAULT_TIMEOUT+reticulum.get_first_hop_timeout(destination_hash))
 85      i = 0
 86      syms = "⢄⢂⢁⡁⡈⡐⡠"
 87      while not RNS.Transport.has_path(destination_hash) and not time.time() > _timeout:
 88          time.sleep(0.1)
 89          print(("\b\b"+syms[i]+" "), end="")
 90          sys.stdout.flush()
 91          i = (i+1)%len(syms)
 92  
 93      if time.time() > _timeout:
 94          print("\r                                                          \rPath request timed out")
 95          exit(1)
 96  
 97      server_identity = RNS.Identity.recall(destination_hash)
 98  
 99      request_destination = RNS.Destination(
100          server_identity,
101          RNS.Destination.OUT,
102          RNS.Destination.SINGLE,
103          app_name,
104          *aspects
105      )
106  
107      sent = 0
108      replies = 0
109      while probes:
110  
111          if sent > 0:
112              time.sleep(wait)
113  
114          try:
115              probe = RNS.Packet(request_destination, os.urandom(size))
116              probe.pack()
117          except OSError:
118              print("Error: Probe packet size of "+str(len(probe.raw))+" bytes exceed MTU of "+str(RNS.Reticulum.MTU)+" bytes")
119              exit(3)
120  
121          receipt = probe.send()
122          sent += 1
123  
124          if more_output:
125              nhd = reticulum.get_next_hop(destination_hash)
126              via_str = " via "+RNS.prettyhexrep(nhd) if nhd != None else ""
127              if_str  = " on "+str(reticulum.get_next_hop_if_name(destination_hash)) if reticulum.get_next_hop_if_name(destination_hash) != "None" else ""
128              more = via_str+if_str
129          else:
130              more = ""
131  
132          print("\rSent probe "+str(sent)+" ("+str(size)+" bytes) to "+RNS.prettyhexrep(destination_hash)+more+"  ", end=" ")
133  
134          _timeout = time.time() + (timeout or DEFAULT_TIMEOUT+reticulum.get_first_hop_timeout(destination_hash))
135          i = 0
136          while receipt.status == RNS.PacketReceipt.SENT and not time.time() > _timeout:
137              time.sleep(0.1)
138              print(("\b\b"+syms[i]+" "), end="")
139              sys.stdout.flush()
140              i = (i+1)%len(syms)
141  
142          if time.time() > _timeout:
143              print("\r                                                                \rProbe timed out")
144          
145          else:
146              print("\b\b ")
147              sys.stdout.flush()
148  
149              if receipt.status == RNS.PacketReceipt.DELIVERED:
150                  replies += 1
151                  hops = RNS.Transport.hops_to(destination_hash)
152                  if hops != 1:
153                      ms = "s"
154                  else:
155                      ms = ""
156  
157                  rtt = receipt.get_rtt()
158                  if (rtt >= 1):
159                      rtt = round(rtt, 3)
160                      rttstring = str(rtt)+" seconds"
161                  else:
162                      rtt = round(rtt*1000, 3)
163                      rttstring = str(rtt)+" milliseconds"
164  
165                  reception_stats = ""
166                  if reticulum.is_connected_to_shared_instance:
167                      reception_rssi = reticulum.get_packet_rssi(receipt.proof_packet.packet_hash)
168                      reception_snr  = reticulum.get_packet_snr(receipt.proof_packet.packet_hash)
169                      reception_q    = reticulum.get_packet_q(receipt.proof_packet.packet_hash)
170  
171                      if reception_rssi != None:
172                          reception_stats += " [RSSI "+str(reception_rssi)+" dBm]"
173                      
174                      if reception_snr != None:
175                          reception_stats += " [SNR "+str(reception_snr)+" dB]"
176                      
177                      if reception_q != None:
178                          reception_stats += " [Link Quality "+str(reception_q)+"%]"
179  
180                  else:
181                      if receipt.proof_packet != None:
182                          if receipt.proof_packet.rssi != None:
183                              reception_stats += " [RSSI "+str(receipt.proof_packet.rssi)+" dBm]"
184                          
185                          if receipt.proof_packet.snr != None:
186                              reception_stats += " [SNR "+str(receipt.proof_packet.snr)+" dB]"
187  
188                  print(
189                      "Valid reply from "+
190                      RNS.prettyhexrep(receipt.destination.hash)+
191                      "\nRound-trip time is "+rttstring+
192                      " over "+str(hops)+" hop"+ms+
193                      reception_stats+"\n"
194                  )
195  
196              else:
197                  print("\r                                                          \rProbe timed out")
198  
199          probes -= 1
200  
201      loss = round((1-(replies/sent))*100, 2)
202      print(f"Sent {sent}, received {replies}, packet loss {loss}%")
203      if loss > 0:
204          exit(2)
205      else:
206          exit(0)
207      
208  
209  def main():
210      try:
211          parser = argparse.ArgumentParser(description="Reticulum Probe Utility")
212  
213          parser.add_argument("--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str)
214          parser.add_argument("-s", "--size", action="store", default=None, help="size of probe packet payload in bytes", type=int)
215          parser.add_argument("-n", "--probes", action="store", default=1, help="number of probes to send", type=int)
216          parser.add_argument("-t", "--timeout", metavar="seconds", action="store", default=None, help="timeout before giving up", type=float)
217          parser.add_argument("-w", "--wait", metavar="seconds", action="store", default=0, help="time between each probe", type=float)
218          parser.add_argument("--version", action="version", version="rnprobe {version}".format(version=__version__))
219          parser.add_argument("full_name", nargs="?", default=None, help="full destination name in dotted notation", type=str)
220          parser.add_argument("destination_hash", nargs="?", default=None, help="hexadecimal hash of the destination", type=str)
221  
222          parser.add_argument('-v', '--verbose', action='count', default=0)
223  
224          args = parser.parse_args()
225  
226          if args.config:
227              configarg = args.config
228          else:
229              configarg = None
230  
231          if not args.destination_hash:
232              print("")
233              parser.print_help()
234              print("")
235          else:
236              program_setup(
237                  configdir = configarg,
238                  destination_hexhash = args.destination_hash,
239                  size = args.size,
240                  full_name = args.full_name,
241                  verbosity = args.verbose,
242                  probes = args.probes,
243                  wait = args.wait,
244                  timeout = args.timeout,
245              )
246  
247      except KeyboardInterrupt:
248          print("")
249          exit()
250  
251  if __name__ == "__main__":
252      main()