LXMRouter.py
1 import os 2 import sys 3 import time 4 import math 5 import random 6 import base64 7 import atexit 8 import signal 9 import threading 10 11 from collections import deque 12 13 import RNS 14 import RNS.vendor.umsgpack as msgpack 15 16 from .LXMF import APP_NAME 17 from .LXMF import FIELD_TICKET 18 from .LXMF import PN_META_NAME 19 from .LXMF import pn_announce_data_is_valid 20 21 from .LXMPeer import LXMPeer 22 from .LXMessage import LXMessage 23 from .Handlers import LXMFDeliveryAnnounceHandler 24 from .Handlers import LXMFPropagationAnnounceHandler 25 26 import LXMF.LXStamper as LXStamper 27 28 class LXMRouter: 29 MAX_DELIVERY_ATTEMPTS = 5 30 PROCESSING_INTERVAL = 4 31 DELIVERY_RETRY_WAIT = 10 32 PATH_REQUEST_WAIT = 7 33 MAX_PATHLESS_TRIES = 1 34 LINK_MAX_INACTIVITY = 10*60 35 P_LINK_MAX_INACTIVITY = 3*60 36 37 MESSAGE_EXPIRY = 30*24*60*60 38 STAMP_COST_EXPIRY = 45*24*60*60 39 40 NODE_ANNOUNCE_DELAY = 20 41 42 MAX_PEERS = 20 43 AUTOPEER = True 44 AUTOPEER_MAXDEPTH = 4 45 FASTEST_N_RANDOM_POOL = 2 46 ROTATION_HEADROOM_PCT = 10 47 ROTATION_AR_MAX = 0.5 48 49 PEERING_COST = 18 50 MAX_PEERING_COST = 26 51 PROPAGATION_COST_MIN = 13 52 PROPAGATION_COST_FLEX = 3 53 PROPAGATION_COST = 16 54 PROPAGATION_LIMIT = 256 55 SYNC_LIMIT = PROPAGATION_LIMIT*40 56 DELIVERY_LIMIT = 1000 57 58 PR_PATH_TIMEOUT = 10 59 PN_STAMP_THROTTLE = 180 60 61 PR_IDLE = 0x00 62 PR_PATH_REQUESTED = 0x01 63 PR_LINK_ESTABLISHING = 0x02 64 PR_LINK_ESTABLISHED = 0x03 65 PR_REQUEST_SENT = 0x04 66 PR_RECEIVING = 0x05 67 PR_RESPONSE_RECEIVED = 0x06 68 PR_COMPLETE = 0x07 69 PR_NO_PATH = 0xf0 70 PR_LINK_FAILED = 0xf1 71 PR_TRANSFER_FAILED = 0xf2 72 PR_NO_IDENTITY_RCVD = 0xf3 73 PR_NO_ACCESS = 0xf4 74 PR_FAILED = 0xfe 75 76 PR_ALL_MESSAGES = 0x00 77 78 DUPLICATE_SIGNAL = "lxmf_duplicate" 79 80 STATS_GET_PATH = "/pn/get/stats" 81 SYNC_REQUEST_PATH = "/pn/peer/sync" 82 UNPEER_REQUEST_PATH = "/pn/peer/unpeer" 83 84 85 ### Developer-facing API ############################## 86 ####################################################### 87 88 def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None, 89 propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, sync_limit=SYNC_LIMIT, 90 enforce_ratchets=False, enforce_stamps=False, static_peers = [], max_peers=None, 91 from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT, 92 propagation_cost=PROPAGATION_COST, propagation_cost_flexibility=PROPAGATION_COST_FLEX, 93 peering_cost=PEERING_COST, max_peering_cost=MAX_PEERING_COST, name=None): 94 95 random.seed(os.urandom(10)) 96 97 self.pending_inbound = [] 98 self.pending_outbound = [] 99 self.failed_outbound = [] 100 self.direct_links = {} 101 self.backchannel_links = {} 102 self.delivery_destinations = {} 103 104 self.prioritised_list = [] 105 self.ignored_list = [] 106 self.allowed_list = [] 107 self.control_allowed_list = [] 108 self.auth_required = False 109 self.retain_synced_on_node = False 110 111 self.default_sync_strategy = sync_strategy 112 self.processing_inbound = False 113 self.processing_count = 0 114 self.name = name 115 116 self.propagation_node = False 117 self.propagation_node_start_time = None 118 119 if storagepath == None: raise ValueError("LXMF cannot be initialised without a storage path") 120 else: 121 self.storagepath = storagepath+"/lxmf" 122 self.ratchetpath = self.storagepath+"/ratchets" 123 124 self.outbound_propagation_node = None 125 self.outbound_propagation_link = None 126 127 if delivery_limit == None: delivery_limit = LXMRouter.DELIVERY_LIMIT 128 if propagation_cost < LXMRouter.PROPAGATION_COST_MIN: propagation_cost = LXMRouter.PROPAGATION_COST_MIN 129 130 self.message_storage_limit = None 131 self.information_storage_limit = None 132 self.propagation_per_transfer_limit = propagation_limit 133 self.propagation_per_sync_limit = sync_limit 134 self.delivery_per_transfer_limit = delivery_limit 135 self.propagation_stamp_cost = propagation_cost 136 self.propagation_stamp_cost_flexibility = propagation_cost_flexibility 137 self.peering_cost = peering_cost 138 self.max_peering_cost = max_peering_cost 139 self.enforce_ratchets = enforce_ratchets 140 self._enforce_stamps = enforce_stamps 141 self.pending_deferred_stamps = {} 142 self.throttled_peers = {} 143 144 if sync_limit == None or self.propagation_per_sync_limit < self.propagation_per_transfer_limit: 145 self.propagation_per_sync_limit = self.propagation_per_transfer_limit 146 147 self.wants_download_on_path_available_from = None 148 self.wants_download_on_path_available_to = None 149 self.propagation_transfer_state = LXMRouter.PR_IDLE 150 self.propagation_transfer_progress = 0.0 151 self.propagation_transfer_last_result = None 152 self.propagation_transfer_last_duplicates = None 153 self.propagation_transfer_max_messages = None 154 self.prioritise_rotating_unreachable_peers = False 155 self.active_propagation_links = [] 156 self.validated_peer_links = {} 157 self.locally_delivered_transient_ids = {} 158 self.locally_processed_transient_ids = {} 159 self.outbound_stamp_costs = {} 160 self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}} 161 162 self.outbound_processing_lock = threading.Lock() 163 self.cost_file_lock = threading.Lock() 164 self.ticket_file_lock = threading.Lock() 165 self.stamp_gen_lock = threading.Lock() 166 self.exit_handler_running = False 167 168 if identity == None: 169 identity = RNS.Identity() 170 171 self.identity = identity 172 self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation") 173 self.propagation_destination.set_default_app_data(self.get_propagation_node_app_data) 174 self.control_destination = None 175 self.client_propagation_messages_received = 0 176 self.client_propagation_messages_served = 0 177 self.unpeered_propagation_incoming = 0 178 self.unpeered_propagation_rx_bytes = 0 179 180 if autopeer != None: self.autopeer = autopeer 181 else: self.autopeer = LXMRouter.AUTOPEER 182 183 if autopeer_maxdepth != None: self.autopeer_maxdepth = autopeer_maxdepth 184 else: self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH 185 186 if max_peers == None: self.max_peers = LXMRouter.MAX_PEERS 187 else: 188 if type(max_peers) == int and max_peers >= 0: self.max_peers = max_peers 189 else: raise ValueError(f"Invalid value for max_peers: {max_peers}") 190 191 self.from_static_only = from_static_only 192 if type(static_peers) != list: raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}") 193 else: 194 for static_peer in static_peers: 195 if type(static_peer) != bytes: raise ValueError(f"Invalid static peer destination hash: {static_peer}") 196 else: 197 if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Invalid static peer destination hash: {static_peer}") 198 199 self.static_peers = static_peers 200 201 self.peers = {} 202 self.propagation_entries = {} 203 204 self.peer_distribution_queue = deque() 205 206 RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self)) 207 RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self)) 208 209 self.__delivery_callback = None 210 211 try: 212 if os.path.isfile(self.storagepath+"/local_deliveries"): 213 locally_delivered_file = open(self.storagepath+"/local_deliveries", "rb") 214 data = locally_delivered_file.read() 215 locally_delivered_file.close() 216 self.locally_delivered_transient_ids = msgpack.unpackb(data) 217 if not type(self.locally_delivered_transient_ids) == dict: 218 RNS.log("Invalid data format for loaded locally delivered transient IDs, recreating...", RNS.LOG_ERROR) 219 self.locally_delivered_transient_ids = {} 220 221 except Exception as e: 222 RNS.log(f"Could not load locally delivered message ID cache from storage. The contained exception was: {e}", RNS.LOG_ERROR) 223 self.locally_delivered_transient_ids = {} 224 225 try: 226 if os.path.isfile(self.storagepath+"/locally_processed"): 227 locally_processed_file = open(self.storagepath+"/locally_processed", "rb") 228 data = locally_processed_file.read() 229 locally_processed_file.close() 230 self.locally_processed_transient_ids = msgpack.unpackb(data) 231 if not type(self.locally_processed_transient_ids) == dict: 232 RNS.log("Invalid data format for loaded locally processed transient IDs, recreating...", RNS.LOG_ERROR) 233 self.locally_processed_transient_ids = {} 234 235 236 except Exception as e: 237 RNS.log("Could not load locally processed message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) 238 self.locally_processed_transient_ids = {} 239 240 try: 241 self.clean_transient_id_caches() 242 243 except Exception as e: 244 RNS.log("Could not clean transient ID caches. The contained exception was : "+str(e), RNS.LOG_ERROR) 245 self.locally_delivered_transient_ids = {} 246 self.locally_processed_transient_ids = {} 247 248 try: 249 if os.path.isfile(self.storagepath+"/outbound_stamp_costs"): 250 with self.cost_file_lock: 251 with open(self.storagepath+"/outbound_stamp_costs", "rb") as outbound_stamp_cost_file: 252 data = outbound_stamp_cost_file.read() 253 self.outbound_stamp_costs = msgpack.unpackb(data) 254 if not type(self.outbound_stamp_costs) == dict: 255 RNS.log("Invalid data format for loaded outbound stamp costs, recreating...", RNS.LOG_ERROR) 256 self.outbound_stamp_costs = {} 257 258 self.clean_outbound_stamp_costs() 259 self.save_outbound_stamp_costs() 260 261 except Exception as e: 262 RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) 263 264 try: 265 if os.path.isfile(self.storagepath+"/available_tickets"): 266 with self.ticket_file_lock: 267 with open(self.storagepath+"/available_tickets", "rb") as available_tickets_file: 268 data = available_tickets_file.read() 269 self.available_tickets = msgpack.unpackb(data) 270 if not type(self.available_tickets) == dict: 271 RNS.log("Invalid data format for loaded available tickets, recreating...", RNS.LOG_ERROR) 272 self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}} 273 if not "outbound" in self.available_tickets: 274 RNS.log("Missing outbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR) 275 self.available_tickets["outbound"] = {} 276 if not "inbound" in self.available_tickets: 277 RNS.log("Missing inbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR) 278 self.available_tickets["inbound"] = {} 279 if not "last_deliveries" in self.available_tickets: 280 RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR) 281 self.available_tickets["last_deliveries"] = {} 282 283 self.clean_available_tickets() 284 self.save_available_tickets() 285 286 except Exception as e: 287 RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) 288 289 atexit.register(self.exit_handler) 290 signal.signal(signal.SIGINT, self.sigint_handler) 291 signal.signal(signal.SIGTERM, self.sigterm_handler) 292 293 job_thread = threading.Thread(target=self.jobloop) 294 job_thread.setDaemon(True) 295 job_thread.start() 296 297 def announce(self, destination_hash, attached_interface=None): 298 if destination_hash in self.delivery_destinations: 299 self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash), attached_interface=attached_interface) 300 301 def get_propagation_node_announce_metadata(self): 302 metadata = {} 303 if self.name: metadata[PN_META_NAME] = str(self.name).encode("utf-8") 304 return metadata 305 306 def get_propagation_node_app_data(self): 307 metadata = self.get_propagation_node_announce_metadata() 308 node_state = self.propagation_node and not self.from_static_only 309 stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility, self.peering_cost] 310 announce_data = [ False, # 0: Legacy LXMF PN support 311 int(time.time()), # 1: Current node timebase 312 node_state, # 2: Boolean flag signalling propagation node state 313 self.propagation_per_transfer_limit, # 3: Per-transfer limit for message propagation in kilobytes 314 self.propagation_per_sync_limit, # 4: Limit for incoming propagation node syncs 315 stamp_cost, # 5: Propagation stamp cost for this node 316 metadata ] # 6: Node metadata 317 318 return msgpack.packb(announce_data) 319 320 def announce_propagation_node(self): 321 def delayed_announce(): 322 time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) 323 self.propagation_destination.announce(app_data=self.get_propagation_node_app_data()) 324 if len(self.control_allowed_list) > 1: self.control_destination.announce() 325 326 da_thread = threading.Thread(target=delayed_announce) 327 da_thread.setDaemon(True) 328 da_thread.start() 329 330 def register_delivery_identity(self, identity, display_name = None, stamp_cost = None): 331 if len(self.delivery_destinations) != 0: 332 RNS.log("Currently only one delivery identity is supported per LXMF router instance", RNS.LOG_ERROR) 333 return None 334 335 if not os.path.isdir(self.ratchetpath): 336 os.makedirs(self.ratchetpath) 337 338 delivery_destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "delivery") 339 delivery_destination.enable_ratchets(f"{self.ratchetpath}/{RNS.hexrep(delivery_destination.hash, delimit=False)}.ratchets") 340 delivery_destination.set_packet_callback(self.delivery_packet) 341 delivery_destination.set_link_established_callback(self.delivery_link_established) 342 delivery_destination.display_name = display_name 343 344 if self.enforce_ratchets: 345 delivery_destination.enforce_ratchets() 346 347 if display_name != None: 348 def get_app_data(): 349 return self.get_announce_app_data(delivery_destination.hash) 350 delivery_destination.set_default_app_data(get_app_data) 351 352 self.delivery_destinations[delivery_destination.hash] = delivery_destination 353 self.set_inbound_stamp_cost(delivery_destination.hash, stamp_cost) 354 355 return delivery_destination 356 357 def register_delivery_callback(self, callback): 358 self.__delivery_callback = callback 359 360 def set_inbound_stamp_cost(self, destination_hash, stamp_cost): 361 if destination_hash in self.delivery_destinations: 362 delivery_destination = self.delivery_destinations[destination_hash] 363 if stamp_cost == None: 364 delivery_destination.stamp_cost = None 365 return True 366 elif type(stamp_cost) == int: 367 if stamp_cost < 1: 368 delivery_destination.stamp_cost = None 369 elif stamp_cost < 255: 370 delivery_destination.stamp_cost = stamp_cost 371 else: 372 return False 373 374 return True 375 376 return False 377 378 def get_outbound_stamp_cost(self, destination_hash): 379 if destination_hash in self.outbound_stamp_costs: 380 stamp_cost = self.outbound_stamp_costs[destination_hash][1] 381 return stamp_cost 382 else: 383 return None 384 385 def set_active_propagation_node(self, destination_hash): 386 self.set_outbound_propagation_node(destination_hash) 387 # self.set_inbound_propagation_node(destination_hash) 388 389 def set_outbound_propagation_node(self, destination_hash): 390 if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8 or type(destination_hash) != bytes: 391 raise ValueError("Invalid destination hash for outbound propagation node") 392 else: 393 if self.outbound_propagation_node != destination_hash: 394 self.outbound_propagation_node = destination_hash 395 if self.outbound_propagation_link != None: 396 if self.outbound_propagation_link.destination.hash != destination_hash: 397 self.outbound_propagation_link.teardown() 398 self.outbound_propagation_link = None 399 400 def get_outbound_propagation_node(self): 401 return self.outbound_propagation_node 402 403 def get_outbound_propagation_cost(self): 404 target_propagation_cost = None 405 pn_destination_hash = self.get_outbound_propagation_node() 406 pn_app_data = RNS.Identity.recall_app_data(pn_destination_hash) 407 if pn_announce_data_is_valid(pn_app_data): 408 pn_config = msgpack.unpackb(pn_app_data) 409 target_propagation_cost = pn_config[5][0] 410 411 if not target_propagation_cost: 412 RNS.log(f"Could not retrieve cached propagation node config. Requesting path to propagation node to get target propagation cost...", RNS.LOG_DEBUG) 413 RNS.Transport.request_path(pn_destination_hash) 414 timeout = time.time() + LXMRouter.PATH_REQUEST_WAIT 415 while not RNS.Identity.recall_app_data(pn_destination_hash) and time.time() < timeout: 416 time.sleep(0.5) 417 418 pn_app_data = RNS.Identity.recall_app_data(pn_destination_hash) 419 if pn_announce_data_is_valid(pn_app_data): 420 pn_config = msgpack.unpackb(pn_app_data) 421 target_propagation_cost = pn_config[5][0] 422 423 if not target_propagation_cost: RNS.log("Propagation node stamp cost still unavailable after path request", RNS.LOG_ERROR) 424 return target_propagation_cost 425 426 def set_inbound_propagation_node(self, destination_hash): 427 # TODO: Implement 428 raise NotImplementedError("Inbound/outbound propagation node differentiation is currently not implemented") 429 430 def get_inbound_propagation_node(self): 431 return self.get_outbound_propagation_node() 432 433 def set_retain_node_lxms(self, retain): 434 if retain == True: 435 self.retain_synced_on_node = True 436 else: 437 self.retain_synced_on_node = False 438 439 def set_authentication(self, required=None): 440 if required != None: 441 self.auth_required = required 442 443 def requires_authentication(self): 444 return self.auth_required 445 446 def allow(self, identity_hash=None): 447 if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8: 448 if not identity_hash in self.allowed_list: 449 self.allowed_list.append(identity_hash) 450 else: 451 raise ValueError("Allowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes") 452 453 def disallow(self, identity_hash=None): 454 if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8: 455 if identity_hash in self.allowed_list: 456 self.allowed_list.pop(identity_hash) 457 else: 458 raise ValueError("Disallowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes") 459 460 def allow_control(self, identity_hash=None): 461 if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8: 462 if not identity_hash in self.control_allowed_list: self.control_allowed_list.append(identity_hash) 463 else: raise ValueError("Allowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes") 464 465 def disallow_control(self, identity_hash=None): 466 if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8: 467 if identity_hash in self.control_allowed_list: self.control_allowed_list.pop(identity_hash) 468 else: raise ValueError("Disallowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes") 469 470 def prioritise(self, destination_hash=None): 471 if isinstance(destination_hash, bytes) and len(destination_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: 472 if not destination_hash in self.prioritised_list: 473 self.prioritised_list.append(destination_hash) 474 else: 475 raise ValueError("Prioritised destination hash must be "+str(RNS.Reticulum.TRUNCATED_HASHLENGTH//8)+" bytes") 476 477 def unprioritise(self, identity_hash=None): 478 if isinstance(destination_hash, bytes) and len(destination_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: 479 if destination_hash in self.prioritised_list: 480 self.prioritised_list.pop(destination_hash) 481 else: 482 raise ValueError("Prioritised destination hash must be "+str(RNS.Reticulum.TRUNCATED_HASHLENGTH//8)+" bytes") 483 484 def request_messages_from_propagation_node(self, identity, max_messages = PR_ALL_MESSAGES): 485 if max_messages == None: 486 max_messages = LXMRouter.PR_ALL_MESSAGES 487 488 self.propagation_transfer_progress = 0.0 489 self.propagation_transfer_max_messages = max_messages 490 if self.outbound_propagation_node != None: 491 if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.ACTIVE: 492 self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHED 493 RNS.log("Requesting message list from propagation node", RNS.LOG_DEBUG) 494 self.outbound_propagation_link.identify(identity) 495 self.outbound_propagation_link.request( 496 LXMPeer.MESSAGE_GET_PATH, 497 [None, None], # Set both want and have fields to None to get message list 498 response_callback=self.message_list_response, 499 failed_callback=self.message_get_failed 500 ) 501 self.propagation_transfer_state = LXMRouter.PR_REQUEST_SENT 502 else: 503 if self.outbound_propagation_link == None: 504 if RNS.Transport.has_path(self.outbound_propagation_node): 505 self.wants_download_on_path_available_from = None 506 self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHING 507 RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for message download", RNS.LOG_DEBUG) 508 propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) 509 propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") 510 def msg_request_established_callback(link): 511 self.request_messages_from_propagation_node(identity, self.propagation_transfer_max_messages) 512 513 self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=msg_request_established_callback) 514 else: 515 RNS.log("No path known for message download from propagation node "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) 516 RNS.Transport.request_path(self.outbound_propagation_node) 517 self.wants_download_on_path_available_from = self.outbound_propagation_node 518 self.wants_download_on_path_available_to = identity 519 self.wants_download_on_path_available_timeout = time.time() + LXMRouter.PR_PATH_TIMEOUT 520 self.propagation_transfer_state = LXMRouter.PR_PATH_REQUESTED 521 self.request_messages_path_job() 522 else: 523 RNS.log("Waiting for propagation node link to become active", RNS.LOG_EXTREME) 524 else: 525 RNS.log("Cannot request LXMF propagation node sync, no default propagation node configured", RNS.LOG_WARNING) 526 527 def cancel_propagation_node_requests(self): 528 if self.outbound_propagation_link != None: 529 self.outbound_propagation_link.teardown() 530 self.outbound_propagation_link = None 531 532 self.acknowledge_sync_completion(reset_state=True) 533 534 def enable_propagation(self): 535 try: 536 self.messagepath = self.storagepath+"/messagestore" 537 538 if not os.path.isdir(self.storagepath): 539 os.makedirs(self.storagepath) 540 541 if not os.path.isdir(self.messagepath): 542 os.makedirs(self.messagepath) 543 544 self.propagation_entries = {} 545 546 st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE) 547 for filename in os.listdir(self.messagepath): 548 components = filename.split("_") 549 if len(components) >= 3: 550 if float(components[1]) > 0: 551 if len(components[0]) == RNS.Identity.HASHLENGTH//8*2: 552 try: 553 transient_id = bytes.fromhex(components[0]) 554 received = float(components[1]) 555 stamp_value = int(components[2]) 556 filepath = self.messagepath+"/"+filename 557 msg_size = os.path.getsize(filepath) 558 file = open(filepath, "rb") 559 destination_hash = file.read(LXMessage.DESTINATION_LENGTH) 560 file.close() 561 562 self.propagation_entries[transient_id] = [ 563 destination_hash, # 0: Destination hash 564 filepath, # 1: Storage location 565 received, # 2: Receive timestamp 566 msg_size, # 3: Message size 567 [], # 4: Handled peers 568 [], # 5: Unhandled peers 569 stamp_value, # 6: Stamp value 570 ] 571 572 except Exception as e: 573 RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) 574 575 et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st)) 576 RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE) 577 RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE) 578 st = time.time() 579 580 peers_storage_path = self.storagepath+"/peers" 581 if os.path.isfile(peers_storage_path): 582 peers_file = open(peers_storage_path, "rb") 583 peers_data = peers_file.read() 584 peers_file.close() 585 586 if len(peers_data) > 0: 587 try: serialised_peers = msgpack.unpackb(peers_data) 588 except Exception as e: 589 RNS.log(f"Could not load propagation node peering data from storage. The contained exception was: {e}", RNS.LOG_ERROR) 590 RNS.log(f"The peering data file located at {peers_storage_path} is likely corrupt.", RNS.LOG_ERROR) 591 RNS.log(f"You can delete this file and attempt starting the propagation node again, but peer synchronization states will be lost.", RNS.LOG_ERROR) 592 raise e 593 594 del peers_data 595 596 while len(serialised_peers) > 0: 597 serialised_peer = serialised_peers.pop() 598 peer = LXMPeer.from_bytes(serialised_peer, self) 599 del serialised_peer 600 if peer.destination_hash in self.static_peers and peer.last_heard == 0: 601 RNS.Transport.request_path(peer.destination_hash) 602 if peer.identity != None: 603 self.peers[peer.destination_hash] = peer 604 lim_str = ", no transfer limit" 605 if peer.propagation_transfer_limit != None: 606 lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit" 607 RNS.log("Rebuilt peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(peer.unhandled_message_count)+" unhandled messages"+lim_str, RNS.LOG_DEBUG) 608 else: 609 RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG) 610 del peer 611 612 del serialised_peers 613 614 if len(self.static_peers) > 0: 615 for static_peer in self.static_peers: 616 if not static_peer in self.peers: 617 RNS.log(f"Activating static peering with {RNS.prettyhexrep(static_peer)}", RNS.LOG_NOTICE) 618 self.peers[static_peer] = LXMPeer(self, static_peer, sync_strategy=self.default_sync_strategy) 619 if self.peers[static_peer].last_heard == 0: 620 # TODO: Allow path request responses through announce handler 621 # momentarily here, so peering config can be updated even if 622 # the static peer is not available to directly send an announce. 623 RNS.Transport.request_path(static_peer) 624 625 RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE) 626 627 try: 628 if os.path.isfile(self.storagepath+"/node_stats"): 629 node_stats_file = open(self.storagepath+"/node_stats", "rb") 630 data = node_stats_file.read() 631 node_stats_file.close() 632 node_stats = msgpack.unpackb(data) 633 634 if not type(node_stats) == dict: 635 RNS.log("Invalid data format for loaded local node stats, node stats will be reset", RNS.LOG_ERROR) 636 else: 637 self.client_propagation_messages_received = node_stats["client_propagation_messages_received"] 638 self.client_propagation_messages_served = node_stats["client_propagation_messages_served"] 639 self.unpeered_propagation_incoming = node_stats["unpeered_propagation_incoming"] 640 self.unpeered_propagation_rx_bytes = node_stats["unpeered_propagation_rx_bytes"] 641 642 except Exception as e: 643 RNS.log("Could not load local node stats. The contained exception was: "+str(e), RNS.LOG_ERROR) 644 645 self.propagation_node = True 646 self.propagation_node_start_time = time.time() 647 self.propagation_destination.set_link_established_callback(self.propagation_link_established) 648 self.propagation_destination.set_packet_callback(self.propagation_packet) 649 650 self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL) 651 self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL) 652 653 self.control_allowed_list = [self.identity.hash] 654 self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control") 655 self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.control_allowed_list) 656 self.control_destination.register_request_handler(LXMRouter.SYNC_REQUEST_PATH, self.peer_sync_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.control_allowed_list) 657 self.control_destination.register_request_handler(LXMRouter.UNPEER_REQUEST_PATH, self.peer_unpeer_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.control_allowed_list) 658 659 if self.message_storage_limit != None: 660 limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit) 661 else: 662 limit_str = "" 663 664 RNS.log("LXMF Propagation Node message store size is "+RNS.prettysize(self.message_storage_size())+limit_str, RNS.LOG_DEBUG) 665 666 self.announce_propagation_node() 667 668 except Exception as e: 669 RNS.log("Could not enable propagation node. The contained exception was: "+str(e), RNS.LOG_ERROR) 670 raise e 671 RNS.panic() 672 673 def disable_propagation(self): 674 self.propagation_node = False 675 self.announce_propagation_node() 676 677 def enforce_stamps(self): 678 self._enforce_stamps = True 679 680 def ignore_stamps(self): 681 self._enforce_stamps = False 682 683 def ignore_destination(self, destination_hash): 684 if not destination_hash in self.ignored_list: 685 self.ignored_list.append(destination_hash) 686 687 def unignore_destination(self, destination_hash): 688 if destination_hash in self.ignored_list: 689 self.ignored_list.remove(destination_hash) 690 691 def set_message_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None): 692 limit_bytes = 0 693 694 if kilobytes != None: 695 limit_bytes += kilobytes*1000 696 697 if megabytes != None: 698 limit_bytes += megabytes*1000*1000 699 700 if gigabytes != None: 701 limit_bytes += gigabytes*1000*1000*1000 702 703 if limit_bytes == 0: 704 limit_bytes = None 705 706 try: 707 if limit_bytes == None or int(limit_bytes) > 0: 708 self.message_storage_limit = int(limit_bytes) 709 else: 710 raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) 711 712 except Exception as e: 713 raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) 714 715 def message_storage_limit(self): 716 return self.message_storage_limit 717 718 def message_storage_size(self): 719 if self.propagation_node: 720 return sum(self.propagation_entries[f][3] for f in self.propagation_entries) 721 else: 722 return None 723 724 def set_information_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None): 725 limit_bytes = 0 726 if kilobytes != None: limit_bytes += kilobytes*1000 727 if megabytes != None: limit_bytes += megabytes*1000*1000 728 if gigabytes != None: limit_bytes += gigabytes*1000*1000*1000 729 if limit_bytes == 0: limit_bytes = None 730 731 try: 732 if limit_bytes == None or int(limit_bytes) > 0: self.information_storage_limit = int(limit_bytes) 733 else: raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) 734 except Exception as e: raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) 735 736 def information_storage_limit(self): 737 return self.information_storage_limit 738 739 def information_storage_size(self): 740 pass 741 742 def delivery_link_available(self, destination_hash): 743 if destination_hash in self.direct_links or destination_hash in self.backchannel_links: return True 744 else: return False 745 746 747 ### Propagation Node Control ########################## 748 ####################################################### 749 750 def compile_stats(self): 751 if not self.propagation_node: return None 752 else: 753 peer_stats = {} 754 for peer_id in self.peers.copy(): 755 peer = self.peers[peer_id] 756 peer_stats[peer_id] = { 757 "type": "static" if peer_id in self.static_peers else "discovered", 758 "state": peer.state, 759 "alive": peer.alive, 760 "name": peer.name, 761 "last_heard": int(peer.last_heard), 762 "next_sync_attempt": peer.next_sync_attempt, 763 "last_sync_attempt": peer.last_sync_attempt, 764 "sync_backoff": peer.sync_backoff, 765 "peering_timebase": peer.peering_timebase, 766 "ler": int(peer.link_establishment_rate), 767 "str": int(peer.sync_transfer_rate), 768 "transfer_limit": peer.propagation_transfer_limit, 769 "sync_limit": peer.propagation_sync_limit, 770 "target_stamp_cost": peer.propagation_stamp_cost, 771 "stamp_cost_flexibility": peer.propagation_stamp_cost_flexibility, 772 "peering_cost": peer.peering_cost, 773 "peering_key": peer.peering_key_value(), 774 "network_distance": RNS.Transport.hops_to(peer_id), 775 "rx_bytes": peer.rx_bytes, 776 "tx_bytes": peer.tx_bytes, 777 "acceptance_rate": peer.acceptance_rate, 778 "messages": { 779 "offered": peer.offered, 780 "outgoing": peer.outgoing, 781 "incoming": peer.incoming, 782 "unhandled": peer.unhandled_message_count 783 }, 784 } 785 786 node_stats = { 787 "identity_hash": self.identity.hash, 788 "destination_hash": self.propagation_destination.hash, 789 "uptime": time.time()-self.propagation_node_start_time, 790 "delivery_limit": self.delivery_per_transfer_limit, 791 "propagation_limit": self.propagation_per_transfer_limit, 792 "sync_limit": self.propagation_per_sync_limit, 793 "target_stamp_cost": self.propagation_stamp_cost, 794 "stamp_cost_flexibility": self.propagation_stamp_cost_flexibility, 795 "peering_cost": self.peering_cost, 796 "max_peering_cost": self.max_peering_cost, 797 "autopeer_maxdepth": self.autopeer_maxdepth, 798 "from_static_only": self.from_static_only, 799 "messagestore": { 800 "count": len(self.propagation_entries), 801 "bytes": self.message_storage_size(), 802 "limit": self.message_storage_limit, 803 }, 804 "clients" : { 805 "client_propagation_messages_received": self.client_propagation_messages_received, 806 "client_propagation_messages_served": self.client_propagation_messages_served, 807 }, 808 "unpeered_propagation_incoming": self.unpeered_propagation_incoming, 809 "unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes, 810 "static_peers": len(self.static_peers), 811 "discovered_peers": len(self.peers)-len(self.static_peers), 812 "total_peers": len(self.peers), 813 "max_peers": self.max_peers, 814 "peers": peer_stats, 815 } 816 817 return node_stats 818 819 def stats_get_request(self, path, data, request_id, remote_identity, requested_at): 820 if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY 821 elif remote_identity.hash not in self.control_allowed_list: return LXMPeer.ERROR_NO_ACCESS 822 else: return self.compile_stats() 823 824 def peer_sync_request(self, path, data, request_id, remote_identity, requested_at): 825 if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY 826 elif remote_identity.hash not in self.control_allowed_list: return LXMPeer.ERROR_NO_ACCESS 827 else: 828 if type(data) != bytes: return LXMPeer.ERROR_INVALID_DATA 829 elif len(data) != RNS.Identity.TRUNCATED_HASHLENGTH//8: return LXMPeer.ERROR_INVALID_DATA 830 else: 831 if not data in self.peers: return LXMPeer.ERROR_NOT_FOUND 832 else: 833 self.peers[data].sync() 834 return True 835 836 def peer_unpeer_request(self, path, data, request_id, remote_identity, requested_at): 837 if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY 838 elif remote_identity.hash not in self.control_allowed_list: return LXMPeer.ERROR_NO_ACCESS 839 else: 840 if type(data) != bytes: return LXMPeer.ERROR_INVALID_DATA 841 elif len(data) != RNS.Identity.TRUNCATED_HASHLENGTH//8: return LXMPeer.ERROR_INVALID_DATA 842 else: 843 if not data in self.peers: return LXMPeer.ERROR_NOT_FOUND 844 else: 845 self.unpeer(data) 846 return True 847 848 849 ### Utility & Maintenance ############################# 850 ####################################################### 851 852 JOB_OUTBOUND_INTERVAL = 1 853 JOB_STAMPS_INTERVAL = 1 854 JOB_LINKS_INTERVAL = 1 855 JOB_TRANSIENT_INTERVAL = 60 856 JOB_STORE_INTERVAL = 120 857 JOB_PEERSYNC_INTERVAL = 6 858 JOB_PEERINGEST_INTERVAL= JOB_PEERSYNC_INTERVAL 859 JOB_ROTATE_INTERVAL = 56*JOB_PEERINGEST_INTERVAL 860 def jobs(self): 861 if not self.exit_handler_running: 862 self.processing_count += 1 863 864 if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0: 865 self.process_outbound() 866 867 if self.processing_count % LXMRouter.JOB_STAMPS_INTERVAL == 0: 868 threading.Thread(target=self.process_deferred_stamps, daemon=True).start() 869 870 if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0: 871 self.clean_links() 872 873 if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0: 874 self.clean_transient_id_caches() 875 876 if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0: 877 if self.propagation_node == True: self.clean_message_store() 878 879 if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0: 880 if self.propagation_node == True: self.flush_queues() 881 882 if self.processing_count % LXMRouter.JOB_ROTATE_INTERVAL == 0: 883 if self.propagation_node == True: self.rotate_peers() 884 885 if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: 886 if self.propagation_node == True: self.sync_peers() 887 self.clean_throttled_peers() 888 889 def jobloop(self): 890 while (True): 891 # TODO: Improve this to scheduling, so manual 892 # triggers can delay next run 893 try: 894 self.jobs() 895 except Exception as e: 896 RNS.log("An error ocurred while running LXMF Router jobs.", RNS.LOG_ERROR) 897 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) 898 RNS.trace_exception(e) 899 time.sleep(LXMRouter.PROCESSING_INTERVAL) 900 901 def flush_queues(self): 902 if len(self.peers) > 0: 903 self.flush_peer_distribution_queue() 904 RNS.log("Calculating peer distribution queue mappings...", RNS.LOG_DEBUG); st = time.time() 905 for peer_id in self.peers.copy(): 906 if peer_id in self.peers: 907 peer = self.peers[peer_id] 908 if peer.queued_items(): 909 peer.process_queues() 910 911 RNS.log(f"Distribution queue mapping completed in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG) 912 913 def clean_links(self): 914 closed_links = [] 915 for link_hash in self.direct_links: 916 link = self.direct_links[link_hash] 917 inactive_time = link.no_data_for() 918 919 if inactive_time > LXMRouter.LINK_MAX_INACTIVITY: 920 link.teardown() 921 closed_links.append(link_hash) 922 if link.link_id in self.validated_peer_links: 923 self.validated_peer_links.pop(link.link_id) 924 925 for link_hash in closed_links: 926 cleaned_link = self.direct_links.pop(link_hash) 927 RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG) 928 929 try: 930 inactive_links = [] 931 for link in self.active_propagation_links: 932 if link.no_data_for() > LXMRouter.P_LINK_MAX_INACTIVITY: 933 inactive_links.append(link) 934 935 for link in inactive_links: 936 self.active_propagation_links.remove(link) 937 link.teardown() 938 939 except Exception as e: 940 RNS.log("An error occurred while cleaning inbound propagation links. The contained exception was: "+str(e), RNS.LOG_ERROR) 941 942 if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED: 943 self.outbound_propagation_link = None 944 if self.propagation_transfer_state == LXMRouter.PR_COMPLETE: 945 self.acknowledge_sync_completion() 946 elif self.propagation_transfer_state < LXMRouter.PR_LINK_ESTABLISHED: 947 self.acknowledge_sync_completion(failure_state=LXMRouter.PR_LINK_FAILED) 948 elif self.propagation_transfer_state >= LXMRouter.PR_LINK_ESTABLISHED and self.propagation_transfer_state < LXMRouter.PR_COMPLETE: 949 self.acknowledge_sync_completion(failure_state=LXMRouter.PR_TRANSFER_FAILED) 950 else: 951 RNS.log(f"Unknown propagation transfer state on link cleaning: {self.propagation_transfer_state}", RNS.LOG_DEBUG) 952 self.acknowledge_sync_completion() 953 954 RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG) 955 956 def clean_transient_id_caches(self): 957 now = time.time() 958 removed_entries = [] 959 for transient_id in self.locally_delivered_transient_ids: 960 timestamp = self.locally_delivered_transient_ids[transient_id] 961 if now > timestamp+LXMRouter.MESSAGE_EXPIRY*6.0: 962 removed_entries.append(transient_id) 963 964 for transient_id in removed_entries: 965 self.locally_delivered_transient_ids.pop(transient_id) 966 RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG) 967 968 removed_entries = [] 969 for transient_id in self.locally_processed_transient_ids: 970 timestamp = self.locally_processed_transient_ids[transient_id] 971 if now > timestamp+LXMRouter.MESSAGE_EXPIRY*6.0: 972 removed_entries.append(transient_id) 973 974 for transient_id in removed_entries: 975 self.locally_processed_transient_ids.pop(transient_id) 976 RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from locally processed cache", RNS.LOG_DEBUG) 977 978 def update_stamp_cost(self, destination_hash, stamp_cost): 979 RNS.log(f"Updating outbound stamp cost for {RNS.prettyhexrep(destination_hash)} to {stamp_cost}", RNS.LOG_DEBUG) 980 self.outbound_stamp_costs[destination_hash] = [time.time(), stamp_cost] 981 982 def job(): self.save_outbound_stamp_costs() 983 threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start() 984 985 def get_announce_app_data(self, destination_hash): 986 if destination_hash in self.delivery_destinations: 987 delivery_destination = self.delivery_destinations[destination_hash] 988 989 display_name = None 990 if delivery_destination.display_name != None: 991 display_name = delivery_destination.display_name.encode("utf-8") 992 993 stamp_cost = None 994 if delivery_destination.stamp_cost != None and type(delivery_destination.stamp_cost) == int: 995 if delivery_destination.stamp_cost > 0 and delivery_destination.stamp_cost < 255: 996 stamp_cost = delivery_destination.stamp_cost 997 998 peer_data = [display_name, stamp_cost] 999 1000 return msgpack.packb(peer_data) 1001 1002 def get_size(self, transient_id): 1003 lxm_size = self.propagation_entries[transient_id][3] 1004 return lxm_size 1005 1006 def get_weight(self, transient_id): 1007 dst_hash = self.propagation_entries[transient_id][0] 1008 lxm_rcvd = self.propagation_entries[transient_id][2] 1009 lxm_size = self.propagation_entries[transient_id][3] 1010 1011 now = time.time() 1012 age_weight = max(1, (now - lxm_rcvd)/60/60/24/4) 1013 1014 if dst_hash in self.prioritised_list: priority_weight = 0.1 1015 else: priority_weight = 1.0 1016 1017 return priority_weight * age_weight * lxm_size 1018 1019 def get_stamp_value(self, transient_id): 1020 if not transient_id in self.propagation_entries: return None 1021 else: return self.propagation_entries[transient_id][6] 1022 1023 def generate_ticket(self, destination_hash, expiry=LXMessage.TICKET_EXPIRY): 1024 now = time.time() 1025 ticket = None 1026 if destination_hash in self.available_tickets["last_deliveries"]: 1027 last_delivery = self.available_tickets["last_deliveries"][destination_hash] 1028 elapsed = now - last_delivery 1029 if elapsed < LXMessage.TICKET_INTERVAL: 1030 RNS.log(f"A ticket for {RNS.prettyhexrep(destination_hash)} was already delivered {RNS.prettytime(elapsed)} ago, not including another ticket yet", RNS.LOG_DEBUG) 1031 return None 1032 1033 if destination_hash in self.available_tickets["inbound"]: 1034 for ticket in self.available_tickets["inbound"][destination_hash]: 1035 ticket_entry = self.available_tickets["inbound"][destination_hash][ticket] 1036 expires = ticket_entry[0]; validity_left = expires - now 1037 if validity_left > LXMessage.TICKET_RENEW: 1038 RNS.log(f"Found generated ticket for {RNS.prettyhexrep(destination_hash)} with {RNS.prettytime(validity_left)} of validity left, re-using this one", RNS.LOG_DEBUG) 1039 return [expires, ticket] 1040 1041 else: 1042 self.available_tickets["inbound"][destination_hash] = {} 1043 1044 RNS.log(f"No generated tickets for {RNS.prettyhexrep(destination_hash)} with enough validity found, generating a new one", RNS.LOG_DEBUG) 1045 expires = now+expiry 1046 ticket = os.urandom(LXMessage.TICKET_LENGTH) 1047 self.available_tickets["inbound"][destination_hash][ticket] = [expires] 1048 self.save_available_tickets() 1049 1050 return [expires, ticket] 1051 1052 def remember_ticket(self, destination_hash, ticket_entry): 1053 expires = ticket_entry[0]-time.time() 1054 RNS.log(f"Remembering ticket for {RNS.prettyhexrep(destination_hash)}, expires in {RNS.prettytime(expires)}", RNS.LOG_DEBUG) 1055 self.available_tickets["outbound"][destination_hash] = [ticket_entry[0], ticket_entry[1]] 1056 1057 def get_outbound_ticket(self, destination_hash): 1058 if destination_hash in self.available_tickets["outbound"]: 1059 entry = self.available_tickets["outbound"][destination_hash] 1060 if entry[0] > time.time(): 1061 return entry[1] 1062 1063 return None 1064 1065 def get_outbound_ticket_expiry(self, destination_hash): 1066 if destination_hash in self.available_tickets["outbound"]: 1067 entry = self.available_tickets["outbound"][destination_hash] 1068 if entry[0] > time.time(): 1069 return entry[0] 1070 1071 return None 1072 1073 def get_inbound_tickets(self, destination_hash): 1074 now = time.time() 1075 available_tickets = [] 1076 if destination_hash in self.available_tickets["inbound"]: 1077 for inbound_ticket in self.available_tickets["inbound"][destination_hash]: 1078 if now < self.available_tickets["inbound"][destination_hash][inbound_ticket][0]: 1079 available_tickets.append(inbound_ticket) 1080 1081 if len(available_tickets) == 0: 1082 return None 1083 else: 1084 return available_tickets 1085 1086 def clean_throttled_peers(self): 1087 expired_entries = [] 1088 now = time.time() 1089 for peer_hash in self.throttled_peers: 1090 if now > self.throttled_peers[peer_hash]: expired_entries.append(peer_hash) 1091 1092 for peer_hash in expired_entries: self.throttled_peers.pop(peer_hash) 1093 1094 def clean_message_store(self): 1095 RNS.log("Cleaning message store", RNS.LOG_VERBOSE) 1096 # Check and remove expired messages 1097 now = time.time() 1098 removed_entries = {} 1099 for transient_id in self.propagation_entries.copy(): 1100 entry = self.propagation_entries[transient_id] 1101 filepath = entry[1] 1102 stamp_value = entry[6] 1103 filename = os.path.split(filepath)[-1] 1104 components = filename.split("_") 1105 1106 if len(components) == 3 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2 and int(components[2]) == stamp_value: 1107 timestamp = float(components[1]) 1108 if now > timestamp+LXMRouter.MESSAGE_EXPIRY: 1109 RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_EXTREME) 1110 removed_entries[transient_id] = filepath 1111 else: 1112 RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING) 1113 removed_entries[transient_id] = filepath 1114 1115 removed_count = 0 1116 for transient_id in removed_entries: 1117 try: 1118 filepath = removed_entries[transient_id] 1119 self.propagation_entries.pop(transient_id) 1120 if os.path.isfile(filepath): 1121 os.unlink(filepath) 1122 removed_count += 1 1123 except Exception as e: 1124 RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) 1125 1126 if removed_count > 0: 1127 RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_VERBOSE) 1128 1129 # Check size of message store and cull if needed 1130 try: 1131 message_storage_size = self.message_storage_size() 1132 if message_storage_size != None: 1133 if self.message_storage_limit != None and message_storage_size > self.message_storage_limit: 1134 # Clean the message storage according to priorities 1135 bytes_needed = message_storage_size - self.message_storage_limit 1136 bytes_cleaned = 0 1137 1138 weighted_entries = [] 1139 for transient_id in self.propagation_entries.copy(): 1140 weighted_entries.append([ 1141 self.propagation_entries[transient_id], 1142 self.get_weight(transient_id), 1143 transient_id 1144 ]) 1145 1146 weighted_entries.sort(key=lambda we: we[1], reverse=True) 1147 1148 i = 0 1149 while i < len(weighted_entries) and bytes_cleaned < bytes_needed: 1150 try: 1151 w = weighted_entries[i] 1152 entry = w[0] 1153 transient_id = w[2] 1154 filepath = entry[1] 1155 1156 if os.path.isfile(filepath): 1157 os.unlink(filepath) 1158 1159 self.propagation_entries.pop(transient_id) 1160 bytes_cleaned += entry[3] 1161 1162 RNS.log("Removed "+RNS.prettyhexrep(transient_id)+" with weight "+str(w[1])+" to clear up "+RNS.prettysize(entry[3])+", now cleaned "+RNS.prettysize(bytes_cleaned)+" out of "+RNS.prettysize(bytes_needed)+" needed", RNS.LOG_EXTREME) 1163 1164 except Exception as e: 1165 RNS.log("Error while cleaning LXMF message from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) 1166 1167 finally: 1168 i += 1 1169 1170 RNS.log("LXMF message store size is now "+RNS.prettysize(self.message_storage_size())+" for "+str(len(self.propagation_entries))+" items", RNS.LOG_EXTREME) 1171 1172 1173 except Exception as e: 1174 RNS.log("Could not clean the LXMF message store. The contained exception was: "+str(e), RNS.LOG_ERROR) 1175 1176 def save_locally_delivered_transient_ids(self): 1177 try: 1178 if len(self.locally_delivered_transient_ids) > 0: 1179 if not os.path.isdir(self.storagepath): 1180 os.makedirs(self.storagepath) 1181 1182 with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file: 1183 locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids)) 1184 1185 except Exception as e: 1186 RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) 1187 1188 def save_locally_processed_transient_ids(self): 1189 try: 1190 if len(self.locally_processed_transient_ids) > 0: 1191 if not os.path.isdir(self.storagepath): 1192 os.makedirs(self.storagepath) 1193 1194 with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file: 1195 locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids)) 1196 1197 except Exception as e: 1198 RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) 1199 1200 def save_node_stats(self): 1201 try: 1202 if not os.path.isdir(self.storagepath): 1203 os.makedirs(self.storagepath) 1204 1205 with open(self.storagepath+"/node_stats", "wb") as stats_file: 1206 node_stats = { 1207 "client_propagation_messages_received": self.client_propagation_messages_received, 1208 "client_propagation_messages_served": self.client_propagation_messages_served, 1209 "unpeered_propagation_incoming": self.unpeered_propagation_incoming, 1210 "unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes, 1211 } 1212 stats_file.write(msgpack.packb(node_stats)) 1213 1214 except Exception as e: 1215 RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) 1216 1217 1218 def clean_outbound_stamp_costs(self): 1219 try: 1220 expired = [] 1221 for destination_hash in self.outbound_stamp_costs: 1222 entry = self.outbound_stamp_costs[destination_hash] 1223 if time.time() > entry[0] + LXMRouter.STAMP_COST_EXPIRY: 1224 expired.append(destination_hash) 1225 1226 for destination_hash in expired: 1227 self.outbound_stamp_costs.pop(destination_hash) 1228 1229 except Exception as e: 1230 RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR) 1231 RNS.trace_exception(e) 1232 1233 def save_outbound_stamp_costs(self): 1234 with self.cost_file_lock: 1235 try: 1236 if not os.path.isdir(self.storagepath): 1237 os.makedirs(self.storagepath) 1238 1239 outbound_stamp_costs_file = open(self.storagepath+"/outbound_stamp_costs", "wb") 1240 outbound_stamp_costs_file.write(msgpack.packb(self.outbound_stamp_costs.copy())) 1241 outbound_stamp_costs_file.close() 1242 1243 except Exception as e: 1244 RNS.log("Could not save outbound stamp costs to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) 1245 1246 def clean_available_tickets(self): 1247 try: 1248 # Clean outbound tickets 1249 expired_outbound = [] 1250 for destination_hash in self.available_tickets["outbound"]: 1251 entry = self.available_tickets["outbound"][destination_hash] 1252 if time.time() > entry[0]: 1253 expired_outbound.append(destination_hash) 1254 1255 for destination_hash in expired_outbound: 1256 self.available_tickets["outbound"].pop(destination_hash) 1257 1258 # Clean inbound tickets 1259 for destination_hash in self.available_tickets["inbound"]: 1260 expired_inbound = [] 1261 for inbound_ticket in self.available_tickets["inbound"][destination_hash]: 1262 entry = self.available_tickets["inbound"][destination_hash][inbound_ticket] 1263 ticket_expiry = entry[0] 1264 if time.time() > ticket_expiry+LXMessage.TICKET_GRACE: 1265 expired_inbound.append(inbound_ticket) 1266 1267 for inbound_ticket in expired_inbound: 1268 self.available_tickets["inbound"][destination_hash].pop(inbound_ticket) 1269 1270 except Exception as e: 1271 RNS.log(f"Error while cleaning available tickets. The contained exception was: {e}", RNS.LOG_ERROR) 1272 RNS.trace_exception(e) 1273 1274 def save_available_tickets(self): 1275 with self.ticket_file_lock: 1276 try: 1277 if not os.path.isdir(self.storagepath): 1278 os.makedirs(self.storagepath) 1279 1280 available_tickets_file = open(self.storagepath+"/available_tickets", "wb") 1281 available_tickets_file.write(msgpack.packb(self.available_tickets)) 1282 available_tickets_file.close() 1283 1284 except Exception as e: 1285 RNS.log("Could not save available tickets to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) 1286 1287 def reload_available_tickets(self): 1288 RNS.log("Reloading available tickets from storage", RNS.LOG_DEBUG) 1289 try: 1290 with self.ticket_file_lock: 1291 with open(self.storagepath+"/available_tickets", "rb") as available_tickets_file: 1292 data = available_tickets_file.read() 1293 self.available_tickets = msgpack.unpackb(data) 1294 if not type(self.available_tickets) == dict: 1295 RNS.log("Invalid data format for loaded available tickets, recreating...", RNS.LOG_ERROR) 1296 self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}} 1297 if not "outbound" in self.available_tickets: 1298 RNS.log("Missing outbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR) 1299 self.available_tickets["outbound"] = {} 1300 if not "inbound" in self.available_tickets: 1301 RNS.log("Missing inbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR) 1302 self.available_tickets["inbound"] = {} 1303 if not "last_deliveries" in self.available_tickets: 1304 RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR) 1305 self.available_tickets["last_deliveries"] = {} 1306 1307 except Exception as e: 1308 RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR) 1309 1310 def exit_handler(self): 1311 if self.exit_handler_running: 1312 return 1313 1314 self.exit_handler_running = True 1315 1316 RNS.log("Tearing down delivery destinations...", RNS.LOG_NOTICE) 1317 for destination_hash in self.delivery_destinations: 1318 delivery_destination = self.delivery_destinations[destination_hash] 1319 delivery_destination.set_packet_callback(None) 1320 delivery_destination.set_link_established_callback(None) 1321 for link in delivery_destination.links: 1322 try: 1323 if link.status == RNS.Link.ACTIVE: 1324 link.teardown() 1325 except Exception as e: 1326 RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR) 1327 1328 if self.propagation_node: 1329 RNS.log("Tearing down propagation node destination...", RNS.LOG_NOTICE) 1330 self.propagation_destination.set_link_established_callback(None) 1331 self.propagation_destination.set_packet_callback(None) 1332 self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH) 1333 self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH) 1334 self.propagation_destination.deregister_request_handler(LXMRouter.STATS_GET_PATH) 1335 self.propagation_destination.deregister_request_handler(LXMRouter.SYNC_REQUEST_PATH) 1336 self.propagation_destination.deregister_request_handler(LXMRouter.UNPEER_REQUEST_PATH) 1337 for link in self.active_propagation_links: 1338 try: 1339 if link.status == RNS.Link.ACTIVE: 1340 link.teardown() 1341 except Exception as e: 1342 RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR) 1343 1344 RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE) 1345 self.flush_queues() 1346 if self.propagation_node: 1347 try: 1348 st = time.time(); RNS.log(f"Saving {len(self.peers)} peer synchronisation states to storage...", RNS.LOG_NOTICE) 1349 serialised_peers = [] 1350 peer_dict = self.peers.copy() 1351 for peer_id in peer_dict: 1352 peer = self.peers[peer_id] 1353 serialised_peers.append(peer.to_bytes()) 1354 1355 peers_file = open(self.storagepath+"/peers", "wb") 1356 peers_file.write(msgpack.packb(serialised_peers)) 1357 peers_file.close() 1358 1359 RNS.log(f"Saved {len(serialised_peers)} peers to storage in {RNS.prettyshorttime(time.time()-st)}", RNS.LOG_NOTICE) 1360 1361 except Exception as e: 1362 RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) 1363 1364 self.save_locally_delivered_transient_ids() 1365 self.save_locally_processed_transient_ids() 1366 self.save_node_stats() 1367 1368 def sigint_handler(self, signal, frame): 1369 if threading.current_thread() != threading.main_thread(): 1370 RNS.log(f"SIGINT on non-main thread {threading.current_thread()}, exiting immediately", RNS.LOG_WARNING) 1371 os._exit(0) 1372 else: 1373 if not self.exit_handler_running: 1374 RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING) 1375 self.exit_handler() 1376 RNS.exit(0) 1377 else: 1378 RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) 1379 1380 def sigterm_handler(self, signal, frame): 1381 if threading.current_thread() != threading.main_thread(): 1382 RNS.log(f"SIGTERM on non-main thread {threading.current_thread()}, exiting immediately", RNS.LOG_WARNING) 1383 os._exit(0) 1384 else: 1385 if not self.exit_handler_running: 1386 RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING) 1387 self.exit_handler() 1388 RNS.exit(0) 1389 else: 1390 RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING) 1391 1392 def __str__(self): 1393 return "<LXMRouter "+RNS.hexrep(self.identity.hash, delimit=False)+">" 1394 1395 1396 ### Message Download ################################## 1397 ####################################################### 1398 1399 def request_messages_path_job(self): 1400 job_thread = threading.Thread(target=self.__request_messages_path_job) 1401 job_thread.setDaemon(True) 1402 job_thread.start() 1403 1404 def __request_messages_path_job(self): 1405 path_timeout = self.wants_download_on_path_available_timeout 1406 while not RNS.Transport.has_path(self.wants_download_on_path_available_from) and time.time() < path_timeout: 1407 time.sleep(0.1) 1408 1409 if RNS.Transport.has_path(self.wants_download_on_path_available_from): 1410 self.request_messages_from_propagation_node(self.wants_download_on_path_available_to, self.propagation_transfer_max_messages) 1411 else: 1412 RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG) 1413 self.acknowledge_sync_completion(failure_state=LXMRouter.PR_NO_PATH) 1414 1415 def identity_allowed(self, identity): 1416 if self.auth_required: 1417 if identity.hash in self.allowed_list: 1418 return True 1419 else: 1420 return False 1421 1422 else: 1423 return True 1424 1425 def message_get_request(self, path, data, request_id, remote_identity, requested_at): 1426 if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY 1427 elif not self.identity_allowed(remote_identity): return LXMPeer.ERROR_NO_ACCESS 1428 else: 1429 try: 1430 remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery") 1431 1432 # If both want and have fields are empty, send a list of 1433 # available messages. 1434 if data[0] == None and data[1] == None: 1435 available_messages = [] 1436 for transient_id in self.propagation_entries: 1437 message_entry = self.propagation_entries[transient_id] 1438 if message_entry[0] == remote_destination.hash: 1439 message_size = os.path.getsize(message_entry[1]) 1440 available_entry = [transient_id, message_size] 1441 available_messages.append(available_entry) 1442 1443 available_messages.sort(key=lambda e: e[1], reverse=False) 1444 1445 transient_ids = [] 1446 for available_entry in available_messages: transient_ids.append(available_entry[0]) 1447 return transient_ids 1448 1449 else: 1450 # Process messages the client already have 1451 if data[1] != None and len(data[1]) > 0: 1452 for transient_id in data[1]: 1453 if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash: 1454 try: 1455 filepath = self.propagation_entries[transient_id][1] 1456 self.propagation_entries.pop(transient_id) 1457 os.unlink(filepath) 1458 # TODO: Remove debug 1459 # RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG) 1460 1461 except Exception as e: 1462 RNS.log("Error while processing message purge request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1463 1464 1465 # Process wanted messages 1466 response_messages = [] 1467 if data[0] != None and len(data[0]) > 0: 1468 client_transfer_limit = None 1469 if len(data) >= 3: 1470 try: 1471 client_transfer_limit = float(data[2])*1000 1472 RNS.log("Client indicates transfer limit of "+RNS.prettysize(client_transfer_limit), RNS.LOG_DEBUG) 1473 except: pass 1474 1475 per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now 1476 cumulative_size = 24 # Initialised to highest reasonable binary structure overhead 1477 for transient_id in data[0]: 1478 if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash: 1479 try: 1480 filepath = self.propagation_entries[transient_id][1] 1481 RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" requested message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG) 1482 1483 message_file = open(filepath, "rb") 1484 lxmf_data = message_file.read() 1485 message_file.close() 1486 1487 lxm_size = len(lxmf_data) 1488 next_size = cumulative_size + (lxm_size+per_message_overhead) 1489 1490 if client_transfer_limit != None and next_size > client_transfer_limit: pass 1491 else: 1492 response_messages.append(lxmf_data[:-LXStamper.STAMP_SIZE]) 1493 cumulative_size += (lxm_size+per_message_overhead) 1494 1495 except Exception as e: 1496 RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1497 1498 self.client_propagation_messages_served += len(response_messages) 1499 return response_messages 1500 1501 except Exception as e: 1502 RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG) 1503 return None 1504 1505 def message_list_response(self, request_receipt): 1506 if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY: 1507 RNS.log("Propagation node indicated missing identification on list request, tearing down link.", RNS.LOG_DEBUG) 1508 if self.outbound_propagation_link != None: 1509 self.outbound_propagation_link.teardown() 1510 self.propagation_transfer_state = LXMRouter.PR_NO_IDENTITY_RCVD 1511 1512 elif request_receipt.response == LXMPeer.ERROR_NO_ACCESS: 1513 RNS.log("Propagation node did not allow list request, tearing down link.", RNS.LOG_DEBUG) 1514 if self.outbound_propagation_link != None: 1515 self.outbound_propagation_link.teardown() 1516 self.propagation_transfer_state = LXMRouter.PR_NO_ACCESS 1517 1518 else: 1519 if request_receipt.response != None and isinstance(request_receipt.response, list): 1520 haves = [] 1521 wants = [] 1522 if len(request_receipt.response) > 0: 1523 for transient_id in request_receipt.response: 1524 if self.has_message(transient_id): 1525 if not self.retain_synced_on_node: 1526 haves.append(transient_id) 1527 else: 1528 if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages: 1529 wants.append(transient_id) 1530 1531 ms = "" if len(wants) == 1 else "s" 1532 RNS.log(f"Requesting {len(wants)} message{ms} from propagation node", RNS.LOG_DEBUG) 1533 request_receipt.link.request( 1534 LXMPeer.MESSAGE_GET_PATH, 1535 [wants, haves, self.delivery_per_transfer_limit], 1536 response_callback=self.message_get_response, 1537 failed_callback=self.message_get_failed, 1538 progress_callback=self.message_get_progress) 1539 1540 else: 1541 self.propagation_transfer_state = LXMRouter.PR_COMPLETE 1542 self.propagation_transfer_progress = 1.0 1543 self.propagation_transfer_last_result = 0 1544 1545 else: 1546 RNS.log("Invalid message list data received from propagation node", RNS.LOG_DEBUG) 1547 if self.outbound_propagation_link != None: 1548 self.outbound_propagation_link.teardown() 1549 1550 def message_get_response(self, request_receipt): 1551 if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY: 1552 RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG) 1553 if self.outbound_propagation_link != None: 1554 self.outbound_propagation_link.teardown() 1555 self.propagation_transfer_state = LXMRouter.PR_NO_IDENTITY_RCVD 1556 1557 elif request_receipt.response == LXMPeer.ERROR_NO_ACCESS: 1558 RNS.log("Propagation node did not allow get request, tearing down link.", RNS.LOG_DEBUG) 1559 if self.outbound_propagation_link != None: 1560 self.outbound_propagation_link.teardown() 1561 self.propagation_transfer_state = LXMRouter.PR_NO_ACCESS 1562 1563 else: 1564 duplicates = 0 1565 if request_receipt.response != None and len(request_receipt.response) > 0: 1566 haves = [] 1567 for lxmf_data in request_receipt.response: 1568 result = self.lxmf_propagation(lxmf_data, signal_duplicate=LXMRouter.DUPLICATE_SIGNAL) 1569 if result == LXMRouter.DUPLICATE_SIGNAL: duplicates += 1 1570 haves.append(RNS.Identity.full_hash(lxmf_data)) 1571 1572 # Return a list of successfully received messages to the node. 1573 # This deletes the messages on the propagation node. 1574 # TODO: Add option to keep messages on node. 1575 request_receipt.link.request( 1576 LXMPeer.MESSAGE_GET_PATH, 1577 [None, haves], 1578 # response_callback=self.message_syncfinal_response, 1579 failed_callback=self.message_get_failed, 1580 # progress_callback=self.message_get_progress 1581 ) 1582 1583 self.propagation_transfer_state = LXMRouter.PR_COMPLETE 1584 self.propagation_transfer_progress = 1.0 1585 self.propagation_transfer_last_duplicates = duplicates 1586 self.propagation_transfer_last_result = len(request_receipt.response) 1587 self.save_locally_delivered_transient_ids() 1588 1589 def message_get_progress(self, request_receipt): 1590 self.propagation_transfer_state = LXMRouter.PR_RECEIVING 1591 self.propagation_transfer_progress = request_receipt.get_progress() 1592 1593 def message_get_failed(self, request_receipt): 1594 RNS.log("Message list/get request failed", RNS.LOG_DEBUG) 1595 if self.outbound_propagation_link != None: 1596 self.outbound_propagation_link.teardown() 1597 1598 def acknowledge_sync_completion(self, reset_state=False, failure_state=None): 1599 self.propagation_transfer_last_result = None 1600 if reset_state or self.propagation_transfer_state <= LXMRouter.PR_COMPLETE: 1601 if failure_state == None: 1602 self.propagation_transfer_state = LXMRouter.PR_IDLE 1603 else: 1604 self.propagation_transfer_state = failure_state 1605 1606 self.propagation_transfer_progress = 0.0 1607 self.wants_download_on_path_available_from = None 1608 self.wants_download_on_path_available_to = None 1609 1610 def has_message(self, transient_id): 1611 if transient_id in self.locally_delivered_transient_ids: 1612 return True 1613 else: 1614 return False 1615 1616 def cancel_outbound(self, message_id, cancel_state=LXMessage.CANCELLED): 1617 try: 1618 if message_id in self.pending_deferred_stamps: 1619 lxm = self.pending_deferred_stamps[message_id] 1620 RNS.log(f"Cancelling deferred stamp generation for {lxm}", RNS.LOG_DEBUG) 1621 lxm.state = cancel_state 1622 LXStamper.cancel_work(message_id) 1623 1624 lxmessage = None 1625 for lxm in self.pending_outbound: 1626 if lxm.message_id == message_id: 1627 lxmessage = lxm 1628 1629 if lxmessage != None: 1630 lxmessage.state = cancel_state 1631 if lxmessage in self.pending_outbound: 1632 RNS.log(f"Cancelling {lxmessage} in outbound queue", RNS.LOG_DEBUG) 1633 if lxmessage.representation == LXMessage.RESOURCE: 1634 if lxmessage.resource_representation != None: 1635 lxmessage.resource_representation.cancel() 1636 1637 self.process_outbound() 1638 1639 except Exception as e: 1640 RNS.log(f"An error occurred while cancelling {lxmessage}: {e}", RNS.LOG_ERROR) 1641 RNS.trace_exception(e) 1642 1643 def handle_outbound(self, lxmessage): 1644 destination_hash = lxmessage.get_destination().hash 1645 1646 if lxmessage.stamp_cost == None: 1647 if destination_hash in self.outbound_stamp_costs: 1648 stamp_cost = self.outbound_stamp_costs[destination_hash][1] 1649 lxmessage.stamp_cost = stamp_cost 1650 RNS.log(f"No stamp cost set on LXM to {RNS.prettyhexrep(destination_hash)}, autoconfigured to {stamp_cost}, as required by latest announce", RNS.LOG_DEBUG) 1651 1652 lxmessage.state = LXMessage.OUTBOUND 1653 1654 # If an outbound ticket is available for this 1655 # destination, attach it to the message. 1656 lxmessage.outbound_ticket = self.get_outbound_ticket(destination_hash) 1657 if lxmessage.outbound_ticket != None and lxmessage.defer_stamp: 1658 RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but outbound ticket was applied, processing immediately", RNS.LOG_DEBUG) 1659 lxmessage.defer_stamp = False 1660 1661 # If requested, include a ticket to allow the 1662 # destination to reply without generating a stamp. 1663 if lxmessage.include_ticket: 1664 ticket = self.generate_ticket(lxmessage.destination_hash) 1665 if ticket: lxmessage.fields[FIELD_TICKET] = ticket 1666 1667 if not lxmessage.packed: lxmessage.pack() 1668 1669 unknown_path_requested = False 1670 if not RNS.Transport.has_path(destination_hash) and lxmessage.method == LXMessage.OPPORTUNISTIC: 1671 RNS.log(f"Pre-emptively requesting unknown path for opportunistic {lxmessage}", RNS.LOG_DEBUG) 1672 RNS.Transport.request_path(destination_hash) 1673 lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT 1674 unknown_path_requested = True 1675 1676 lxmessage.determine_transport_encryption() 1677 1678 if lxmessage.defer_stamp and lxmessage.stamp_cost == None: 1679 RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but no stamp is required, processing immediately", RNS.LOG_DEBUG) 1680 lxmessage.defer_stamp = False 1681 1682 if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp): 1683 while not unknown_path_requested and self.outbound_processing_lock.locked(): time.sleep(0.05) 1684 1685 self.pending_outbound.append(lxmessage) 1686 if not unknown_path_requested: threading.Thread(target=self.process_outbound, daemon=True).start() 1687 1688 else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage 1689 1690 def get_outbound_progress(self, lxm_hash): 1691 for lxm in self.pending_outbound: 1692 if lxm.hash == lxm_hash: 1693 return lxm.progress 1694 1695 for lxm_id in self.pending_deferred_stamps: 1696 if self.pending_deferred_stamps[lxm_id].hash == lxm_hash: 1697 return self.pending_deferred_stamps[lxm_id].progress 1698 1699 return None 1700 1701 def get_outbound_lxm_stamp_cost(self, lxm_hash): 1702 for lxm in self.pending_outbound: 1703 if lxm.hash == lxm_hash: 1704 if lxm.outbound_ticket: return None 1705 else: return lxm.stamp_cost 1706 1707 for lxm_id in self.pending_deferred_stamps: 1708 if self.pending_deferred_stamps[lxm_id].hash == lxm_hash: 1709 lxm = self.pending_deferred_stamps[lxm_id] 1710 if lxm.outbound_ticket: return None 1711 else: return lxm.stamp_cost 1712 1713 return None 1714 1715 def get_outbound_lxm_propagation_stamp_cost(self, lxm_hash): 1716 for lxm in self.pending_outbound: 1717 if lxm.hash == lxm_hash: 1718 return lxm.propagation_target_cost 1719 1720 for lxm_id in self.pending_deferred_stamps: 1721 if self.pending_deferred_stamps[lxm_id].hash == lxm_hash: 1722 return self.pending_deferred_stamps[lxm_id].propagation_target_cost 1723 1724 return None 1725 1726 1727 ### Message Routing & Delivery ######################## 1728 ####################################################### 1729 1730 def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False, allow_duplicate=False): 1731 try: 1732 message = LXMessage.unpack_from_bytes(lxmf_data) 1733 if ratchet_id and not message.ratchet_id: 1734 message.ratchet_id = ratchet_id 1735 1736 if method: 1737 message.method = method 1738 1739 if message.signature_validated and FIELD_TICKET in message.fields: 1740 ticket_entry = message.fields[FIELD_TICKET] 1741 if type(ticket_entry) == list and len(ticket_entry) > 1: 1742 expires = ticket_entry[0] 1743 ticket = ticket_entry[1] 1744 1745 if time.time() < expires: 1746 if type(ticket) == bytes and len(ticket) == LXMessage.TICKET_LENGTH: 1747 self.remember_ticket(message.source_hash, ticket_entry) 1748 def save_job(): 1749 self.save_available_tickets() 1750 threading.Thread(target=save_job, daemon=True).start() 1751 1752 required_stamp_cost = self.delivery_destinations[message.destination_hash].stamp_cost 1753 if required_stamp_cost != None: 1754 destination_tickets = self.get_inbound_tickets(message.source_hash) 1755 if message.validate_stamp(required_stamp_cost, tickets=destination_tickets): 1756 message.stamp_valid = True 1757 message.stamp_checked = True 1758 else: 1759 message.stamp_valid = False 1760 message.stamp_checked = True 1761 1762 if not message.stamp_valid: 1763 if no_stamp_enforcement: 1764 RNS.log(f"Received {message} with invalid stamp, but allowing anyway, since stamp enforcement was temporarily disabled", RNS.LOG_NOTICE) 1765 else: 1766 if self._enforce_stamps: 1767 RNS.log(f"Dropping {message} with invalid stamp", RNS.LOG_NOTICE) 1768 return False 1769 else: 1770 RNS.log(f"Received {message} with invalid stamp, but allowing anyway, since stamp enforcement is disabled", RNS.LOG_NOTICE) 1771 else: 1772 RNS.log(f"Received {message} with valid stamp", RNS.LOG_DEBUG) 1773 1774 if phy_stats != None: 1775 if "rssi" in phy_stats: message.rssi = phy_stats["rssi"] 1776 if "snr" in phy_stats: message.snr = phy_stats["snr"] 1777 if "q" in phy_stats: message.q = phy_stats["q"] 1778 1779 # TODO: Update these descriptions to account for ratchets 1780 if destination_type == RNS.Destination.SINGLE: 1781 message.transport_encrypted = True 1782 message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC 1783 elif destination_type == RNS.Destination.GROUP: 1784 message.transport_encrypted = True 1785 message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES 1786 elif destination_type == RNS.Destination.LINK: 1787 message.transport_encrypted = True 1788 message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC 1789 else: 1790 message.transport_encrypted = False 1791 message.transport_encryption = None 1792 1793 if message.source_hash in self.ignored_list: 1794 RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG) 1795 return False 1796 1797 if not allow_duplicate and self.has_message(message.hash): 1798 RNS.log(str(self)+" ignored already received message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG) 1799 return False 1800 else: 1801 self.locally_delivered_transient_ids[message.hash] = time.time() 1802 1803 if self.__delivery_callback != None and callable(self.__delivery_callback): 1804 try: 1805 self.__delivery_callback(message) 1806 except Exception as e: 1807 RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR) 1808 RNS.trace_exception(e) 1809 1810 return True 1811 1812 except Exception as e: 1813 RNS.log("Could not assemble LXMF message from received data", RNS.LOG_NOTICE) 1814 RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) 1815 return False 1816 1817 def delivery_packet(self, data, packet): 1818 packet.prove() 1819 try: 1820 method = None 1821 if packet.destination_type != RNS.Destination.LINK: 1822 method = LXMessage.OPPORTUNISTIC 1823 lxmf_data = b"" 1824 lxmf_data += packet.destination.hash 1825 lxmf_data += data 1826 else: 1827 method = LXMessage.DIRECT 1828 lxmf_data = data 1829 1830 try: 1831 reticulum = RNS.Reticulum.get_instance() 1832 if packet.rssi == None: packet.rssi = reticulum.get_packet_rssi(packet.packet_hash) 1833 if packet.snr == None: packet.snr = reticulum.get_packet_snr(packet.packet_hash) 1834 if packet.q == None: packet.q = reticulum.get_packet_q(packet.packet_hash) 1835 except Exception as e: 1836 RNS.log("Error while retrieving physical link stats for LXMF delivery packet: "+str(e), RNS.LOG_ERROR) 1837 1838 phy_stats = {"rssi": packet.rssi, "snr": packet.snr, "q": packet.q} 1839 1840 self.lxmf_delivery(lxmf_data, packet.destination_type, phy_stats=phy_stats, ratchet_id=packet.ratchet_id, method=method) 1841 1842 except Exception as e: 1843 RNS.log("Exception occurred while parsing incoming LXMF data.", RNS.LOG_ERROR) 1844 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) 1845 1846 def delivery_link_established(self, link): 1847 link.track_phy_stats(True) 1848 link.set_packet_callback(self.delivery_packet) 1849 link.set_resource_strategy(RNS.Link.ACCEPT_APP) 1850 link.set_resource_callback(self.delivery_resource_advertised) 1851 link.set_resource_started_callback(self.resource_transfer_began) 1852 link.set_resource_concluded_callback(self.delivery_resource_concluded) 1853 link.set_remote_identified_callback(self.delivery_remote_identified) 1854 1855 def delivery_link_closed(self, link): 1856 pass 1857 1858 def resource_transfer_began(self, resource): 1859 RNS.log("Transfer began for LXMF delivery resource "+str(resource), RNS.LOG_DEBUG) 1860 1861 def delivery_resource_advertised(self, resource): 1862 size = resource.get_data_size() 1863 limit = self.delivery_per_transfer_limit*1000 1864 if limit != None and size > limit: 1865 RNS.log("Rejecting "+RNS.prettysize(size)+" incoming LXMF delivery resource, since it exceeds the limit of "+RNS.prettysize(limit), RNS.LOG_DEBUG) 1866 return False 1867 else: 1868 return True 1869 1870 def delivery_resource_concluded(self, resource): 1871 RNS.log("Transfer concluded for LXMF delivery resource "+str(resource), RNS.LOG_DEBUG) 1872 if resource.status == RNS.Resource.COMPLETE: 1873 ratchet_id = None 1874 # Set ratchet ID to link ID if available 1875 if resource.link and hasattr(resource.link, "link_id"): 1876 ratchet_id = resource.link.link_id 1877 phy_stats = {"rssi": resource.link.rssi, "snr": resource.link.snr, "q": resource.link.q} 1878 self.lxmf_delivery(resource.data.read(), resource.link.type, phy_stats=phy_stats, ratchet_id=ratchet_id, method=LXMessage.DIRECT) 1879 1880 def delivery_remote_identified(self, link, identity): 1881 destination_hash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", identity) 1882 self.backchannel_links[destination_hash] = link 1883 RNS.log(f"Backchannel became available for {RNS.prettyhexrep(destination_hash)} on delivery link {link}", RNS.LOG_DEBUG) 1884 1885 1886 ### Peer Sync & Propagation ########################### 1887 ####################################################### 1888 1889 def peer(self, destination_hash, timestamp, propagation_transfer_limit, propagation_sync_limit, propagation_stamp_cost, propagation_stamp_cost_flexibility, peering_cost, metadata): 1890 if peering_cost > self.max_peering_cost: 1891 if destination_hash in self.peers: 1892 RNS.log(f"Peer {RNS.prettyhexrep(destination_hash)} increased peering cost beyond local accepted maximum, breaking peering...", RNS.LOG_NOTICE) 1893 self.unpeer(destination_hash, timestamp) 1894 else: 1895 RNS.log(f"Not peering with {RNS.prettyhexrep(destination_hash)}, since its peering cost of {peering_cost} exceeds local maximum of {self.max_peering_cost}", RNS.LOG_NOTICE) 1896 1897 else: 1898 if destination_hash in self.peers: 1899 peer = self.peers[destination_hash] 1900 if timestamp > peer.peering_timebase: 1901 peer.alive = True 1902 peer.metadata = metadata 1903 peer.sync_backoff = 0 1904 peer.next_sync_attempt = 0 1905 peer.peering_timebase = timestamp 1906 peer.last_heard = time.time() 1907 peer.propagation_stamp_cost = propagation_stamp_cost 1908 peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility 1909 peer.peering_cost = peering_cost 1910 peer.propagation_transfer_limit = propagation_transfer_limit 1911 if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit 1912 else: peer.propagation_sync_limit = propagation_transfer_limit 1913 1914 RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE) 1915 1916 else: 1917 if len(self.peers) >= self.max_peers: RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG) 1918 else: 1919 peer = LXMPeer(self, destination_hash, sync_strategy=self.default_sync_strategy) 1920 peer.alive = True 1921 peer.metadata = metadata 1922 peer.last_heard = time.time() 1923 peer.propagation_stamp_cost = propagation_stamp_cost 1924 peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility 1925 peer.peering_cost = peering_cost 1926 peer.propagation_transfer_limit = propagation_transfer_limit 1927 if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit 1928 else: peer.propagation_sync_limit = propagation_transfer_limit 1929 1930 self.peers[destination_hash] = peer 1931 RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE) 1932 1933 1934 def unpeer(self, destination_hash, timestamp = None): 1935 if timestamp == None: 1936 timestamp = int(time.time()) 1937 1938 if destination_hash in self.peers: 1939 peer = self.peers[destination_hash] 1940 1941 if timestamp >= peer.peering_timebase: 1942 self.peers.pop(destination_hash) 1943 RNS.log("Broke peering with "+str(peer.destination)) 1944 1945 def rotate_peers(self): 1946 try: 1947 rotation_headroom = max(1, math.floor(self.max_peers*(LXMRouter.ROTATION_HEADROOM_PCT/100.0))) 1948 required_drops = len(self.peers) - (self.max_peers - rotation_headroom) 1949 if required_drops > 0 and len(self.peers) - required_drops > 1: 1950 peers = self.peers.copy() 1951 untested_peers = [] 1952 for peer_id in self.peers: 1953 peer = self.peers[peer_id] 1954 if peer.last_sync_attempt == 0: 1955 untested_peers.append(peer) 1956 1957 if len(untested_peers) >= rotation_headroom: 1958 RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG) 1959 return 1960 1961 fully_synced_peers = {} 1962 for peer_id in peers: 1963 peer = peers[peer_id] 1964 if peer.unhandled_message_count == 0: 1965 fully_synced_peers[peer_id] = peer 1966 1967 if len(fully_synced_peers) > 0: 1968 peers = fully_synced_peers 1969 ms = "" if len(fully_synced_peers) == 1 else "s" 1970 RNS.log(f"Found {len(fully_synced_peers)} fully synced peer{ms}, using as peer rotation pool basis", RNS.LOG_DEBUG) 1971 1972 culled_peers = [] 1973 waiting_peers = [] 1974 unresponsive_peers = [] 1975 for peer_id in peers: 1976 peer = peers[peer_id] 1977 if not peer_id in self.static_peers and peer.state == LXMPeer.IDLE: 1978 if peer.alive: 1979 if peer.offered == 0: 1980 # Don't consider for unpeering until at 1981 # least one message has been offered 1982 pass 1983 else: 1984 waiting_peers.append(peer) 1985 else: 1986 unresponsive_peers.append(peer) 1987 1988 drop_pool = [] 1989 if len(unresponsive_peers) > 0: 1990 drop_pool.extend(unresponsive_peers) 1991 if not self.prioritise_rotating_unreachable_peers: 1992 drop_pool.extend(waiting_peers) 1993 1994 else: 1995 drop_pool.extend(waiting_peers) 1996 1997 if len(drop_pool) > 0: 1998 drop_count = min(required_drops, len(drop_pool)) 1999 low_acceptance_rate_peers = sorted( 2000 drop_pool, 2001 key=lambda p: ( 0 if p.offered == 0 else (p.outgoing/p.offered) ), 2002 reverse=False 2003 )[0:drop_count] 2004 2005 dropped_peers = 0 2006 for peer in low_acceptance_rate_peers: 2007 ar = 0 if peer.offered == 0 else round((peer.outgoing/peer.offered)*100, 2) 2008 if ar < LXMRouter.ROTATION_AR_MAX*100: 2009 reachable_str = "reachable" if peer.alive else "unreachable" 2010 RNS.log(f"Acceptance rate for {reachable_str} peer {RNS.prettyhexrep(peer.destination_hash)} was: {ar}% ({peer.outgoing}/{peer.offered}, {peer.unhandled_message_count} unhandled messages)", RNS.LOG_DEBUG) 2011 self.unpeer(peer.destination_hash) 2012 dropped_peers += 1 2013 2014 ms = "" if dropped_peers == 1 else "s" 2015 RNS.log(f"Dropped {dropped_peers} low acceptance rate peer{ms} to increase peering headroom", RNS.LOG_DEBUG) 2016 2017 except Exception as e: 2018 RNS.log(f"An error occurred during peer rotation: {e}", RNS.LOG_ERROR) 2019 RNS.trace_exception(e) 2020 2021 def sync_peers(self): 2022 culled_peers = [] 2023 waiting_peers = [] 2024 unresponsive_peers = [] 2025 peers = self.peers.copy() 2026 for peer_id in peers: 2027 peer = peers[peer_id] 2028 if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE: 2029 if not peer_id in self.static_peers: culled_peers.append(peer_id) 2030 2031 else: 2032 if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0: 2033 if peer.alive: waiting_peers.append(peer) 2034 else: 2035 if hasattr(peer, "next_sync_attempt") and time.time() > peer.next_sync_attempt: unresponsive_peers.append(peer) 2036 else: pass # RNS.log("Not adding peer "+str(peer)+" since it is in sync backoff", RNS.LOG_DEBUG) 2037 2038 peer_pool = [] 2039 if len(waiting_peers) > 0: 2040 fastest_peers = sorted( 2041 waiting_peers, 2042 key=lambda p: p.sync_transfer_rate, 2043 reverse=True 2044 )[0:min(LXMRouter.FASTEST_N_RANDOM_POOL, len(waiting_peers))] 2045 peer_pool.extend(fastest_peers) 2046 2047 unknown_speed_peers = [p for p in waiting_peers if p.sync_transfer_rate == 0] 2048 if len(unknown_speed_peers) > 0: 2049 peer_pool.extend( 2050 unknown_speed_peers[ 2051 0:min( 2052 len(unknown_speed_peers), 2053 len(fastest_peers) 2054 )] 2055 ) 2056 2057 RNS.log("Selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG) 2058 2059 elif len(unresponsive_peers) > 0: 2060 RNS.log("No active peers available, randomly selecting peer to sync from "+str(len(unresponsive_peers))+" unresponsive peers.", RNS.LOG_DEBUG) 2061 peer_pool = unresponsive_peers 2062 2063 if len(peer_pool) > 0: 2064 selected_index = random.randint(0,len(peer_pool)-1) 2065 selected_peer = peer_pool[selected_index] 2066 RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG) 2067 selected_peer.sync() 2068 2069 for peer_id in culled_peers: 2070 RNS.log("Removing peer "+RNS.prettyhexrep(peer_id)+" due to excessive unreachability", RNS.LOG_WARNING) 2071 try: 2072 if peer_id in self.peers: 2073 self.peers.pop(peer_id) 2074 except Exception as e: 2075 RNS.log("Error while removing peer "+RNS.prettyhexrep(peer_id)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 2076 2077 def propagation_link_established(self, link): 2078 link.set_packet_callback(self.propagation_packet) 2079 link.set_resource_strategy(RNS.Link.ACCEPT_APP) 2080 link.set_resource_callback(self.propagation_resource_advertised) 2081 link.set_resource_started_callback(self.resource_transfer_began) 2082 link.set_resource_concluded_callback(self.propagation_resource_concluded) 2083 self.active_propagation_links.append(link) 2084 2085 def propagation_resource_advertised(self, resource): 2086 if self.from_static_only: 2087 remote_identity = resource.link.get_remote_identity() 2088 if remote_identity == None: 2089 RNS.log(f"Rejecting propagation resource from unidentified peer", RNS.LOG_DEBUG) 2090 return False 2091 else: 2092 remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") 2093 remote_hash = remote_destination.hash 2094 remote_str = RNS.prettyhexrep(remote_hash) 2095 if not remote_hash in self.static_peers: 2096 RNS.log(f"Rejecting propagation resource from {remote_str} not in static peers list", RNS.LOG_DEBUG) 2097 return False 2098 2099 size = resource.get_data_size() 2100 limit = self.propagation_per_sync_limit*1000 2101 if limit != None and size > limit: 2102 RNS.log(f"Rejecting {RNS.prettysize(size)} incoming propagation resource, since it exceeds the limit of {RNS.prettysize(limit)}", RNS.LOG_DEBUG) 2103 return False 2104 else: 2105 return True 2106 2107 def propagation_packet(self, data, packet): 2108 try: 2109 if packet.destination_type != RNS.Destination.LINK: return 2110 else: 2111 data = msgpack.unpackb(data) 2112 remote_timebase = data[0] 2113 messages = data[1] 2114 2115 min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) 2116 validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost) 2117 2118 for validated_entry in validated_messages: 2119 lxmf_data = validated_entry[1] 2120 stamp_value = validated_entry[2] 2121 stamp_data = validated_entry[3] 2122 self.lxmf_propagation(lxmf_data, stamp_value=stamp_value, stamp_data=stamp_data) 2123 self.client_propagation_messages_received += 1 2124 2125 if len(validated_messages) == len(messages): 2126 ms = "" if len(messages) == 1 else "s" 2127 RNS.log(f"Received {len(messages)} propagation message{ms} from client with valid stamp{ms}", RNS.LOG_DEBUG) 2128 packet.prove() 2129 else: 2130 RNS.log("Propagation transfer from client contained messages with invalid stamps", RNS.LOG_NOTICE) 2131 reject_data = msgpack.packb([LXMPeer.ERROR_INVALID_STAMP]) 2132 RNS.Packet(packet.link, reject_data).send() 2133 packet.link.teardown() 2134 2135 except Exception as e: 2136 RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR) 2137 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) 2138 2139 def offer_request(self, path, data, request_id, link_id, remote_identity, requested_at): 2140 if remote_identity == None: 2141 return LXMPeer.ERROR_NO_IDENTITY 2142 else: 2143 remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") 2144 remote_hash = remote_destination.hash 2145 remote_str = RNS.prettyhexrep(remote_hash) 2146 2147 if remote_hash in self.throttled_peers: 2148 throttle_remaining = self.throttled_peers[remote_hash]-time.time() 2149 if throttle_remaining > 0: 2150 RNS.log(f"Propagation offer from node {remote_str} rejected, throttled for {RNS.prettytime(throttle_remaining)} more", RNS.LOG_NOTICE) 2151 return LXMPeer.ERROR_THROTTLED 2152 else: self.throttled_peers.pop(remote_hash) 2153 2154 if self.from_static_only: 2155 if not remote_hash in self.static_peers: 2156 RNS.log(f"Rejecting propagation request from {remote_str} not in static peers list", RNS.LOG_DEBUG) 2157 return LXMPeer.ERROR_NO_ACCESS 2158 2159 try: 2160 if type(data) != list and len(data) < 2: return LXMPeer.ERROR_INVALID_DATA 2161 2162 peering_id = self.identity.hash+remote_identity.hash 2163 target_cost = self.peering_cost 2164 peering_key = data[0] 2165 transient_ids = data[1] 2166 wanted_ids = [] 2167 2168 ts = time.time() 2169 peering_key_valid = LXStamper.validate_peering_key(peering_id, peering_key, target_cost) 2170 td = time.time() - ts 2171 2172 if not peering_key_valid: 2173 RNS.log(f"Invalid peering key for incoming sync offer", RNS.LOG_DEBUG) 2174 return LXMPeer.ERROR_INVALID_KEY 2175 2176 else: 2177 RNS.log(f"Peering key validated for incoming offer in {RNS.prettytime(td)}", RNS.LOG_DEBUG) 2178 self.validated_peer_links[link_id] = True 2179 for transient_id in transient_ids: 2180 if not transient_id in self.propagation_entries: wanted_ids.append(transient_id) 2181 2182 if len(wanted_ids) == 0: return False 2183 elif len(wanted_ids) == len(transient_ids): return True 2184 else: return wanted_ids 2185 2186 except Exception as e: 2187 RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG) 2188 RNS.trace_exception(e) 2189 return None 2190 2191 def propagation_resource_concluded(self, resource): 2192 if resource.status == RNS.Resource.COMPLETE: 2193 try: 2194 data = msgpack.unpackb(resource.data.read()) 2195 2196 if type(data) == list and len(data) == 2 and type(data[0] == float) and type(data[1]) == list: 2197 # This is a series of propagation messages from a peer or originator 2198 remote_identity = resource.link.get_remote_identity() 2199 remote_timebase = data[0] 2200 messages = data[1] 2201 remote_hash = None 2202 remote_str = "unknown client" 2203 2204 if remote_identity != None: 2205 remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") 2206 remote_hash = remote_destination.hash 2207 remote_app_data = RNS.Identity.recall_app_data(remote_hash) 2208 remote_str = RNS.prettyhexrep(remote_hash) 2209 2210 if remote_hash in self.peers: remote_str = f"peer {remote_str}" 2211 else: 2212 if pn_announce_data_is_valid(remote_app_data): 2213 # 1: Current node timebase 2214 # 2: Boolean flag signalling propagation node state 2215 # 3: Per-transfer limit for message propagation in kilobytes 2216 # 4: Limit for incoming propagation node syncs 2217 # 5: Propagation stamp costs for this node 2218 # 6: Node metadata 2219 pn_config = msgpack.unpackb(remote_app_data) 2220 if pn_config[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: 2221 remote_timebase = pn_config[1] 2222 remote_transfer_limit = pn_config[3] 2223 remote_sync_limit = pn_config[4] 2224 remote_stamp_cost = pn_config[5][0] 2225 remote_stamp_flex = pn_config[5][1] 2226 remote_peering_cost = pn_config[5][2] 2227 remote_metadata = pn_config[6] 2228 2229 RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug 2230 self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata) 2231 2232 peering_key_valid = False 2233 if remote_identity != None: 2234 if resource.link.link_id in self.validated_peer_links and self.validated_peer_links[resource.link.link_id] == True: 2235 peering_key_valid = True 2236 2237 if not peering_key_valid and len(messages) > 1: 2238 resource.link.teardown() 2239 RNS.log(f"Received multiple propagation messages from {remote_str} without valid peering key presentation. This is not supposed to happen, ignoring.", RNS.LOG_WARNING) 2240 RNS.log(f"Clients and peers without a valid peering key can only deliver 1 message per transfer.", RNS.LOG_WARNING) 2241 else: 2242 ms = "" if len(messages) == 1 else "s" 2243 RNS.log(f"Received {len(messages)} message{ms} from {remote_str}, validating stamps...", RNS.LOG_VERBOSE) 2244 2245 min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) 2246 validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost) 2247 invalid_stamps = len(messages)-len(validated_messages) 2248 ms = "" if invalid_stamps == 1 else "s" 2249 if len(validated_messages) == len(messages): RNS.log(f"All message stamps validated from {remote_str}", RNS.LOG_VERBOSE) 2250 else: RNS.log(f"Transfer from {remote_str} contained {invalid_stamps} invalid stamp{ms}", RNS.LOG_WARNING) 2251 2252 for validated_entry in validated_messages: 2253 transient_id = validated_entry[0] 2254 lxmf_data = validated_entry[1] 2255 stamp_value = validated_entry[2] 2256 stamp_data = validated_entry[3] 2257 peer = None 2258 2259 if remote_hash != None and remote_hash in self.peers: 2260 peer = self.peers[remote_hash] 2261 peer.incoming += 1 2262 peer.rx_bytes += len(lxmf_data) 2263 else: 2264 if remote_identity != None: 2265 self.unpeered_propagation_incoming += 1 2266 self.unpeered_propagation_rx_bytes += len(lxmf_data) 2267 else: 2268 self.client_propagation_messages_received += 1 2269 2270 self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value, stamp_data=stamp_data) 2271 if peer != None: peer.queue_handled_message(transient_id) 2272 2273 invalid_message_count = len(messages) - len(validated_messages) 2274 if invalid_message_count > 0: 2275 resource.link.teardown() 2276 if remote_identity != None: 2277 throttle_time = LXMRouter.PN_STAMP_THROTTLE 2278 self.throttled_peers[remote_hash] = time.time()+throttle_time 2279 ms = "" if invalid_message_count == 1 else "s" 2280 RNS.log(f"Propagation transfer from {remote_str} contained {invalid_message_count} message{ms} with invalid stamps, throttled for {RNS.prettytime(throttle_time)}", RNS.LOG_NOTICE) 2281 2282 else: 2283 RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG) 2284 2285 except Exception as e: 2286 RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG) 2287 RNS.trace_exception(e) 2288 2289 def enqueue_peer_distribution(self, transient_id, from_peer): 2290 self.peer_distribution_queue.append([transient_id, from_peer]) 2291 2292 def flush_peer_distribution_queue(self): 2293 if len(self.peer_distribution_queue) > 0: 2294 entries = [] 2295 while len(self.peer_distribution_queue) > 0: 2296 entries.append(self.peer_distribution_queue.pop()) 2297 2298 for peer_id in self.peers.copy(): 2299 if peer_id in self.peers: 2300 peer = self.peers[peer_id] 2301 for entry in entries: 2302 transient_id = entry[0] 2303 from_peer = entry[1] 2304 if peer != from_peer: 2305 peer.queue_unhandled_message(transient_id) 2306 2307 def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, 2308 from_peer=None, stamp_value=None, stamp_data=None): 2309 if is_paper_message: no_stamp_enforcement = True 2310 else: no_stamp_enforcement = False 2311 2312 try: 2313 if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD: 2314 transient_id = RNS.Identity.full_hash(lxmf_data) 2315 2316 if (not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids) or allow_duplicate == True: 2317 received = time.time() 2318 destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH] 2319 2320 self.locally_processed_transient_ids[transient_id] = received 2321 2322 if destination_hash in self.delivery_destinations: 2323 delivery_destination = self.delivery_destinations[destination_hash] 2324 encrypted_lxmf_data = lxmf_data[LXMessage.DESTINATION_LENGTH:] 2325 decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data) 2326 if decrypted_lxmf_data != None: 2327 delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data 2328 self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement, allow_duplicate=allow_duplicate) 2329 self.locally_delivered_transient_ids[transient_id] = time.time() 2330 2331 if signal_local_delivery != None: 2332 return signal_local_delivery 2333 2334 else: 2335 if self.propagation_node: 2336 stamped_data = lxmf_data+stamp_data 2337 value_component = f"_{stamp_value}" if stamp_value and stamp_value > 0 else "" 2338 file_path = f"{self.messagepath}/{RNS.hexrep(transient_id, delimit=False)}_{received}{value_component}" 2339 msg_file = open(file_path, "wb") 2340 msg_file.write(stamped_data); msg_file.close() 2341 2342 RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)} with stamp value {stamp_value}, adding to peer distribution queues...", RNS.LOG_EXTREME) 2343 self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(stamped_data), [], [], stamp_value] 2344 self.enqueue_peer_distribution(transient_id, from_peer) 2345 2346 else: 2347 # TODO: Add message to sneakernet queues when implemented 2348 RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)}, but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG) 2349 2350 return True 2351 2352 else: 2353 if signal_duplicate != None: 2354 return signal_duplicate 2355 2356 else: 2357 return False 2358 2359 return False 2360 2361 except Exception as e: 2362 RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_DEBUG) 2363 RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) 2364 RNS.trace_exception(e) 2365 return False 2366 2367 def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False): 2368 try: 2369 if not uri.lower().startswith(LXMessage.URI_SCHEMA+"://"): 2370 RNS.log("Cannot ingest LXM, invalid URI provided.", RNS.LOG_ERROR) 2371 return False 2372 2373 else: 2374 lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==") 2375 transient_id = RNS.Identity.full_hash(lxmf_data) 2376 2377 router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True) 2378 if router_propagation_result != False: 2379 RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG) 2380 return router_propagation_result 2381 else: 2382 RNS.log("No valid LXM could be ingested from the provided URI", RNS.LOG_DEBUG) 2383 return False 2384 2385 except Exception as e: 2386 RNS.log("Error while decoding URI-encoded LXMF message. The contained exception was: "+str(e), RNS.LOG_ERROR) 2387 return False 2388 2389 def fail_message(self, lxmessage): 2390 RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG) 2391 2392 lxmessage.progress = 0.0 2393 if lxmessage in self.pending_outbound: 2394 self.pending_outbound.remove(lxmessage) 2395 2396 self.failed_outbound.append(lxmessage) 2397 2398 if lxmessage.state != LXMessage.REJECTED: 2399 lxmessage.state = LXMessage.FAILED 2400 2401 if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): 2402 lxmessage.failed_callback(lxmessage) 2403 2404 def process_deferred_stamps(self): 2405 if len(self.pending_deferred_stamps) > 0: 2406 2407 if self.stamp_gen_lock.locked(): 2408 return 2409 2410 else: 2411 with self.stamp_gen_lock: 2412 selected_lxm = None 2413 selected_message_id = None 2414 for message_id in self.pending_deferred_stamps: 2415 lxmessage = self.pending_deferred_stamps[message_id] 2416 if selected_lxm == None: 2417 selected_lxm = lxmessage 2418 selected_message_id = message_id 2419 2420 if selected_lxm != None: 2421 if selected_lxm.state == LXMessage.CANCELLED: 2422 RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) 2423 selected_lxm.stamp_generation_failed = True 2424 self.pending_deferred_stamps.pop(selected_message_id) 2425 if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): 2426 selected_lxm.failed_callback(lxmessage) 2427 2428 return 2429 2430 if selected_lxm.defer_stamp: 2431 if selected_lxm.stamp == None: stamp_generation_success = False 2432 else: stamp_generation_success = True 2433 else: stamp_generation_success = True 2434 2435 if selected_lxm.desired_method == LXMessage.PROPAGATED: 2436 if selected_lxm.propagation_stamp == None: propagation_stamp_generation_success = False 2437 else: propagation_stamp_generation_success = True 2438 else: propagation_stamp_generation_success = True 2439 2440 if stamp_generation_success == False: 2441 RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG) 2442 generated_stamp = selected_lxm.get_stamp() 2443 if generated_stamp: 2444 selected_lxm.stamp = generated_stamp 2445 selected_lxm.defer_stamp = False 2446 selected_lxm.packed = None 2447 selected_lxm.pack(payload_updated=True) 2448 stamp_generation_success = True 2449 RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) 2450 else: 2451 if selected_lxm.state == LXMessage.CANCELLED: 2452 RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) 2453 selected_lxm.stamp_generation_failed = True 2454 self.pending_deferred_stamps.pop(selected_message_id) 2455 if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): 2456 selected_lxm.failed_callback(lxmessage) 2457 else: 2458 RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR) 2459 selected_lxm.stamp_generation_failed = True 2460 self.pending_deferred_stamps.pop(selected_message_id) 2461 self.fail_message(selected_lxm) 2462 2463 if propagation_stamp_generation_success == False: 2464 RNS.log(f"Starting propagation stamp generation for {selected_lxm}...", RNS.LOG_DEBUG) 2465 pn_target_cost = self.get_outbound_propagation_cost() 2466 if pn_target_cost == None: 2467 RNS.log("Failed to get propagation node stamp cost, cannot generate propagation stamp", RNS.LOG_ERROR) 2468 selected_lxm.stamp_generation_failed = True 2469 self.pending_deferred_stamps.pop(selected_message_id) 2470 self.fail_message(selected_lxm) 2471 2472 else: 2473 propagation_stamp = selected_lxm.get_propagation_stamp(target_cost=pn_target_cost) 2474 if propagation_stamp: 2475 selected_lxm.propagation_stamp = propagation_stamp 2476 selected_lxm.defer_propagation_stamp = False 2477 selected_lxm.packed = None 2478 selected_lxm.pack() 2479 propagation_stamp_generation_success = True 2480 RNS.log(f"Propagation stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) 2481 else: 2482 if selected_lxm.state == LXMessage.CANCELLED: 2483 RNS.log(f"Message cancelled during deferred propagation stamp generation for {selected_lxm}.", RNS.LOG_DEBUG) 2484 selected_lxm.stamp_generation_failed = True 2485 self.pending_deferred_stamps.pop(selected_message_id) 2486 if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): 2487 selected_lxm.failed_callback(lxmessage) 2488 else: 2489 RNS.log(f"Deferred propagation stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR) 2490 selected_lxm.stamp_generation_failed = True 2491 self.pending_deferred_stamps.pop(selected_message_id) 2492 self.fail_message(selected_lxm) 2493 2494 if stamp_generation_success and propagation_stamp_generation_success: 2495 self.pending_deferred_stamps.pop(selected_message_id) 2496 self.pending_outbound.append(selected_lxm) 2497 2498 def propagation_transfer_signalling_packet(self, data, packet): 2499 try: 2500 unpacked = msgpack.unpackb(data) 2501 if type(unpacked) == list and len(unpacked) >= 1: 2502 signal = unpacked[0] 2503 if signal == LXMPeer.ERROR_INVALID_STAMP: 2504 RNS.log("Message rejected by propagation node", RNS.LOG_ERROR) 2505 if hasattr(packet, "link") and hasattr(packet.link, "for_lxmessage"): 2506 lxm = packet.link.for_lxmessage 2507 RNS.log(f"Invalid propagation stamp on {lxm}", RNS.LOG_ERROR) 2508 self.cancel_outbound(lxm.message_id, cancel_state=LXMessage.REJECTED) 2509 2510 except Exception as e: 2511 RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR) 2512 2513 def process_outbound(self, sender = None): 2514 if self.outbound_processing_lock.locked(): return 2515 with self.outbound_processing_lock: 2516 for lxmessage in self.pending_outbound: 2517 if lxmessage.state == LXMessage.DELIVERED: 2518 RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) 2519 self.pending_outbound.remove(lxmessage) 2520 2521 # Udate ticket delivery stats 2522 if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields: 2523 RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG) 2524 self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time() 2525 self.save_available_tickets() 2526 2527 # Prepare link for backchannel communications 2528 delivery_destination_hash = lxmessage.get_destination().hash 2529 if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links: 2530 direct_link = self.direct_links[delivery_destination_hash] 2531 if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False: 2532 if direct_link.initiator == True: 2533 source_destination_hash = lxmessage.get_source().hash 2534 if source_destination_hash in self.delivery_destinations: 2535 backchannel_identity = self.delivery_destinations[source_destination_hash].identity 2536 backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity) 2537 direct_link.identify(backchannel_identity) 2538 direct_link.backchannel_identified = True 2539 self.delivery_link_established(direct_link) 2540 RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG) 2541 2542 elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT: 2543 RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) 2544 self.pending_outbound.remove(lxmessage) 2545 2546 elif lxmessage.state == LXMessage.CANCELLED: 2547 RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) 2548 self.pending_outbound.remove(lxmessage) 2549 if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): 2550 lxmessage.failed_callback(lxmessage) 2551 2552 elif lxmessage.state == LXMessage.REJECTED: 2553 RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) 2554 if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage) 2555 if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): 2556 lxmessage.failed_callback(lxmessage) 2557 2558 else: 2559 RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) 2560 2561 if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01 2562 2563 # Outbound handling for opportunistic messages 2564 if lxmessage.method == LXMessage.OPPORTUNISTIC: 2565 if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: 2566 if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash): 2567 RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG) 2568 lxmessage.delivery_attempts += 1 2569 RNS.Transport.request_path(lxmessage.get_destination().hash) 2570 lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT 2571 lxmessage.progress = 0.01 2572 elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash): 2573 RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG) 2574 lxmessage.delivery_attempts += 1 2575 RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash) 2576 def rediscover_job(): 2577 time.sleep(0.5) 2578 RNS.Transport.request_path(lxmessage.get_destination().hash) 2579 threading.Thread(target=rediscover_job, daemon=True).start() 2580 lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT 2581 lxmessage.progress = 0.01 2582 else: 2583 if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: 2584 lxmessage.delivery_attempts += 1 2585 lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT 2586 RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) 2587 lxmessage.send() 2588 else: 2589 RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) 2590 self.fail_message(lxmessage) 2591 2592 # Outbound handling for messages transferred 2593 # over a direct link to the final recipient 2594 elif lxmessage.method == LXMessage.DIRECT: 2595 if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: 2596 delivery_destination_hash = lxmessage.get_destination().hash 2597 direct_link = None 2598 2599 if delivery_destination_hash in self.direct_links: 2600 # An established direct link already exists to 2601 # the destination, so we'll try to use it for 2602 # delivering the message 2603 direct_link = self.direct_links[delivery_destination_hash] 2604 RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) 2605 2606 elif delivery_destination_hash in self.backchannel_links: 2607 # An established backchannel link exists to 2608 # the destination, so we'll try to use it for 2609 # delivering the message 2610 direct_link = self.backchannel_links[delivery_destination_hash] 2611 RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG) 2612 2613 if direct_link != None: 2614 if direct_link.status == RNS.Link.ACTIVE: 2615 if lxmessage.progress == None or lxmessage.progress < 0.05: 2616 lxmessage.progress = 0.05 2617 if lxmessage.state != LXMessage.SENDING: 2618 RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG) 2619 lxmessage.set_delivery_destination(direct_link) 2620 lxmessage.send() 2621 else: 2622 if lxmessage.representation == LXMessage.RESOURCE: 2623 RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) 2624 else: 2625 RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) 2626 elif direct_link.status == RNS.Link.CLOSED: 2627 if direct_link.activated_at != None: 2628 RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG) 2629 RNS.Transport.request_path(lxmessage.get_destination().hash) 2630 else: 2631 if not hasattr(lxmessage, "path_request_retried"): 2632 RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG) 2633 RNS.Transport.request_path(lxmessage.get_destination().hash) 2634 lxmessage.path_request_retried = True 2635 else: 2636 RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG) 2637 2638 lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT 2639 2640 lxmessage.set_delivery_destination(None) 2641 if delivery_destination_hash in self.direct_links: 2642 self.direct_links.pop(delivery_destination_hash) 2643 if delivery_destination_hash in self.backchannel_links: 2644 self.backchannel_links.pop(delivery_destination_hash) 2645 lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT 2646 else: 2647 # Simply wait for the link to become active or close 2648 RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) 2649 else: 2650 # No link exists, so we'll try to establish one, but 2651 # only if we've never tried before, or the retry wait 2652 # period has elapsed. 2653 if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: 2654 lxmessage.delivery_attempts += 1 2655 lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT 2656 2657 if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: 2658 if RNS.Transport.has_path(lxmessage.get_destination().hash): 2659 RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) 2660 delivery_link = RNS.Link(lxmessage.get_destination()) 2661 delivery_link.set_link_established_callback(self.process_outbound) 2662 self.direct_links[delivery_destination_hash] = delivery_link 2663 lxmessage.progress = 0.03 2664 else: 2665 RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG) 2666 RNS.Transport.request_path(lxmessage.get_destination().hash) 2667 lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT 2668 lxmessage.progress = 0.01 2669 else: 2670 RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) 2671 self.fail_message(lxmessage) 2672 2673 # Outbound handling for messages transported via 2674 # propagation to a LXMF router network. 2675 elif lxmessage.method == LXMessage.PROPAGATED: 2676 RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) 2677 2678 if self.outbound_propagation_node == None: 2679 RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR) 2680 self.fail_message(lxmessage) 2681 else: 2682 if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: 2683 2684 if self.outbound_propagation_link != None: 2685 # A link already exists, so we'll try to use it 2686 # to deliver the message 2687 if self.outbound_propagation_link.status == RNS.Link.ACTIVE: 2688 if lxmessage.state != LXMessage.SENDING: 2689 RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG) 2690 lxmessage.set_delivery_destination(self.outbound_propagation_link) 2691 lxmessage.send() 2692 else: 2693 if lxmessage.representation == LXMessage.RESOURCE: 2694 RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) 2695 else: 2696 RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) 2697 elif self.outbound_propagation_link.status == RNS.Link.CLOSED: 2698 RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG) 2699 self.outbound_propagation_link = None 2700 lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT 2701 else: 2702 # Simply wait for the link to become 2703 # active or close 2704 RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) 2705 else: 2706 # No link exists, so we'll try to establish one, but 2707 # only if we've never tried before, or the retry wait 2708 # period has elapsed. 2709 if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: 2710 lxmessage.delivery_attempts += 1 2711 lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT 2712 2713 if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: 2714 if RNS.Transport.has_path(self.outbound_propagation_node): 2715 RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) 2716 propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) 2717 propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") 2718 self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound) 2719 self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet) 2720 self.outbound_propagation_link.for_lxmessage = lxmessage 2721 else: 2722 RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) 2723 RNS.Transport.request_path(self.outbound_propagation_node) 2724 lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT 2725 2726 else: 2727 RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) 2728 self.fail_message(lxmessage)