Interface.py
1 # Reticulum License 2 # 3 # Copyright (c) 2016-2025 Mark Qvist 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # - The Software shall not be used in any kind of system which includes amongst 13 # its functions the ability to purposefully do harm to human beings. 14 # 15 # - The Software shall not be used, directly or indirectly, in the creation of 16 # an artificial intelligence, machine learning or language model training 17 # dataset, including but not limited to any use that contributes to the 18 # training or development of such a model or algorithm. 19 # 20 # - The above copyright notice and this permission notice shall be included in 21 # all copies or substantial portions of the Software. 22 # 23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 24 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 25 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 26 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 27 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 28 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 29 # SOFTWARE. 30 31 import RNS 32 import time 33 import threading 34 from collections import deque 35 from RNS.vendor.configobj import ConfigObj 36 37 class Interface: 38 IN = False 39 OUT = False 40 FWD = False 41 RPT = False 42 name = None 43 44 # Interface mode definitions 45 MODE_FULL = 0x01 46 MODE_POINT_TO_POINT = 0x02 47 MODE_ACCESS_POINT = 0x03 48 MODE_ROAMING = 0x04 49 MODE_BOUNDARY = 0x05 50 MODE_GATEWAY = 0x06 51 52 # Which interface modes a Transport Node should 53 # actively discover paths for. 54 DISCOVER_PATHS_FOR = [MODE_ACCESS_POINT, MODE_GATEWAY, MODE_ROAMING] 55 56 # How many samples to use for announce 57 # frequency calculations 58 IA_FREQ_SAMPLES = 6 59 OA_FREQ_SAMPLES = 6 60 61 # Maximum amount of ingress limited announces 62 # to hold at any given time. 63 MAX_HELD_ANNOUNCES = 256 64 65 # How long a spawned interface will be 66 # considered to be newly created. Two 67 # hours by default. 68 IC_NEW_TIME = 2*60*60 69 IC_BURST_FREQ_NEW = 3.5 70 IC_BURST_FREQ = 12 71 IC_BURST_HOLD = 1*60 72 IC_BURST_PENALTY = 5*60 73 IC_HELD_RELEASE_INTERVAL = 30 74 75 AUTOCONFIGURE_MTU = False 76 FIXED_MTU = False 77 78 def __init__(self): 79 self.rxb = 0 80 self.txb = 0 81 self.created = time.time() 82 self.detached = False 83 self.online = False 84 self.bitrate = 62500 85 self.HW_MTU = None 86 87 self.supports_discovery = False 88 self.discoverable = False 89 self.last_discovery_announce = 0 90 self.bootstrap_only = False 91 self.parent_interface = None 92 self.spawned_interfaces = None 93 self.tunnel_id = None 94 self.ingress_control = True 95 self.ic_max_held_announces = Interface.MAX_HELD_ANNOUNCES 96 self.ic_burst_hold = Interface.IC_BURST_HOLD 97 self.ic_burst_active = False 98 self.ic_burst_activated = 0 99 self.ic_held_release = 0 100 self.ic_burst_freq_new = Interface.IC_BURST_FREQ_NEW 101 self.ic_burst_freq = Interface.IC_BURST_FREQ 102 self.ic_new_time = Interface.IC_NEW_TIME 103 self.ic_burst_penalty = Interface.IC_BURST_PENALTY 104 self.ic_held_release_interval = Interface.IC_HELD_RELEASE_INTERVAL 105 self.held_announces = {} 106 107 self.ia_freq_deque = deque(maxlen=Interface.IA_FREQ_SAMPLES) 108 self.oa_freq_deque = deque(maxlen=Interface.OA_FREQ_SAMPLES) 109 110 def get_hash(self): 111 return RNS.Identity.full_hash(str(self).encode("utf-8")) 112 113 # This is a generic function for determining when an interface 114 # should activate ingress limiting. Since this can vary for 115 # different interface types, this function should be overwritten 116 # in case a particular interface requires a different approach. 117 def should_ingress_limit(self): 118 if self.ingress_control: 119 freq_threshold = self.ic_burst_freq_new if self.age() < self.ic_new_time else self.ic_burst_freq 120 ia_freq = self.incoming_announce_frequency() 121 122 if self.ic_burst_active: 123 if ia_freq < freq_threshold and time.time() > self.ic_burst_activated+self.ic_burst_hold: 124 self.ic_burst_active = False 125 self.ic_held_release = time.time() + self.ic_burst_penalty 126 return True 127 128 else: 129 if ia_freq > freq_threshold: 130 self.ic_burst_active = True 131 self.ic_burst_activated = time.time() 132 return True 133 134 else: 135 return False 136 137 else: 138 return False 139 140 def optimise_mtu(self): 141 if self.AUTOCONFIGURE_MTU: 142 if self.bitrate >= 1_000_000_000: 143 self.HW_MTU = 524288 144 elif self.bitrate > 750_000_000: 145 self.HW_MTU = 262144 146 elif self.bitrate > 400_000_000: 147 self.HW_MTU = 131072 148 elif self.bitrate > 200_000_000: 149 self.HW_MTU = 65536 150 elif self.bitrate > 100_000_000: 151 self.HW_MTU = 32768 152 elif self.bitrate > 10_000_000: 153 self.HW_MTU = 16384 154 elif self.bitrate > 5_000_000: 155 self.HW_MTU = 8192 156 elif self.bitrate > 2_000_000: 157 self.HW_MTU = 4096 158 elif self.bitrate > 1_000_000: 159 self.HW_MTU = 2048 160 elif self.bitrate > 62_500: 161 self.HW_MTU = 1024 162 else: 163 self.HW_MTU = None 164 165 RNS.log(f"{self} hardware MTU set to {self.HW_MTU}", RNS.LOG_DEBUG) # TODO: Remove debug 166 167 def age(self): 168 return time.time()-self.created 169 170 def hold_announce(self, announce_packet): 171 if announce_packet.destination_hash in self.held_announces: 172 self.held_announces[announce_packet.destination_hash] = announce_packet 173 elif not len(self.held_announces) >= self.ic_max_held_announces: 174 self.held_announces[announce_packet.destination_hash] = announce_packet 175 176 def process_held_announces(self): 177 try: 178 if not self.should_ingress_limit() and len(self.held_announces) > 0 and time.time() > self.ic_held_release: 179 freq_threshold = self.ic_burst_freq_new if self.age() < self.ic_new_time else self.ic_burst_freq 180 ia_freq = self.incoming_announce_frequency() 181 if ia_freq < freq_threshold: 182 selected_announce_packet = None 183 min_hops = RNS.Transport.PATHFINDER_M 184 for destination_hash in self.held_announces: 185 announce_packet = self.held_announces[destination_hash] 186 if announce_packet.hops < min_hops: 187 min_hops = announce_packet.hops 188 selected_announce_packet = announce_packet 189 190 if selected_announce_packet != None: 191 RNS.log("Releasing held announce packet "+str(selected_announce_packet)+" from "+str(self), RNS.LOG_EXTREME) 192 self.ic_held_release = time.time() + self.ic_held_release_interval 193 self.held_announces.pop(selected_announce_packet.destination_hash) 194 def release(): 195 RNS.Transport.inbound(selected_announce_packet.raw, selected_announce_packet.receiving_interface) 196 threading.Thread(target=release, daemon=True).start() 197 198 except Exception as e: 199 RNS.log("An error occurred while processing held announces for "+str(self), RNS.LOG_ERROR) 200 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) 201 202 def received_announce(self, from_spawned=False): 203 self.ia_freq_deque.append(time.time()) 204 if hasattr(self, "parent_interface") and self.parent_interface != None: 205 self.parent_interface.received_announce(from_spawned=True) 206 207 def sent_announce(self, from_spawned=False): 208 self.oa_freq_deque.append(time.time()) 209 if hasattr(self, "parent_interface") and self.parent_interface != None: 210 self.parent_interface.sent_announce(from_spawned=True) 211 212 def incoming_announce_frequency(self): 213 if not len(self.ia_freq_deque) > 1: 214 return 0 215 else: 216 dq_len = len(self.ia_freq_deque) 217 delta_sum = 0 218 for i in range(1,dq_len): 219 delta_sum += self.ia_freq_deque[i]-self.ia_freq_deque[i-1] 220 delta_sum += time.time() - self.ia_freq_deque[dq_len-1] 221 222 if delta_sum == 0: 223 avg = 0 224 else: 225 avg = 1/(delta_sum/(dq_len)) 226 227 return avg 228 229 def outgoing_announce_frequency(self): 230 if not len(self.oa_freq_deque) > 1: 231 return 0 232 else: 233 dq_len = len(self.oa_freq_deque) 234 delta_sum = 0 235 for i in range(1,dq_len): 236 delta_sum += self.oa_freq_deque[i]-self.oa_freq_deque[i-1] 237 delta_sum += time.time() - self.oa_freq_deque[dq_len-1] 238 239 if delta_sum == 0: 240 avg = 0 241 else: 242 avg = 1/(delta_sum/(dq_len)) 243 244 return avg 245 246 def process_announce_queue(self): 247 if not hasattr(self, "announce_cap"): 248 self.announce_cap = RNS.Reticulum.ANNOUNCE_CAP 249 250 if hasattr(self, "announce_queue"): 251 try: 252 now = time.time() 253 stale = [] 254 for a in self.announce_queue: 255 if now > a["time"]+RNS.Reticulum.QUEUED_ANNOUNCE_LIFE: 256 stale.append(a) 257 258 for s in stale: 259 if s in self.announce_queue: 260 self.announce_queue.remove(s) 261 262 if len(self.announce_queue) > 0: 263 min_hops = min(entry["hops"] for entry in self.announce_queue) 264 entries = list(filter(lambda e: e["hops"] == min_hops, self.announce_queue)) 265 entries.sort(key=lambda e: e["time"]) 266 selected = entries[0] 267 268 now = time.time() 269 tx_time = (len(selected["raw"])*8) / self.bitrate 270 wait_time = (tx_time / self.announce_cap) 271 self.announce_allowed_at = now + wait_time 272 273 self.process_outgoing(selected["raw"]) 274 self.sent_announce() 275 276 if selected in self.announce_queue: 277 self.announce_queue.remove(selected) 278 279 if len(self.announce_queue) > 0: 280 timer = threading.Timer(wait_time, self.process_announce_queue) 281 timer.start() 282 283 except Exception as e: 284 self.announce_queue = [] 285 RNS.log("Error while processing announce queue on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 286 RNS.log("The announce queue for this interface has been cleared.", RNS.LOG_ERROR) 287 288 def final_init(self): 289 pass 290 291 def detach(self): 292 pass 293 294 @staticmethod 295 def get_config_obj(config_in): 296 if type(config_in) == ConfigObj: 297 return config_in 298 else: 299 try: 300 return ConfigObj(config_in) 301 except Exception as e: 302 RNS.log(f"Could not parse supplied configuration data. The contained exception was: {e}", RNS.LOG_ERROR) 303 raise SystemError("Invalid configuration data supplied")