# Discovery.py
import os
import re
import RNS
import time
import random
import threading
import ipaddress
import subprocess
from .vendor import umsgpack as msgpack

# Integer keys of the msgpack-encoded info map carried in interface
# discovery announces (see InterfaceAnnouncer.get_interface_announce_data).
NAME = 0xFF
TRANSPORT_ID = 0xFE
INTERFACE_TYPE = 0x00
TRANSPORT = 0x01
REACHABLE_ON = 0x02
LATITUDE = 0x03
LONGITUDE = 0x04
HEIGHT = 0x05
PORT = 0x06
IFAC_NETNAME = 0x07
IFAC_NETKEY = 0x08
FREQUENCY = 0x09
BANDWIDTH = 0x0A
SPREADINGFACTOR = 0x0B
CODINGRATE = 0x0C
MODULATION = 0x0D
CHANNEL = 0x0E

# App name used for the discovery destination and the announce aspect filter
APP_NAME = "rnstransport"


class InterfaceAnnouncer():
    """Periodically sends discovery announces for the owner's discoverable interfaces.

    A background thread wakes up every ``job_interval`` seconds, picks the
    interface whose discovery announce is most overdue, and announces its
    stamped (and optionally encrypted) description on the
    ``rnstransport.discovery.interface`` destination.
    """

    JOB_INTERVAL = 60             # Seconds between runs of the announce job
    DEFAULT_STAMP_VALUE = 14      # Stamp cost used when the interface does not configure one
    WORKBLOCK_EXPAND_ROUNDS = 20  # expand_rounds passed to the stamper for generation and validation

    DISCOVERABLE_INTERFACE_TYPES = ["BackboneInterface", "TCPServerInterface", "TCPClientInterface",
                                    "RNodeInterface", "WeaveInterface", "I2PInterface", "KISSInterface"]

    def __init__(self, owner):
        """Create the announcer and its discovery destination.

        :param owner: Object providing ``interfaces``, ``identity``,
            ``network_identity`` and ``has_network_identity()``.
        """
        import importlib.util
        if importlib.util.find_spec('LXMF') is not None:
            from LXMF import LXStamper
        else:
            RNS.log("Using on-network interface discovery requires the LXMF module to be installed.", RNS.LOG_CRITICAL)
            RNS.log("You can install it with the command: pip install lxmf", RNS.LOG_CRITICAL)
            RNS.panic()

        self.owner = owner
        self.should_run = False
        self.job_interval = self.JOB_INTERVAL
        self.stamper = LXStamper
        self.stamp_cache = {}  # Maps info hash -> stamp, so unchanged info is not re-stamped

        # Announce from the network identity when one is configured
        if self.owner.has_network_identity():
            identity = self.owner.network_identity
        else:
            identity = self.owner.identity

        self.discovery_destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE,
                                                     APP_NAME, "discovery", "interface")

    def start(self):
        """Start the background announce job if it is not already flagged as running."""
        if not self.should_run:
            self.should_run = True
            threading.Thread(target=self.job, daemon=True).start()

    def stop(self):
        """Ask the announce job to exit after its current sleep/iteration."""
        self.should_run = False

    def job(self):
        """Background loop: announce at most one due interface per interval."""
        while self.should_run:
            time.sleep(self.job_interval)
            try:
                now = time.time()
                due_interfaces = [i for i in self.owner.interfaces
                                  if i.supports_discovery and i.discoverable
                                  and now > (i.last_discovery_announce+i.discovery_announce_interval)]

                if due_interfaces:
                    # Pick the interface that has waited longest since its last announce
                    selected_interface = max(due_interfaces, key=lambda i: now-i.last_discovery_announce)
                    # Updated before generating data, so a failing interface is not retried every run
                    selected_interface.last_discovery_announce = time.time()
                    RNS.log(f"Preparing interface discovery announce for {selected_interface.name}", RNS.LOG_DEBUG)
                    app_data = self.get_interface_announce_data(selected_interface)
                    if not app_data:
                        RNS.log(f"Could not generate interface discovery announce data for {selected_interface.name}", RNS.LOG_ERROR)
                    else:
                        RNS.log(f"Sending interface discovery announce for {selected_interface.name} with {len(app_data)}B payload", RNS.LOG_DEBUG)
                        self.discovery_destination.announce(app_data=app_data)

            except Exception as e:
                RNS.log(f"Error while preparing interface discovery announces: {e}", RNS.LOG_ERROR)
                RNS.trace_exception(e)

    def sanitize(self, in_str):
        """Return ``in_str`` with all CR/LF characters removed and outer whitespace stripped."""
        sanitized = in_str.replace("\n", "")
        sanitized = sanitized.replace("\r", "")
        sanitized = sanitized.strip()
        return sanitized

    def _resolve_reachable_on(self, interface):
        """Return the validated address to announce for ``interface``, or None.

        On non-Windows platforms, if the configured ``reachable_on`` value is
        the path of an executable file, that file is run and its stdout is
        used as the address. Failures are logged and reported as None.
        """
        configured = interface.reachable_on
        if not isinstance(configured, str):
            RNS.log(f"The configured reachable_on parameter \"{configured}\" for {interface} is not a valid IP address or hostname", RNS.LOG_ERROR)
            RNS.log(f"Aborting discovery announce", RNS.LOG_ERROR)
            return None

        reachable_on = self.sanitize(configured)
        if not RNS.vendor.platformutils.is_windows():
            try:
                exec_path = os.path.expanduser(reachable_on)
                if os.path.isfile(exec_path) and os.access(exec_path, os.X_OK):
                    RNS.log(f"Evaluating reachable_on from executable at {exec_path}", RNS.LOG_DEBUG)
                    exec_result = subprocess.run([exec_path], stdout=subprocess.PIPE)
                    exec_stdout = exec_result.stdout.decode("utf-8")
                    if exec_result.returncode != 0: raise ValueError("Non-zero exit code from subprocess")
                    reachable_on = self.sanitize(exec_stdout)
                    if not (is_ip_address(reachable_on) or is_hostname(reachable_on)):
                        raise ValueError(f"Valid IP address or hostname was not found in external script output \"{reachable_on}\"")

            except Exception as e:
                RNS.log(f"Error while getting reachable_on from executable at {interface.reachable_on}: {e}", RNS.LOG_ERROR)
                RNS.log(f"Aborting discovery announce", RNS.LOG_ERROR)
                return None

        if not (is_ip_address(reachable_on) or is_hostname(reachable_on)):
            RNS.log(f"The configured reachable_on parameter \"{reachable_on}\" for {interface} is not a valid IP address or hostname", RNS.LOG_ERROR)
            RNS.log(f"Aborting discovery announce", RNS.LOG_ERROR)
            return None

        return reachable_on

    def get_interface_announce_data(self, interface):
        """Build the announce app_data for ``interface``.

        The result is one flags byte followed by the msgpack-encoded info map
        with the stamp appended; map and stamp are encrypted with the network
        identity when the interface requests discovery encryption.

        :returns: The payload as ``bytes``, or None if the interface type is
            not discoverable, its address is invalid, encryption was
            requested without a network identity, or no stamp was generated.
        """
        interface_type = type(interface).__name__
        if interface_type not in self.DISCOVERABLE_INTERFACE_TYPES:
            return None

        stamp_value = interface.discovery_stamp_value if interface.discovery_stamp_value else self.DEFAULT_STAMP_VALUE

        flags = 0x00
        info = {INTERFACE_TYPE: interface_type,
                TRANSPORT: RNS.Reticulum.transport_enabled(),
                TRANSPORT_ID: RNS.Transport.identity.hash,
                NAME: self.sanitize(interface.discovery_name),
                LATITUDE: interface.discovery_latitude,
                LONGITUDE: interface.discovery_longitude,
                HEIGHT: interface.discovery_height}

        if interface_type in ["BackboneInterface", "TCPServerInterface"]:
            # Only these types announce reachable_on, so only they need it
            # resolved and validated. Validating it for every type would abort
            # announces of interfaces that have no network address at all.
            reachable_on = self._resolve_reachable_on(interface)
            if reachable_on is None:
                return None
            info[REACHABLE_ON] = reachable_on
            info[PORT] = interface.bind_port

        if interface_type == "I2PInterface" and interface.connectable and interface.b32:
            info[REACHABLE_ON] = interface.b32

        if interface_type == "RNodeInterface":
            info[FREQUENCY] = interface.frequency
            info[BANDWIDTH] = interface.bandwidth
            info[SPREADINGFACTOR] = interface.sf
            info[CODINGRATE] = interface.cr

        if interface_type == "WeaveInterface":
            info[FREQUENCY] = interface.discovery_frequency
            info[BANDWIDTH] = interface.discovery_bandwidth
            info[CHANNEL] = interface.discovery_channel
            info[MODULATION] = interface.discovery_modulation

        if interface_type == "KISSInterface" or (interface_type == "TCPClientInterface" and interface.kiss_framing):
            # A KISS-framed TCP client is announced as a KISS interface
            info[INTERFACE_TYPE] = "KISSInterface"
            info[FREQUENCY] = interface.discovery_frequency
            info[BANDWIDTH] = interface.discovery_bandwidth
            info[MODULATION] = self.sanitize(interface.discovery_modulation)

        if interface.discovery_publish_ifac == True:
            info[IFAC_NETNAME] = self.sanitize(interface.ifac_netname)
            info[IFAC_NETKEY] = self.sanitize(interface.ifac_netkey)

        encrypt = interface.discovery_encrypt
        if encrypt and not self.owner.has_network_identity():
            # Checked before stamping, so no stamp work is spent on an announce that cannot be sent
            RNS.log(f"Discovery encryption requested for {interface}, but no network identity configured. Aborting discovery announce.", RNS.LOG_ERROR)
            return None

        packed = msgpack.packb(info)
        infohash = RNS.Identity.full_hash(packed)

        if infohash in self.stamp_cache:
            stamp = self.stamp_cache[infohash]
        else:
            stamp, _ = self.stamper.generate_stamp(infohash, stamp_cost=stamp_value, expand_rounds=self.WORKBLOCK_EXPAND_ROUNDS)

        if not stamp:
            return None
        self.stamp_cache[infohash] = stamp

        if encrypt:
            flags |= InterfaceAnnounceHandler.FLAG_ENCRYPTED
            payload = self.owner.network_identity.encrypt(packed+stamp)
        else:
            payload = packed+stamp

        return bytes([flags])+payload


class InterfaceAnnounceHandler:
    """Announce handler that validates and decodes interface discovery announces.

    Announces with a valid stamp of at least ``required_value`` are decoded
    into an info dict that is passed to ``callback``.
    """

    FLAG_SIGNED = 0b00000001
    FLAG_ENCRYPTED = 0b00000010

    def __init__(self, required_value=InterfaceAnnouncer.DEFAULT_STAMP_VALUE, callback=None):
        """Create the handler.

        :param required_value: Minimum stamp value an announce must carry.
        :param callback: Optional callable invoked with the decoded info dict.
        """
        import importlib.util
        if importlib.util.find_spec('LXMF') is not None:
            from LXMF import LXStamper
        else:
            RNS.log("Using on-network interface discovery requires the LXMF module to be installed.", RNS.LOG_CRITICAL)
            RNS.log("You can install it with the command: pip install lxmf", RNS.LOG_CRITICAL)
            RNS.panic()

        self.aspect_filter = APP_NAME+".discovery.interface"
        self.required_value = required_value
        self.callback = callback
        self.stamper = LXStamper

    def received_announce(self, destination_hash, announced_identity, app_data):
        """Validate one received discovery announce and pass it to the callback.

        Announces from identities outside the configured discovery sources
        (when any are configured), announces that cannot be decrypted, and
        announces with an invalid or insufficient stamp are ignored. Decoding
        errors are logged at debug level and never raised.
        """
        try:
            discovery_sources = RNS.Reticulum.interface_discovery_sources()
            if discovery_sources and announced_identity.hash not in discovery_sources:
                RNS.log(f"Interface discovered from non-authorized network identity {RNS.prettyhexrep(announced_identity.hash)}, ignoring", RNS.LOG_DEBUG)
                return

            if not (app_data and len(app_data) > self.stamper.STAMP_SIZE+1):
                return

            # First byte is the flags field. FLAG_SIGNED is defined, but not evaluated here.
            flags = app_data[0]
            app_data = app_data[1:]
            encrypted = flags & self.FLAG_ENCRYPTED

            if encrypted:
                if not RNS.Transport.has_network_identity(): return
                app_data = RNS.Transport.network_identity.decrypt(app_data)
                if not app_data: return

            # The stamp is appended after the packed info map
            stamp = app_data[-self.stamper.STAMP_SIZE:]
            packed = app_data[:-self.stamper.STAMP_SIZE]
            infohash = RNS.Identity.full_hash(packed)
            workblock = self.stamper.stamp_workblock(infohash, expand_rounds=InterfaceAnnouncer.WORKBLOCK_EXPAND_ROUNDS)
            value = self.stamper.stamp_value(workblock, stamp)
            valid = self.stamper.stamp_valid(stamp, self.required_value, workblock)

            if not valid:
                RNS.log(f"Ignored discovered interface with invalid stamp", RNS.LOG_DEBUG)
                return

            if value < self.required_value:
                RNS.log(f"Ignored discovered interface with stamp value {value}", RNS.LOG_DEBUG)
                return

            unpacked = msgpack.unpackb(packed)
            if INTERFACE_TYPE not in unpacked:
                return

            interface_type = unpacked[INTERFACE_TYPE]
            info = {"type": interface_type,
                    "transport": unpacked[TRANSPORT],
                    "name": unpacked[NAME] or f"Discovered {interface_type}",
                    "received": time.time(),
                    "stamp": stamp,
                    "value": value,
                    "transport_id": RNS.hexrep(unpacked[TRANSPORT_ID], delimit=False),
                    "network_id": RNS.hexrep(announced_identity.hash, delimit=False),
                    "hops": RNS.Transport.hops_to(destination_hash),
                    "latitude": unpacked[LATITUDE],
                    "longitude": unpacked[LONGITUDE],
                    "height": unpacked[HEIGHT]}

            if REACHABLE_ON in unpacked:
                if not (is_ip_address(unpacked[REACHABLE_ON]) or is_hostname(unpacked[REACHABLE_ON])):
                    raise ValueError("Invalid data in reachable_on field of announce")

            if IFAC_NETNAME in unpacked: info["ifac_netname"] = unpacked[IFAC_NETNAME]
            if IFAC_NETKEY in unpacked: info["ifac_netkey"] = unpacked[IFAC_NETKEY]

            # Config-entry fragments shared by all interface types
            cfg_name = info["name"]
            cfg_identity = info["transport_id"]
            cfg_netname = info.get("ifac_netname")
            cfg_netkey = info.get("ifac_netkey")
            cfg_netname_str = f"\n network_name = {cfg_netname}" if cfg_netname else ""
            cfg_netkey_str = f"\n passphrase = {cfg_netkey}" if cfg_netkey else ""
            cfg_identity_str = f"\n transport_identity = {cfg_identity}"

            if interface_type in ["BackboneInterface", "TCPServerInterface"]:
                # Without Backbone support (Windows), the entry falls back to a TCP client
                backbone_support = not RNS.vendor.platformutils.is_windows()
                info["reachable_on"] = unpacked[REACHABLE_ON]
                info["port"] = unpacked[PORT]
                connection_interface = "BackboneInterface" if backbone_support else "TCPClientInterface"
                remote_str = "remote" if backbone_support else "target_host"
                cfg_remote = info["reachable_on"]
                cfg_port = info["port"]
                info["config_entry"] = f"[[{cfg_name}]]\n type = {connection_interface}\n enabled = yes\n {remote_str} = {cfg_remote}\n target_port = {cfg_port}{cfg_identity_str}{cfg_netname_str}{cfg_netkey_str}"

            elif interface_type == "I2PInterface":
                info["reachable_on"] = unpacked[REACHABLE_ON]
                cfg_remote = info["reachable_on"]
                info["config_entry"] = f"[[{cfg_name}]]\n type = I2PInterface\n enabled = yes\n peers = {cfg_remote}{cfg_identity_str}{cfg_netname_str}{cfg_netkey_str}"

            elif interface_type == "RNodeInterface":
                info["frequency"] = unpacked[FREQUENCY]
                info["bandwidth"] = unpacked[BANDWIDTH]
                info["sf"] = unpacked[SPREADINGFACTOR]
                info["cr"] = unpacked[CODINGRATE]
                cfg_frequency = info["frequency"]
                cfg_bandwidth = info["bandwidth"]
                cfg_sf = info["sf"]
                cfg_cr = info["cr"]
                # No transport_identity line is emitted in this entry
                info["config_entry"] = f"[[{cfg_name}]]\n type = RNodeInterface\n enabled = yes\n port = \n frequency = {cfg_frequency}\n bandwidth = {cfg_bandwidth}\n spreadingfactor = {cfg_sf}\n codingrate = {cfg_cr}\n txpower = {cfg_netname_str}{cfg_netkey_str}"

            elif interface_type == "WeaveInterface":
                info["frequency"] = unpacked[FREQUENCY]
                info["bandwidth"] = unpacked[BANDWIDTH]
                info["channel"] = unpacked[CHANNEL]
                info["modulation"] = unpacked[MODULATION]
                # No transport_identity line is emitted in this entry
                info["config_entry"] = f"[[{cfg_name}]]\n type = WeaveInterface\n enabled = yes\n port = {cfg_netname_str}{cfg_netkey_str}"

            elif interface_type == "KISSInterface":
                info["frequency"] = unpacked[FREQUENCY]
                info["bandwidth"] = unpacked[BANDWIDTH]
                info["modulation"] = unpacked[MODULATION]
                cfg_frequency = info["frequency"]
                cfg_bandwidth = info["bandwidth"]
                cfg_modulation = info["modulation"]
                info["config_entry"] = f"[[{cfg_name}]]\n type = KISSInterface\n enabled = yes\n port = \n # Frequency: {cfg_frequency}\n # Bandwidth: {cfg_bandwidth}\n # Modulation: {cfg_modulation}{cfg_identity_str}{cfg_netname_str}{cfg_netkey_str}"

            # Identifies the discovered interface by announcing transport instance and name
            discovery_hash_material = info["transport_id"]+info["name"]
            info["discovery_hash"] = RNS.Identity.full_hash(discovery_hash_material.encode("utf-8"))

            if self.callback and callable(self.callback): self.callback(info)

        except Exception as e:
            RNS.log(f"An error occurred while trying to decode discovered interface. The contained exception was: {e}", RNS.LOG_DEBUG)


class InterfaceDiscovery():
    """Persists discovered interfaces and optionally auto-connects to them.

    Each discovered interface is stored as one msgpack file under
    ``<storagepath>/discovery/interfaces``, named by its discovery hash.
    """

    # Seconds since an interface was last heard before its record is
    # classified as "unknown", as "stale", or removed when listing.
    THRESHOLD_UNKNOWN = 24*60*60
    THRESHOLD_STALE = 3*24*60*60
    THRESHOLD_REMOVE = 7*24*60*60

    MONITOR_INTERVAL = 5   # Seconds between runs of the auto-connect monitor
    DETACH_THRESHOLD = 12  # Seconds an auto-connected interface may stay offline before being detached

    STATUS_STALE = 0
    STATUS_UNKNOWN = 100
    STATUS_AVAILABLE = 1000
    STATUS_CODE_MAP = {"available": STATUS_AVAILABLE, "unknown": STATUS_UNKNOWN, "stale": STATUS_STALE}
    AUTOCONNECT_TYPES = ["BackboneInterface", "TCPServerInterface"]

    def __init__(self, required_value=InterfaceAnnouncer.DEFAULT_STAMP_VALUE, callback=None, discover_interfaces=True):
        """Set up storage and, if requested, start listening for discovery announces.

        :param required_value: Minimum stamp value; a falsy value selects the default.
        :param callback: Optional callable invoked with the info dict of each discovered interface.
        :param discover_interfaces: When true, register the announce handler and
            start reconnecting previously discovered interfaces.
        :raises SystemError: If no RNS instance is active.
        """
        if not required_value: required_value = InterfaceAnnouncer.DEFAULT_STAMP_VALUE

        self.required_value = required_value
        self.discovery_callback = callback
        self.rns_instance = RNS.Reticulum.get_instance()
        self.monitored_interfaces = []
        self.monitoring_autoconnects = False
        self.monitor_interval = self.MONITOR_INTERVAL
        self.detach_threshold = self.DETACH_THRESHOLD
        self.initial_autoconnect_ran = False

        if not self.rns_instance: raise SystemError("Attempt to start interface discovery listener without an active RNS instance")
        self.storagepath = os.path.join(RNS.Reticulum.storagepath, "discovery", "interfaces")
        # exist_ok avoids a race between checking for and creating the directory
        os.makedirs(self.storagepath, exist_ok=True)

        if discover_interfaces:
            self.handler = InterfaceAnnounceHandler(callback=self.interface_discovered, required_value=self.required_value)
            RNS.Transport.register_announce_handler(self.handler)
            threading.Thread(target=self.connect_discovered, daemon=True).start()

    def list_discovered_interfaces(self, only_available=False, only_transport=False):
        """Load, prune and return the stored discovered-interface records.

        Records are deleted from disk when they have not been heard for
        longer than ``THRESHOLD_REMOVE``, when discovery sources are
        configured and the record does not originate from one of them, or
        when the stored ``reachable_on`` is not a valid address. Unreadable
        files are logged and skipped.

        :param only_available: Only return records with status "available".
        :param only_transport: Only return records with a truthy "transport" field.
        :returns: List of info dicts with "status" and "status_code" added,
            sorted by status code, stamp value and last-heard time, descending.
        """
        now = time.time()
        discovered_interfaces = []
        discovery_sources = RNS.Reticulum.interface_discovery_sources()
        for filename in os.listdir(self.storagepath):
            filepath = os.path.join(self.storagepath, filename)
            try:
                with open(filepath, "rb") as f: info = msgpack.unpackb(f.read())
                heard_delta = now-info["last_heard"]

                if heard_delta > self.THRESHOLD_REMOVE: should_remove = True
                elif discovery_sources and "network_id" not in info: should_remove = True
                elif discovery_sources and bytes.fromhex(info["network_id"]) not in discovery_sources: should_remove = True
                elif "reachable_on" in info and not (is_ip_address(info["reachable_on"]) or is_hostname(info["reachable_on"])): should_remove = True
                else: should_remove = False

                if should_remove:
                    os.unlink(filepath)
                    continue

                if heard_delta > self.THRESHOLD_STALE: info["status"] = "stale"
                elif heard_delta > self.THRESHOLD_UNKNOWN: info["status"] = "unknown"
                else: info["status"] = "available"
                info["status_code"] = self.STATUS_CODE_MAP[info["status"]]

                if only_available and info["status"] != "available": continue
                if only_transport and not info["transport"]: continue
                discovered_interfaces.append(info)

            except Exception as e:
                RNS.log(f"Error while loading discovered interface data: {e}", RNS.LOG_ERROR)
                RNS.log(f"The interface data file {filepath} may be corrupt", RNS.LOG_ERROR)
                RNS.trace_exception(e)

        discovered_interfaces.sort(key=lambda info: (info["status_code"], info["value"], info["last_heard"]), reverse=True)
        return discovered_interfaces

    def interface_discovered(self, info):
        """Persist a freshly discovered interface, then auto-connect and notify.

        Adds "discovered", "last_heard" and "heard_count" to ``info`` and
        writes it to the record file named after its discovery hash. If
        persisting fails, the error is logged and neither auto-connect nor
        the external callback is run.
        """
        try:
            name = info["name"]
            value = info["value"]
            interface_type = info["type"]
            discovery_hash = info["discovery_hash"]
            hops = info["hops"]; ms = "" if hops == 1 else "s"
            filename = RNS.hexrep(discovery_hash, delimit=False)
            filepath = os.path.join(self.storagepath, filename)
            RNS.log(f"Discovered {interface_type} {hops} hop{ms} away with stamp value {value}: {name}", RNS.LOG_DEBUG)

            # Defaults for a first-time discovery
            discovered = info["received"]
            heard_count = 0

            if os.path.isfile(filepath):
                heard_count = 1
                try:
                    with open(filepath, "rb") as f: last_info = msgpack.unpackb(f.read())
                    if last_info.get("discovered") is not None: discovered = last_info["discovered"]
                    if last_info.get("heard_count") is not None: heard_count = last_info["heard_count"]+1

                except Exception as e:
                    # An unreadable record must not block updates forever, so it is replaced
                    RNS.log(f"Could not read existing discovered interface data, replacing it: {e}", RNS.LOG_WARNING)

            try:
                info["discovered"] = discovered
                info["last_heard"] = info["received"]
                info["heard_count"] = heard_count
                # Pack before opening, so a packing error cannot leave a truncated file behind
                packed_info = msgpack.packb(info)
                with open(filepath, "wb") as f: f.write(packed_info)

            except Exception as e:
                RNS.log(f"Error while persisting discovered interface data: {e}", RNS.LOG_ERROR)
                RNS.trace_exception(e)
                return

        except Exception as e:
            RNS.log(f"Error processing discovered interface data: {e}", RNS.LOG_ERROR)
            RNS.trace_exception(e)
            return

        self.autoconnect(info)

        try:
            if self.discovery_callback and callable(self.discovery_callback): self.discovery_callback(info)
        except Exception as e:
            RNS.log(f"Error while processing external interface discovery callback: {e}", RNS.LOG_ERROR)

    def monitor_interface(self, interface):
        """Add ``interface`` to the monitored set and start the monitor thread if needed."""
        if interface not in self.monitored_interfaces:
            self.monitored_interfaces.append(interface)

        if not self.monitoring_autoconnects:
            self.monitoring_autoconnects = True
            threading.Thread(target=self.__monitor_job, daemon=True).start()

    def __monitor_job(self):
        """Background loop supervising auto-connected interfaces.

        Each run detaches monitored interfaces that have been offline for at
        least ``detach_threshold`` seconds, tears down bootstrap-only
        interfaces once enough monitored interfaces are online, re-enables
        bootstrap interfaces when none are online, and tries to fill free
        auto-connect slots from the available discovered interfaces.
        """
        while self.monitoring_autoconnects:
            time.sleep(self.monitor_interval)
            try:
                detached_interfaces = []
                online_interfaces = 0
                autoconnected_interfaces = self.autoconnect_count()
                for interface in self.monitored_interfaces:
                    try:
                        if interface.online:
                            online_interfaces += 1
                            if hasattr(interface, "autoconnect_down") and interface.autoconnect_down != None:
                                RNS.log(f"Auto-discovered interface {interface} reconnected")
                                interface.autoconnect_down = None

                        elif not hasattr(interface, "autoconnect_down") or interface.autoconnect_down == None:
                            # First time the interface is seen offline, start the down-timer
                            RNS.log(f"Auto-discovered interface {interface} disconnected", RNS.LOG_DEBUG)
                            interface.autoconnect_down = time.time()

                        else:
                            down_for = time.time()-interface.autoconnect_down
                            if down_for >= self.detach_threshold:
                                RNS.log(f"Auto-discovered interface {interface} has been down for {RNS.prettytime(down_for)}, detaching", RNS.LOG_DEBUG)
                                detached_interfaces.append(interface)

                    except Exception as e:
                        RNS.log(f"Error while checking auto-connected interface state for {interface}: {e}", RNS.LOG_ERROR)

                max_autoconnected_interfaces = RNS.Reticulum.max_autoconnected_interfaces()
                free_slots = max(0, max_autoconnected_interfaces - autoconnected_interfaces)
                reserved_slots = max_autoconnected_interfaces//4

                if online_interfaces >= max_autoconnected_interfaces:
                    for interface in RNS.Transport.interfaces:
                        if hasattr(interface, "bootstrap_only") and interface.bootstrap_only == True:
                            RNS.log(f"Tearing down bootstrap-only {interface} since target connected auto-discovered interface count has been reached", RNS.LOG_INFO)
                            if interface not in detached_interfaces: detached_interfaces.append(interface)

                if online_interfaces == 0 and self.bootstrap_interface_count() == 0:
                    RNS.log(f"No auto-discovered interfaces connected, re-enabling bootstrap interfaces", RNS.LOG_NOTICE)
                    for config in RNS.Reticulum.get_instance().bootstrap_configs:
                        RNS.Reticulum.get_instance()._synthesize_interface(config, config["name"])

                if self.initial_autoconnect_ran and free_slots > reserved_slots:
                    candidate_interfaces = self.list_discovered_interfaces(only_available=True, only_transport=True)
                    if candidate_interfaces:
                        # Try one randomly chosen candidate per run
                        random.shuffle(candidate_interfaces)
                        selected_interface = candidate_interfaces[0]
                        if not self.interface_exists(selected_interface): self.autoconnect(selected_interface)

                for interface in detached_interfaces:
                    try: self.teardown_interface(interface)
                    except Exception as e:
                        RNS.log(f"Error while de-registering auto-connected interface from transport: {e}", RNS.LOG_ERROR)

            except Exception as e:
                # Keep the monitor thread alive if a single run fails
                RNS.log(f"Error in auto-connected interface monitor job: {e}", RNS.LOG_ERROR)
                RNS.trace_exception(e)

    def teardown_interface(self, interface):
        """Detach ``interface`` and remove it from transport and from monitoring."""
        interface.detach()
        if interface in RNS.Transport.interfaces: RNS.Transport.interfaces.remove(interface)
        if interface in self.monitored_interfaces: self.monitored_interfaces.remove(interface)

    def autoconnect_count(self):
        """Return the number of transport interfaces carrying an ``autoconnect_hash``."""
        return len([i for i in RNS.Transport.interfaces if hasattr(i, "autoconnect_hash")])

    def bootstrap_interface_count(self):
        """Return the number of transport interfaces flagged ``bootstrap_only``."""
        return len([i for i in RNS.Transport.interfaces if hasattr(i, "bootstrap_only") and i.bootstrap_only == True])

    def connect_discovered(self):
        """Auto-connect stored transport-enabled interfaces, up to the configured maximum."""
        if RNS.Reticulum.should_autoconnect_discovered_interfaces():
            try:
                discovered_interfaces = self.list_discovered_interfaces(only_transport=True)
                for info in discovered_interfaces:
                    if self.autoconnect_count() >= RNS.Reticulum.max_autoconnected_interfaces(): break
                    self.autoconnect(info)

                self.initial_autoconnect_ran = True

            except Exception as e:
                RNS.log(f"Error while reconnecting discovered interfaces: {e}", RNS.LOG_ERROR)

    def endpoint_hash(self, info):
        """Return the full hash of the ``reachable_on[:port]`` endpoint described by ``info``."""
        endpoint_specifier = ""
        if "reachable_on" in info: endpoint_specifier += str(info["reachable_on"])
        if "port" in info: endpoint_specifier += ":"+str(info["port"])
        return RNS.Identity.full_hash(endpoint_specifier.encode("utf-8"))

    def interface_exists(self, info):
        """Return True if a transport interface already targets the endpoint in ``info``.

        An interface matches by ``autoconnect_hash``, by ``target_ip`` (and
        ``target_port`` when ``info`` has a port), or by ``b32`` address.
        """
        wanted_hash = self.endpoint_hash(info)
        has_address = "reachable_on" in info
        for interface in RNS.Transport.interfaces:
            if hasattr(interface, "autoconnect_hash") and interface.autoconnect_hash == wanted_hash:
                return True

            dest_match = has_address and hasattr(interface, "target_ip") and interface.target_ip == info["reachable_on"]
            port_match = "port" not in info or (hasattr(interface, "target_port") and interface.target_port == info["port"])
            b32d_match = has_address and hasattr(interface, "b32") and interface.b32 == info["reachable_on"]

            if (dest_match and port_match) or b32d_match:
                return True

        return False

    def autoconnect(self, info):
        """Create and register an interface for ``info`` if auto-connect allows it.

        Nothing is done when auto-connect is disabled, the maximum number of
        auto-connected interfaces is reached, the type is not in
        ``AUTOCONNECT_TYPES``, or a matching interface already exists.
        Errors are logged and never raised.
        """
        try:
            if not RNS.Reticulum.should_autoconnect_discovered_interfaces(): return
            if self.autoconnect_count() >= RNS.Reticulum.max_autoconnected_interfaces(): return

            interface_type = info["type"]
            if interface_type not in self.AUTOCONNECT_TYPES: return

            endpoint_hash = self.endpoint_hash(info)
            if self.interface_exists(info):
                RNS.log(f"Discovered {interface_type} already exists, not auto-connecting", RNS.LOG_DEBUG)
                return

            # The following two branches only become reachable if these types
            # are added to AUTOCONNECT_TYPES.
            if interface_type == "TCPClientInterface":
                RNS.log(f"Your operating system does not support the Backbone interface type, and must degrade to using TCPClientInterface instead", RNS.LOG_WARNING)
                RNS.log(f"Auto-connecting discovered TCPClient interfaces is not yet implemented, aborting auto-connect", RNS.LOG_WARNING)
                RNS.log(f"You can obtain the configuration entry and add this interface manually instead using rnstatus -D", RNS.LOG_WARNING)
                return

            if interface_type == "I2PInterface":
                RNS.log(f"Auto-connecting discovered I2P interfaces is not yet implemented, aborting auto-connect", RNS.LOG_WARNING)
                RNS.log(f"You can obtain the configuration entry and add this interface manually instead using rnstatus -D", RNS.LOG_WARNING)
                return

            interface_name = info["name"]
            RNS.log(f"Auto-connecting discovered {interface_type} {interface_name}")
            interface_config = {"name": f"{interface_name}"}
            ifac_netname = info["ifac_netname"] if "ifac_netname" in info else None
            ifac_netkey = info["ifac_netkey"] if "ifac_netkey" in info else None
            interface = None

            # NOTE(review): no interface is created for "TCPServerInterface",
            # although it is listed in AUTOCONNECT_TYPES - confirm intent.
            if interface_type == "BackboneInterface":
                from RNS.Interfaces import BackboneInterface
                interface_config["target_host"] = info["reachable_on"]
                interface_config["target_port"] = info["port"]
                interface = BackboneInterface.BackboneClientInterface(RNS.Transport, interface_config)

            if interface:
                interface.autoconnect_hash = endpoint_hash
                interface.autoconnect_source = info["network_id"]
                RNS.Reticulum.get_instance()._add_interface(interface, ifac_netname=ifac_netname, ifac_netkey=ifac_netkey, configured_bitrate=5E6)
                self.monitor_interface(interface)

        except Exception as e:
            RNS.log(f"Error while auto-connecting discovered interface: {e}", RNS.LOG_ERROR)
            RNS.trace_exception(e)


class BlackholeUpdater():
    """Periodically fetches blackhole lists from the configured blackhole sources."""

    INITIAL_WAIT = 20          # Seconds to wait before the first update run
    JOB_INTERVAL = 60          # Seconds between runs of the update job
    UPDATE_INTERVAL = 1*60*60  # Minimum seconds between update attempts for one source
    SOURCE_TIMEOUT = 25        # Not referenced anywhere in this module

    def __init__(self):
        self.last_updates = {}  # Maps source identity hash -> time of the last update attempt
        self.should_run = False
        self.job_interval = self.JOB_INTERVAL
        self.update_lock = threading.Lock()

    def start(self):
        """Start the background update job if it is not already flagged as running."""
        if not self.should_run:
            source_count = len(RNS.Reticulum.blackhole_sources())
            ms = "" if source_count == 1 else "s"
            RNS.log(f"Starting blackhole updater with {source_count} source{ms}", RNS.LOG_DEBUG)
            self.should_run = True
            threading.Thread(target=self.job, daemon=True).start()

    def stop(self):
        """Ask the update job to exit after its current iteration."""
        self.should_run = False

    def update_link_established(self, link):
        """Link callback: request ``/list`` from the source and merge the result.

        New entries of a dict response are added to
        ``RNS.Transport.blackholed_identities`` and the received list is
        written to the blackhole storage path under the source's hash.
        """
        remote_identity = link.get_remote_identity()
        RNS.log(f"Link established for blackhole list update from {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_DEBUG)
        receipt = link.request("/list")
        if not receipt:
            # The request could not be sent, so there is no receipt to wait on
            link.teardown()
            RNS.log(f"Could not send blackhole list request to {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_ERROR)
            return

        while not receipt.concluded(): time.sleep(0.2)
        response = receipt.get_response()
        link.teardown()

        blackhole_list = response if isinstance(response, dict) else None

        if blackhole_list:
            added = 0
            for identity_hash, entry in blackhole_list.items():
                if identity_hash not in RNS.Transport.blackholed_identities:
                    RNS.Transport.blackholed_identities[identity_hash] = entry
                    added += 1

            if added > 0:
                spec = "identity" if added == 1 else "identities"
                RNS.log(f"Added {added} blackholed {spec} from {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_DEBUG)

            try:
                sourcelistpath = os.path.join(RNS.Reticulum.blackholepath, RNS.hexrep(remote_identity.hash, delimit=False))
                tmppath = f"{sourcelistpath}.tmp"
                with open(tmppath, "wb") as f: f.write(msgpack.packb(blackhole_list))
                # Atomically replaces an existing list, leaving no window where the file is missing
                os.replace(tmppath, sourcelistpath)

            except Exception as e:
                RNS.log(f"Error while persisting blackhole list from {RNS.prettyhexrep(remote_identity.hash)}: {e}", RNS.LOG_ERROR)

        RNS.log(f"Blackhole list update from {RNS.prettyhexrep(remote_identity.hash)} completed", RNS.LOG_DEBUG)

    def job(self):
        """Background loop: open an update link to every source that is due."""
        time.sleep(self.INITIAL_WAIT)
        while self.should_run:
            try:
                now = time.time()
                for identity_hash in RNS.Reticulum.blackhole_sources():
                    last_update = self.last_updates.get(identity_hash, 0)
                    if now <= last_update+self.UPDATE_INTERVAL: continue

                    try:
                        destination_hash = RNS.Destination.hash_from_name_and_identity("rnstransport.info.blackhole", identity_hash)
                        RNS.log(f"Attempting blackhole list update from {RNS.prettyhexrep(identity_hash)}...", RNS.LOG_DEBUG)
                        if not RNS.Transport.await_path(destination_hash):
                            RNS.log(f"No path available for blackhole list update from {RNS.prettyhexrep(identity_hash)}, retrying later", RNS.LOG_VERBOSE)
                        else:
                            remote_identity = RNS.Identity.recall(destination_hash)
                            destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "rnstransport", "info", "blackhole")
                            RNS.Link(destination, established_callback=self.update_link_established)
                            # Recorded when the link is requested, not when the list arrives
                            self.last_updates[identity_hash] = time.time()

                    except Exception as e:
                        RNS.log(f"Error while establishing link for blackhole list update from {RNS.prettyhexrep(identity_hash)}: {e}", RNS.LOG_ERROR)

            except Exception as e:
                RNS.log(f"Error in blackhole list updater job: {e}", RNS.LOG_ERROR)
                RNS.trace_exception(e)

            time.sleep(self.job_interval)


# Compiled once; both patterns are applied with fullmatch()
_NUMERIC_LABEL_RE = re.compile(r"[0-9]+")
_HOSTNAME_LABEL_RE = re.compile(r"(?!-)[a-z0-9-]{1,63}(?<!-)", re.IGNORECASE)


def is_ip_address(address_string):
    """Return True if ``address_string`` is a string holding a valid IPv4 or IPv6 address.

    Non-string values are rejected, since ``ipaddress.ip_address`` would
    otherwise also accept integers and packed bytes, which can arrive in
    announces from untrusted peers.
    """
    if not isinstance(address_string, str): return False
    try:
        ipaddress.ip_address(address_string)
        return True
    except ValueError:
        return False


def is_hostname(hostname):
    """Return True if ``hostname`` is a syntactically valid hostname.

    One trailing dot is allowed. The name may be at most 253 characters
    long, its last label must not be purely numeric, and every label must
    consist of 1 to 63 letters, digits or hyphens without a leading or
    trailing hyphen. Empty and non-string input yields False.
    """
    if not isinstance(hostname, str) or not hostname: return False
    if hostname[-1] == ".": hostname = hostname[:-1]
    if len(hostname) > 253: return False
    components = hostname.split(".")
    if _NUMERIC_LABEL_RE.fullmatch(components[-1]): return False
    # fullmatch, unlike a "$" anchor, does not accept a trailing newline
    return all(_HOSTNAME_LABEL_RE.fullmatch(label) for label in components)