/ LXMF / LXMRouter.py
LXMRouter.py
   1  import os
   2  import sys
   3  import time
   4  import math
   5  import random
   6  import base64
   7  import atexit
   8  import signal
   9  import threading
  10  
  11  from collections import deque
  12  
  13  import RNS
  14  import RNS.vendor.umsgpack as msgpack
  15  
  16  from .LXMF import APP_NAME
  17  from .LXMF import FIELD_TICKET
  18  from .LXMF import PN_META_NAME
  19  from .LXMF import pn_announce_data_is_valid
  20  
  21  from .LXMPeer import LXMPeer
  22  from .LXMessage import LXMessage
  23  from .Handlers import LXMFDeliveryAnnounceHandler
  24  from .Handlers import LXMFPropagationAnnounceHandler
  25  
  26  import LXMF.LXStamper as LXStamper
  27  
class LXMRouter:
    # Class-level defaults and protocol constants for the router. Several of
    # these are used as default argument values or fallbacks in __init__.

    # Delivery and link handling tunables.
    # PATH_REQUEST_WAIT is added to time.time() when waiting for propagation
    # node app data, so it is expressed in seconds.
    # NOTE(review): the other *_WAIT / *_INTERVAL / *_INACTIVITY values are
    # presumably seconds as well — confirm against their users.
    MAX_DELIVERY_ATTEMPTS = 5
    PROCESSING_INTERVAL   = 4
    DELIVERY_RETRY_WAIT   = 10
    PATH_REQUEST_WAIT     = 7
    MAX_PATHLESS_TRIES    = 1
    LINK_MAX_INACTIVITY   = 10*60
    P_LINK_MAX_INACTIVITY = 3*60

    # NOTE(review): written as days*hours*minutes*seconds, so presumably
    # expiry periods in seconds (30 and 45 days) — confirm.
    MESSAGE_EXPIRY        = 30*24*60*60
    STAMP_COST_EXPIRY     = 45*24*60*60

    # Passed to time.sleep() before a propagation node announce is sent.
    NODE_ANNOUNCE_DELAY   = 20

    # Peering defaults. MAX_PEERS, AUTOPEER and AUTOPEER_MAXDEPTH are the
    # fallbacks __init__ applies when the matching argument is None.
    MAX_PEERS             = 20
    AUTOPEER              = True
    AUTOPEER_MAXDEPTH     = 4
    FASTEST_N_RANDOM_POOL = 2
    ROTATION_HEADROOM_PCT = 10
    ROTATION_AR_MAX       = 0.5

    # Stamp cost and transfer limit defaults. __init__ raises any propagation
    # cost below PROPAGATION_COST_MIN up to that minimum. PROPAGATION_LIMIT is
    # the default per-transfer limit, which the announce app data describes
    # as being in kilobytes.
    PEERING_COST          = 18
    MAX_PEERING_COST      = 26
    PROPAGATION_COST_MIN  = 13
    PROPAGATION_COST_FLEX = 3
    PROPAGATION_COST      = 16
    PROPAGATION_LIMIT     = 256
    SYNC_LIMIT            = PROPAGATION_LIMIT*40
    DELIVERY_LIMIT        = 1000

    # PR_PATH_TIMEOUT is added to time.time() when a path to the propagation
    # node has been requested for a message download, so it is in seconds.
    PR_PATH_TIMEOUT       = 10
    PN_STAMP_THROTTLE     = 180

    # Values assigned to self.propagation_transfer_state.
    # NOTE(review): the 0xf0 and above values look like failure states — confirm.
    PR_IDLE               = 0x00
    PR_PATH_REQUESTED     = 0x01
    PR_LINK_ESTABLISHING  = 0x02
    PR_LINK_ESTABLISHED   = 0x03
    PR_REQUEST_SENT       = 0x04
    PR_RECEIVING          = 0x05
    PR_RESPONSE_RECEIVED  = 0x06
    PR_COMPLETE           = 0x07
    PR_NO_PATH            = 0xf0
    PR_LINK_FAILED        = 0xf1
    PR_TRANSFER_FAILED    = 0xf2
    PR_NO_IDENTITY_RCVD   = 0xf3
    PR_NO_ACCESS          = 0xf4
    PR_FAILED             = 0xfe

    # Default max_messages value for request_messages_from_propagation_node().
    PR_ALL_MESSAGES       = 0x00

    DUPLICATE_SIGNAL      = "lxmf_duplicate"

    # NOTE(review): presumably request handler paths on the propagation node's
    # destinations — their registration is not visible here; confirm.
    STATS_GET_PATH        = "/pn/get/stats"
    SYNC_REQUEST_PATH     = "/pn/peer/sync"
    UNPEER_REQUEST_PATH   = "/pn/peer/unpeer"
  83  
  84  
  85      ### Developer-facing API ##############################
  86      #######################################################
  87  
  88      def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None,
  89                   propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, sync_limit=SYNC_LIMIT,
  90                   enforce_ratchets=False, enforce_stamps=False, static_peers = [], max_peers=None,
  91                   from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT,
  92                   propagation_cost=PROPAGATION_COST, propagation_cost_flexibility=PROPAGATION_COST_FLEX,
  93                   peering_cost=PEERING_COST, max_peering_cost=MAX_PEERING_COST, name=None):
  94  
  95          random.seed(os.urandom(10))
  96  
  97          self.pending_inbound       = []
  98          self.pending_outbound      = []
  99          self.failed_outbound       = []
 100          self.direct_links          = {}
 101          self.backchannel_links     = {}
 102          self.delivery_destinations = {}
 103  
 104          self.prioritised_list      = []
 105          self.ignored_list          = []
 106          self.allowed_list          = []
 107          self.control_allowed_list  = []
 108          self.auth_required         = False
 109          self.retain_synced_on_node = False
 110  
 111          self.default_sync_strategy = sync_strategy
 112          self.processing_inbound    = False
 113          self.processing_count      = 0
 114          self.name                  = name
 115  
 116          self.propagation_node = False
 117          self.propagation_node_start_time = None        
 118  
 119          if storagepath == None: raise ValueError("LXMF cannot be initialised without a storage path")
 120          else:
 121              self.storagepath = storagepath+"/lxmf"
 122              self.ratchetpath = self.storagepath+"/ratchets"
 123  
 124          self.outbound_propagation_node = None
 125          self.outbound_propagation_link = None
 126  
 127          if delivery_limit == None: delivery_limit = LXMRouter.DELIVERY_LIMIT
 128          if propagation_cost < LXMRouter.PROPAGATION_COST_MIN: propagation_cost = LXMRouter.PROPAGATION_COST_MIN
 129  
 130          self.message_storage_limit              = None
 131          self.information_storage_limit          = None
 132          self.propagation_per_transfer_limit     = propagation_limit
 133          self.propagation_per_sync_limit         = sync_limit
 134          self.delivery_per_transfer_limit        = delivery_limit
 135          self.propagation_stamp_cost             = propagation_cost
 136          self.propagation_stamp_cost_flexibility = propagation_cost_flexibility
 137          self.peering_cost                       = peering_cost
 138          self.max_peering_cost                   = max_peering_cost
 139          self.enforce_ratchets                   = enforce_ratchets
 140          self._enforce_stamps                    = enforce_stamps
 141          self.pending_deferred_stamps            = {}
 142          self.throttled_peers                    = {}
 143  
 144          if sync_limit == None or self.propagation_per_sync_limit < self.propagation_per_transfer_limit:
 145              self.propagation_per_sync_limit = self.propagation_per_transfer_limit
 146  
 147          self.wants_download_on_path_available_from = None
 148          self.wants_download_on_path_available_to = None
 149          self.propagation_transfer_state = LXMRouter.PR_IDLE
 150          self.propagation_transfer_progress = 0.0
 151          self.propagation_transfer_last_result = None
 152          self.propagation_transfer_last_duplicates = None
 153          self.propagation_transfer_max_messages = None
 154          self.prioritise_rotating_unreachable_peers = False
 155          self.active_propagation_links = []
 156          self.validated_peer_links = {}
 157          self.locally_delivered_transient_ids = {}
 158          self.locally_processed_transient_ids = {}
 159          self.outbound_stamp_costs = {}
 160          self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
 161  
 162          self.outbound_processing_lock = threading.Lock()
 163          self.cost_file_lock = threading.Lock()
 164          self.ticket_file_lock = threading.Lock()
 165          self.stamp_gen_lock = threading.Lock()
 166          self.exit_handler_running = False
 167  
 168          if identity == None:
 169              identity = RNS.Identity()
 170  
 171          self.identity = identity
 172          self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation")
 173          self.propagation_destination.set_default_app_data(self.get_propagation_node_app_data)
 174          self.control_destination = None
 175          self.client_propagation_messages_received = 0
 176          self.client_propagation_messages_served = 0
 177          self.unpeered_propagation_incoming = 0
 178          self.unpeered_propagation_rx_bytes = 0
 179  
 180          if autopeer != None: self.autopeer = autopeer
 181          else:                self.autopeer = LXMRouter.AUTOPEER
 182  
 183          if autopeer_maxdepth != None: self.autopeer_maxdepth = autopeer_maxdepth
 184          else:                         self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH
 185  
 186          if max_peers == None: self.max_peers = LXMRouter.MAX_PEERS
 187          else:
 188              if type(max_peers) == int and max_peers >= 0: self.max_peers = max_peers
 189              else: raise ValueError(f"Invalid value for max_peers: {max_peers}")
 190  
 191          self.from_static_only = from_static_only
 192          if type(static_peers) != list: raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}")
 193          else:
 194              for static_peer in static_peers:
 195                  if type(static_peer) != bytes: raise ValueError(f"Invalid static peer destination hash: {static_peer}")
 196                  else:
 197                      if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Invalid static peer destination hash: {static_peer}")
 198  
 199              self.static_peers = static_peers
 200  
 201          self.peers = {}
 202          self.propagation_entries = {}
 203  
 204          self.peer_distribution_queue = deque()
 205  
 206          RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self))
 207          RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self))
 208  
 209          self.__delivery_callback = None
 210  
 211          try:
 212              if os.path.isfile(self.storagepath+"/local_deliveries"):
 213                  locally_delivered_file = open(self.storagepath+"/local_deliveries", "rb")
 214                  data = locally_delivered_file.read()
 215                  locally_delivered_file.close()
 216                  self.locally_delivered_transient_ids = msgpack.unpackb(data)
 217                  if not type(self.locally_delivered_transient_ids) == dict:
 218                      RNS.log("Invalid data format for loaded locally delivered transient IDs, recreating...", RNS.LOG_ERROR)
 219                      self.locally_delivered_transient_ids = {}
 220  
 221          except Exception as e:
 222              RNS.log(f"Could not load locally delivered message ID cache from storage. The contained exception was: {e}", RNS.LOG_ERROR)
 223              self.locally_delivered_transient_ids = {}
 224  
 225          try:
 226              if os.path.isfile(self.storagepath+"/locally_processed"):
 227                  locally_processed_file = open(self.storagepath+"/locally_processed", "rb")
 228                  data = locally_processed_file.read()
 229                  locally_processed_file.close()
 230                  self.locally_processed_transient_ids = msgpack.unpackb(data)
 231                  if not type(self.locally_processed_transient_ids) == dict:
 232                      RNS.log("Invalid data format for loaded locally processed transient IDs, recreating...", RNS.LOG_ERROR)
 233                      self.locally_processed_transient_ids = {}
 234  
 235  
 236          except Exception as e:
 237              RNS.log("Could not load locally processed message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
 238              self.locally_processed_transient_ids = {}
 239  
 240          try:
 241              self.clean_transient_id_caches()
 242  
 243          except Exception as e:
 244              RNS.log("Could not clean transient ID caches. The contained exception was : "+str(e), RNS.LOG_ERROR)
 245              self.locally_delivered_transient_ids = {}
 246              self.locally_processed_transient_ids = {}
 247  
 248          try:
 249              if os.path.isfile(self.storagepath+"/outbound_stamp_costs"):
 250                  with self.cost_file_lock:
 251                      with open(self.storagepath+"/outbound_stamp_costs", "rb") as outbound_stamp_cost_file:
 252                          data = outbound_stamp_cost_file.read()
 253                          self.outbound_stamp_costs = msgpack.unpackb(data)
 254                          if not type(self.outbound_stamp_costs) == dict:
 255                              RNS.log("Invalid data format for loaded outbound stamp costs, recreating...", RNS.LOG_ERROR)
 256                              self.outbound_stamp_costs = {}
 257  
 258                  self.clean_outbound_stamp_costs()
 259                  self.save_outbound_stamp_costs()
 260  
 261          except Exception as e:
 262              RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
 263  
 264          try:
 265              if os.path.isfile(self.storagepath+"/available_tickets"):
 266                  with self.ticket_file_lock:
 267                      with open(self.storagepath+"/available_tickets", "rb") as available_tickets_file:
 268                          data = available_tickets_file.read()
 269                          self.available_tickets = msgpack.unpackb(data)
 270                          if not type(self.available_tickets) == dict:
 271                              RNS.log("Invalid data format for loaded available tickets, recreating...", RNS.LOG_ERROR)
 272                              self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
 273                          if not "outbound" in self.available_tickets:
 274                              RNS.log("Missing outbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
 275                              self.available_tickets["outbound"] = {}
 276                          if not "inbound" in self.available_tickets:
 277                              RNS.log("Missing inbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
 278                              self.available_tickets["inbound"] = {}
 279                          if not "last_deliveries" in self.available_tickets:
 280                              RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
 281                              self.available_tickets["last_deliveries"] = {}
 282  
 283                  self.clean_available_tickets()
 284                  self.save_available_tickets()
 285  
 286          except Exception as e:
 287              RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
 288  
 289          atexit.register(self.exit_handler)
 290          signal.signal(signal.SIGINT, self.sigint_handler)
 291          signal.signal(signal.SIGTERM, self.sigterm_handler)
 292  
 293          job_thread = threading.Thread(target=self.jobloop)
 294          job_thread.setDaemon(True)
 295          job_thread.start()
 296  
 297      def announce(self, destination_hash, attached_interface=None):
 298          if destination_hash in self.delivery_destinations:
 299              self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash), attached_interface=attached_interface)
 300  
 301      def get_propagation_node_announce_metadata(self):
 302          metadata = {}
 303          if self.name: metadata[PN_META_NAME] = str(self.name).encode("utf-8")
 304          return metadata
 305  
 306      def get_propagation_node_app_data(self):
 307          metadata      = self.get_propagation_node_announce_metadata()
 308          node_state    = self.propagation_node and not self.from_static_only
 309          stamp_cost    = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility, self.peering_cost]
 310          announce_data = [ False,                                  # 0: Legacy LXMF PN support
 311                            int(time.time()),                       # 1: Current node timebase
 312                            node_state,                             # 2: Boolean flag signalling propagation node state
 313                            self.propagation_per_transfer_limit,    # 3: Per-transfer limit for message propagation in kilobytes
 314                            self.propagation_per_sync_limit,        # 4: Limit for incoming propagation node syncs
 315                            stamp_cost,                             # 5: Propagation stamp cost for this node
 316                            metadata ]                              # 6: Node metadata
 317  
 318          return msgpack.packb(announce_data)
 319  
 320      def announce_propagation_node(self):
 321          def delayed_announce():
 322              time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
 323              self.propagation_destination.announce(app_data=self.get_propagation_node_app_data())
 324              if len(self.control_allowed_list) > 1: self.control_destination.announce()
 325  
 326          da_thread = threading.Thread(target=delayed_announce)
 327          da_thread.setDaemon(True)
 328          da_thread.start()
 329  
 330      def register_delivery_identity(self, identity, display_name = None, stamp_cost = None):
 331          if len(self.delivery_destinations) != 0:
 332              RNS.log("Currently only one delivery identity is supported per LXMF router instance", RNS.LOG_ERROR)
 333              return None
 334  
 335          if not os.path.isdir(self.ratchetpath):
 336              os.makedirs(self.ratchetpath)
 337  
 338          delivery_destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "delivery")
 339          delivery_destination.enable_ratchets(f"{self.ratchetpath}/{RNS.hexrep(delivery_destination.hash, delimit=False)}.ratchets")
 340          delivery_destination.set_packet_callback(self.delivery_packet)
 341          delivery_destination.set_link_established_callback(self.delivery_link_established)
 342          delivery_destination.display_name = display_name
 343  
 344          if self.enforce_ratchets:
 345              delivery_destination.enforce_ratchets()
 346  
 347          if display_name != None:
 348              def get_app_data():
 349                  return self.get_announce_app_data(delivery_destination.hash)
 350              delivery_destination.set_default_app_data(get_app_data)
 351  
 352          self.delivery_destinations[delivery_destination.hash] = delivery_destination
 353          self.set_inbound_stamp_cost(delivery_destination.hash, stamp_cost)
 354  
 355          return delivery_destination
 356  
 357      def register_delivery_callback(self, callback):
 358          self.__delivery_callback = callback
 359  
 360      def set_inbound_stamp_cost(self, destination_hash, stamp_cost):
 361          if destination_hash in self.delivery_destinations:
 362              delivery_destination = self.delivery_destinations[destination_hash]
 363              if stamp_cost == None:
 364                  delivery_destination.stamp_cost = None
 365                  return True
 366              elif type(stamp_cost) == int:
 367                  if stamp_cost < 1:
 368                      delivery_destination.stamp_cost = None
 369                  elif stamp_cost < 255:
 370                      delivery_destination.stamp_cost = stamp_cost
 371                  else:
 372                      return False
 373      
 374                  return True
 375  
 376          return False
 377  
 378      def get_outbound_stamp_cost(self, destination_hash):
 379          if destination_hash in self.outbound_stamp_costs:
 380              stamp_cost = self.outbound_stamp_costs[destination_hash][1]
 381              return stamp_cost
 382          else:
 383              return None
 384  
 385      def set_active_propagation_node(self, destination_hash):
 386          self.set_outbound_propagation_node(destination_hash)
 387          # self.set_inbound_propagation_node(destination_hash)
 388  
 389      def set_outbound_propagation_node(self, destination_hash):
 390          if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8 or type(destination_hash) != bytes:
 391              raise ValueError("Invalid destination hash for outbound propagation node")
 392          else:
 393              if self.outbound_propagation_node != destination_hash:
 394                  self.outbound_propagation_node = destination_hash
 395                  if self.outbound_propagation_link != None:
 396                      if self.outbound_propagation_link.destination.hash != destination_hash:
 397                          self.outbound_propagation_link.teardown()
 398                          self.outbound_propagation_link = None
 399  
 400      def get_outbound_propagation_node(self):
 401          return self.outbound_propagation_node
 402  
 403      def get_outbound_propagation_cost(self):
 404          target_propagation_cost = None
 405          pn_destination_hash = self.get_outbound_propagation_node()
 406          pn_app_data = RNS.Identity.recall_app_data(pn_destination_hash)
 407          if pn_announce_data_is_valid(pn_app_data):
 408              pn_config = msgpack.unpackb(pn_app_data)
 409              target_propagation_cost = pn_config[5][0]
 410  
 411          if not target_propagation_cost:
 412              RNS.log(f"Could not retrieve cached propagation node config. Requesting path to propagation node to get target propagation cost...", RNS.LOG_DEBUG)
 413              RNS.Transport.request_path(pn_destination_hash)
 414              timeout = time.time() + LXMRouter.PATH_REQUEST_WAIT
 415              while not RNS.Identity.recall_app_data(pn_destination_hash) and time.time() < timeout:
 416                  time.sleep(0.5)
 417  
 418              pn_app_data = RNS.Identity.recall_app_data(pn_destination_hash)
 419              if pn_announce_data_is_valid(pn_app_data):
 420                  pn_config = msgpack.unpackb(pn_app_data)
 421                  target_propagation_cost = pn_config[5][0]
 422  
 423          if not target_propagation_cost: RNS.log("Propagation node stamp cost still unavailable after path request", RNS.LOG_ERROR)
 424          return target_propagation_cost
 425  
 426      def set_inbound_propagation_node(self, destination_hash):
 427          # TODO: Implement
 428          raise NotImplementedError("Inbound/outbound propagation node differentiation is currently not implemented")
 429  
 430      def get_inbound_propagation_node(self):
 431          return self.get_outbound_propagation_node()
 432  
 433      def set_retain_node_lxms(self, retain):
 434          if retain == True:
 435              self.retain_synced_on_node = True
 436          else:
 437              self.retain_synced_on_node = False
 438  
 439      def set_authentication(self, required=None):
 440          if required != None:
 441              self.auth_required = required
 442  
 443      def requires_authentication(self):
 444          return self.auth_required
 445  
 446      def allow(self, identity_hash=None):
 447          if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8:
 448              if not identity_hash in self.allowed_list:
 449                  self.allowed_list.append(identity_hash)
 450          else:
 451              raise ValueError("Allowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes")
 452  
 453      def disallow(self, identity_hash=None):
 454          if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8:
 455              if identity_hash in self.allowed_list:
 456                  self.allowed_list.pop(identity_hash)
 457          else:
 458              raise ValueError("Disallowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes")
 459  
 460      def allow_control(self, identity_hash=None):
 461          if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8:
 462              if not identity_hash in self.control_allowed_list: self.control_allowed_list.append(identity_hash)
 463          else: raise ValueError("Allowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes")
 464  
 465      def disallow_control(self, identity_hash=None):
 466          if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8:
 467              if identity_hash in self.control_allowed_list: self.control_allowed_list.pop(identity_hash)
 468          else: raise ValueError("Disallowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes")
 469  
 470      def prioritise(self, destination_hash=None):
 471          if isinstance(destination_hash, bytes) and len(destination_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
 472              if not destination_hash in self.prioritised_list:
 473                  self.prioritised_list.append(destination_hash)
 474          else:
 475              raise ValueError("Prioritised destination hash must be "+str(RNS.Reticulum.TRUNCATED_HASHLENGTH//8)+" bytes")
 476  
 477      def unprioritise(self, identity_hash=None):
 478          if isinstance(destination_hash, bytes) and len(destination_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
 479              if destination_hash in self.prioritised_list:
 480                  self.prioritised_list.pop(destination_hash)
 481          else:
 482              raise ValueError("Prioritised destination hash must be "+str(RNS.Reticulum.TRUNCATED_HASHLENGTH//8)+" bytes")
 483  
 484      def request_messages_from_propagation_node(self, identity, max_messages = PR_ALL_MESSAGES):
 485          if max_messages == None:
 486              max_messages = LXMRouter.PR_ALL_MESSAGES
 487  
 488          self.propagation_transfer_progress = 0.0
 489          self.propagation_transfer_max_messages = max_messages
 490          if self.outbound_propagation_node != None:
 491              if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.ACTIVE:
 492                  self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHED
 493                  RNS.log("Requesting message list from propagation node", RNS.LOG_DEBUG)
 494                  self.outbound_propagation_link.identify(identity)
 495                  self.outbound_propagation_link.request(
 496                      LXMPeer.MESSAGE_GET_PATH,
 497                      [None, None], # Set both want and have fields to None to get message list
 498                      response_callback=self.message_list_response,
 499                      failed_callback=self.message_get_failed
 500                  )
 501                  self.propagation_transfer_state = LXMRouter.PR_REQUEST_SENT
 502              else:
 503                  if self.outbound_propagation_link == None:
 504                      if RNS.Transport.has_path(self.outbound_propagation_node):
 505                          self.wants_download_on_path_available_from = None
 506                          self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHING
 507                          RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for message download", RNS.LOG_DEBUG)
 508                          propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
 509                          propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
 510                          def msg_request_established_callback(link):
 511                              self.request_messages_from_propagation_node(identity, self.propagation_transfer_max_messages)
 512  
 513                          self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=msg_request_established_callback)
 514                      else:
 515                          RNS.log("No path known for message download from propagation node "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
 516                          RNS.Transport.request_path(self.outbound_propagation_node)
 517                          self.wants_download_on_path_available_from = self.outbound_propagation_node
 518                          self.wants_download_on_path_available_to = identity
 519                          self.wants_download_on_path_available_timeout = time.time() + LXMRouter.PR_PATH_TIMEOUT
 520                          self.propagation_transfer_state = LXMRouter.PR_PATH_REQUESTED
 521                          self.request_messages_path_job()
 522                  else:
 523                      RNS.log("Waiting for propagation node link to become active", RNS.LOG_EXTREME)
 524          else:
 525              RNS.log("Cannot request LXMF propagation node sync, no default propagation node configured", RNS.LOG_WARNING)
 526  
 527      def cancel_propagation_node_requests(self):
 528          if self.outbound_propagation_link != None:
 529              self.outbound_propagation_link.teardown()
 530              self.outbound_propagation_link = None
 531  
 532          self.acknowledge_sync_completion(reset_state=True)
 533  
 534      def enable_propagation(self):
 535          try:
 536              self.messagepath = self.storagepath+"/messagestore"
 537  
 538              if not os.path.isdir(self.storagepath):
 539                  os.makedirs(self.storagepath)
 540  
 541              if not os.path.isdir(self.messagepath):
 542                  os.makedirs(self.messagepath)
 543  
 544              self.propagation_entries = {}
 545  
 546              st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE)
 547              for filename in os.listdir(self.messagepath):
 548                  components = filename.split("_")
 549                  if len(components) >= 3:
 550                      if float(components[1]) > 0:
 551                          if len(components[0]) == RNS.Identity.HASHLENGTH//8*2:
 552                              try:
 553                                  transient_id       = bytes.fromhex(components[0])
 554                                  received           = float(components[1])
 555                                  stamp_value        = int(components[2])
 556                                  filepath           = self.messagepath+"/"+filename
 557                                  msg_size           = os.path.getsize(filepath)
 558                                  file               = open(filepath, "rb")
 559                                  destination_hash   = file.read(LXMessage.DESTINATION_LENGTH)
 560                                  file.close()
 561  
 562                                  self.propagation_entries[transient_id] = [
 563                                      destination_hash,   # 0: Destination hash
 564                                      filepath,           # 1: Storage location
 565                                      received,           # 2: Receive timestamp
 566                                      msg_size,           # 3: Message size
 567                                      [],                 # 4: Handled peers
 568                                      [],                 # 5: Unhandled peers
 569                                      stamp_value,        # 6: Stamp value
 570                                  ]
 571  
 572                              except Exception as e:
 573                                  RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
 574              
 575              et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st))
 576              RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE)
 577              RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
 578              st = time.time()
 579  
 580              peers_storage_path = self.storagepath+"/peers"
 581              if os.path.isfile(peers_storage_path):
 582                  peers_file = open(peers_storage_path, "rb")
 583                  peers_data = peers_file.read()
 584                  peers_file.close()
 585  
 586                  if len(peers_data) > 0:
 587                      try: serialised_peers = msgpack.unpackb(peers_data)
 588                      except Exception as e:
 589                          RNS.log(f"Could not load propagation node peering data from storage. The contained exception was: {e}", RNS.LOG_ERROR)
 590                          RNS.log(f"The peering data file located at {peers_storage_path} is likely corrupt.", RNS.LOG_ERROR)
 591                          RNS.log(f"You can delete this file and attempt starting the propagation node again, but peer synchronization states will be lost.", RNS.LOG_ERROR)
 592                          raise e
 593  
 594                      del peers_data
 595  
 596                      while len(serialised_peers) > 0:
 597                          serialised_peer = serialised_peers.pop()
 598                          peer = LXMPeer.from_bytes(serialised_peer, self)
 599                          del serialised_peer
 600                          if peer.destination_hash in self.static_peers and peer.last_heard == 0:
 601                              RNS.Transport.request_path(peer.destination_hash)
 602                          if peer.identity != None:
 603                              self.peers[peer.destination_hash] = peer
 604                              lim_str = ", no transfer limit"
 605                              if peer.propagation_transfer_limit != None:
 606                                  lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit"
 607                              RNS.log("Rebuilt peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(peer.unhandled_message_count)+" unhandled messages"+lim_str, RNS.LOG_DEBUG)
 608                          else:
 609                              RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG)
 610                              del peer
 611  
 612                      del serialised_peers
 613  
 614              if len(self.static_peers) > 0:
 615                  for static_peer in self.static_peers:
 616                      if not static_peer in self.peers:
 617                          RNS.log(f"Activating static peering with {RNS.prettyhexrep(static_peer)}", RNS.LOG_NOTICE)
 618                          self.peers[static_peer] = LXMPeer(self, static_peer, sync_strategy=self.default_sync_strategy)
 619                          if self.peers[static_peer].last_heard == 0:
 620                              # TODO: Allow path request responses through announce handler
 621                              # momentarily here, so peering config can be updated even if
 622                              # the static peer is not available to directly send an announce.
 623                              RNS.Transport.request_path(static_peer)
 624  
 625              RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)
 626  
 627              try:
 628                  if os.path.isfile(self.storagepath+"/node_stats"):
 629                      node_stats_file = open(self.storagepath+"/node_stats", "rb")
 630                      data = node_stats_file.read()
 631                      node_stats_file.close()
 632                      node_stats = msgpack.unpackb(data)
 633  
 634                      if not type(node_stats) == dict:
 635                          RNS.log("Invalid data format for loaded local node stats, node stats will be reset", RNS.LOG_ERROR)
 636                      else:
 637                          self.client_propagation_messages_received = node_stats["client_propagation_messages_received"]
 638                          self.client_propagation_messages_served = node_stats["client_propagation_messages_served"]
 639                          self.unpeered_propagation_incoming = node_stats["unpeered_propagation_incoming"]
 640                          self.unpeered_propagation_rx_bytes = node_stats["unpeered_propagation_rx_bytes"]
 641  
 642              except Exception as e:
 643                  RNS.log("Could not load local node stats. The contained exception was: "+str(e), RNS.LOG_ERROR)
 644  
 645              self.propagation_node = True
 646              self.propagation_node_start_time = time.time()
 647              self.propagation_destination.set_link_established_callback(self.propagation_link_established)
 648              self.propagation_destination.set_packet_callback(self.propagation_packet)
 649  
 650              self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL)
 651              self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL)
 652  
 653              self.control_allowed_list = [self.identity.hash]
 654              self.control_destination  = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
 655              self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.control_allowed_list)
 656              self.control_destination.register_request_handler(LXMRouter.SYNC_REQUEST_PATH, self.peer_sync_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.control_allowed_list)
 657              self.control_destination.register_request_handler(LXMRouter.UNPEER_REQUEST_PATH, self.peer_unpeer_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.control_allowed_list)
 658  
 659              if self.message_storage_limit != None:
 660                  limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit)
 661              else:
 662                  limit_str = ""
 663  
 664              RNS.log("LXMF Propagation Node message store size is "+RNS.prettysize(self.message_storage_size())+limit_str, RNS.LOG_DEBUG)
 665  
 666              self.announce_propagation_node()
 667  
 668          except Exception as e:
 669              RNS.log("Could not enable propagation node. The contained exception was: "+str(e), RNS.LOG_ERROR)
 670              raise e
 671              RNS.panic()
 672  
 673      def disable_propagation(self):
 674          self.propagation_node = False
 675          self.announce_propagation_node()
 676  
 677      def enforce_stamps(self):
 678          self._enforce_stamps = True
 679  
 680      def ignore_stamps(self):
 681          self._enforce_stamps = False
 682  
 683      def ignore_destination(self, destination_hash):
 684          if not destination_hash in self.ignored_list:
 685              self.ignored_list.append(destination_hash)
 686  
 687      def unignore_destination(self, destination_hash):
 688          if destination_hash in self.ignored_list:
 689              self.ignored_list.remove(destination_hash)
 690  
 691      def set_message_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None):
 692          limit_bytes = 0
 693  
 694          if kilobytes != None:
 695              limit_bytes += kilobytes*1000
 696  
 697          if megabytes != None:
 698              limit_bytes += megabytes*1000*1000
 699  
 700          if gigabytes != None:
 701              limit_bytes += gigabytes*1000*1000*1000
 702  
 703          if limit_bytes == 0:
 704              limit_bytes = None
 705  
 706          try:
 707              if limit_bytes == None or int(limit_bytes) > 0:
 708                  self.message_storage_limit = int(limit_bytes)
 709              else:
 710                  raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
 711          
 712          except Exception as e:
 713              raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
 714  
 715      def message_storage_limit(self):
 716          return self.message_storage_limit
 717  
 718      def message_storage_size(self):
 719          if self.propagation_node:
 720              return sum(self.propagation_entries[f][3] for f in self.propagation_entries)
 721          else:
 722              return None
 723  
 724      def set_information_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None):
 725          limit_bytes = 0
 726          if kilobytes != None: limit_bytes += kilobytes*1000
 727          if megabytes != None: limit_bytes += megabytes*1000*1000
 728          if gigabytes != None: limit_bytes += gigabytes*1000*1000*1000
 729          if limit_bytes == 0:  limit_bytes = None
 730  
 731          try:
 732              if limit_bytes == None or int(limit_bytes) > 0: self.information_storage_limit = int(limit_bytes)
 733              else: raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
 734          except Exception as e: raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
 735  
 736      def information_storage_limit(self):
 737          return self.information_storage_limit
 738  
 739      def information_storage_size(self):
 740          pass
 741  
 742      def delivery_link_available(self, destination_hash):
 743          if destination_hash in self.direct_links or destination_hash in self.backchannel_links: return True
 744          else: return False
 745  
 746  
 747      ### Propagation Node Control ##########################
 748      #######################################################
 749  
 750      def compile_stats(self):
 751          if not self.propagation_node: return None
 752          else:
 753              peer_stats = {}
 754              for peer_id in self.peers.copy():
 755                  peer = self.peers[peer_id]
 756                  peer_stats[peer_id] = {
 757                      "type": "static" if peer_id in self.static_peers else "discovered",
 758                      "state": peer.state,
 759                      "alive": peer.alive,
 760                      "name":  peer.name,
 761                      "last_heard": int(peer.last_heard),
 762                      "next_sync_attempt": peer.next_sync_attempt,
 763                      "last_sync_attempt": peer.last_sync_attempt,
 764                      "sync_backoff": peer.sync_backoff,
 765                      "peering_timebase": peer.peering_timebase,
 766                      "ler": int(peer.link_establishment_rate),
 767                      "str": int(peer.sync_transfer_rate),
 768                      "transfer_limit": peer.propagation_transfer_limit,
 769                      "sync_limit": peer.propagation_sync_limit,
 770                      "target_stamp_cost": peer.propagation_stamp_cost,
 771                      "stamp_cost_flexibility": peer.propagation_stamp_cost_flexibility,
 772                      "peering_cost": peer.peering_cost,
 773                      "peering_key": peer.peering_key_value(),
 774                      "network_distance": RNS.Transport.hops_to(peer_id),
 775                      "rx_bytes": peer.rx_bytes,
 776                      "tx_bytes": peer.tx_bytes,
 777                      "acceptance_rate": peer.acceptance_rate,
 778                      "messages": {
 779                          "offered": peer.offered,
 780                          "outgoing": peer.outgoing,
 781                          "incoming": peer.incoming,
 782                          "unhandled": peer.unhandled_message_count
 783                      },
 784                  }
 785  
 786              node_stats = {
 787                  "identity_hash": self.identity.hash,
 788                  "destination_hash": self.propagation_destination.hash,
 789                  "uptime": time.time()-self.propagation_node_start_time,
 790                  "delivery_limit": self.delivery_per_transfer_limit,
 791                  "propagation_limit": self.propagation_per_transfer_limit,
 792                  "sync_limit": self.propagation_per_sync_limit,
 793                  "target_stamp_cost": self.propagation_stamp_cost,
 794                  "stamp_cost_flexibility": self.propagation_stamp_cost_flexibility,
 795                  "peering_cost": self.peering_cost,
 796                  "max_peering_cost": self.max_peering_cost,
 797                  "autopeer_maxdepth": self.autopeer_maxdepth,
 798                  "from_static_only": self.from_static_only,
 799                  "messagestore": {
 800                      "count": len(self.propagation_entries),
 801                      "bytes": self.message_storage_size(),
 802                      "limit": self.message_storage_limit,
 803                  },
 804                  "clients" : {
 805                      "client_propagation_messages_received": self.client_propagation_messages_received,
 806                      "client_propagation_messages_served": self.client_propagation_messages_served,
 807                  },
 808                  "unpeered_propagation_incoming": self.unpeered_propagation_incoming,
 809                  "unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
 810                  "static_peers": len(self.static_peers),
 811                  "discovered_peers": len(self.peers)-len(self.static_peers),
 812                  "total_peers": len(self.peers),
 813                  "max_peers": self.max_peers,
 814                  "peers": peer_stats,
 815              }
 816  
 817              return node_stats
 818  
 819      def stats_get_request(self, path, data, request_id, remote_identity, requested_at):
 820          if   remote_identity == None:                               return LXMPeer.ERROR_NO_IDENTITY
 821          elif remote_identity.hash not in self.control_allowed_list: return LXMPeer.ERROR_NO_ACCESS
 822          else:                                                       return self.compile_stats()
 823  
 824      def peer_sync_request(self, path, data, request_id, remote_identity, requested_at):
 825          if   remote_identity == None:                               return LXMPeer.ERROR_NO_IDENTITY
 826          elif remote_identity.hash not in self.control_allowed_list: return LXMPeer.ERROR_NO_ACCESS
 827          else:
 828              if type(data)  != bytes:                                return LXMPeer.ERROR_INVALID_DATA
 829              elif len(data) != RNS.Identity.TRUNCATED_HASHLENGTH//8: return LXMPeer.ERROR_INVALID_DATA
 830              else:
 831                  if not data in self.peers: return LXMPeer.ERROR_NOT_FOUND
 832                  else:
 833                      self.peers[data].sync()
 834                      return True
 835  
 836      def peer_unpeer_request(self, path, data, request_id, remote_identity, requested_at):
 837          if   remote_identity == None:                               return LXMPeer.ERROR_NO_IDENTITY
 838          elif remote_identity.hash not in self.control_allowed_list: return LXMPeer.ERROR_NO_ACCESS
 839          else:
 840              if type(data)  != bytes:                                return LXMPeer.ERROR_INVALID_DATA
 841              elif len(data) != RNS.Identity.TRUNCATED_HASHLENGTH//8: return LXMPeer.ERROR_INVALID_DATA
 842              else:
 843                  if not data in self.peers: return LXMPeer.ERROR_NOT_FOUND
 844                  else:
 845                      self.unpeer(data)
 846                      return True
 847  
 848  
 849      ### Utility & Maintenance #############################
 850      #######################################################
 851  
    # Job intervals, counted in invocations of jobs(). A job is run
    # whenever processing_count is evenly divisible by its interval.
    # jobloop() sleeps PROCESSING_INTERVAL seconds between invocations.
    JOB_OUTBOUND_INTERVAL  = 1                          # process_outbound()
    JOB_STAMPS_INTERVAL    = 1                          # process_deferred_stamps(), in a separate thread
    JOB_LINKS_INTERVAL     = 1                          # clean_links()
    JOB_TRANSIENT_INTERVAL = 60                         # clean_transient_id_caches()
    JOB_STORE_INTERVAL     = 120                        # clean_message_store(), propagation nodes only
    JOB_PEERSYNC_INTERVAL  = 6                          # sync_peers() on propagation nodes, and clean_throttled_peers()
    JOB_PEERINGEST_INTERVAL= JOB_PEERSYNC_INTERVAL      # flush_queues(), propagation nodes only
    JOB_ROTATE_INTERVAL    = 56*JOB_PEERINGEST_INTERVAL # rotate_peers(), propagation nodes only
 860      def jobs(self):
 861          if not self.exit_handler_running:
 862              self.processing_count += 1
 863  
 864              if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
 865                  self.process_outbound()
 866  
 867              if self.processing_count % LXMRouter.JOB_STAMPS_INTERVAL == 0:
 868                  threading.Thread(target=self.process_deferred_stamps, daemon=True).start()
 869  
 870              if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0:
 871                  self.clean_links()
 872  
 873              if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0:
 874                  self.clean_transient_id_caches()
 875  
 876              if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
 877                  if self.propagation_node == True: self.clean_message_store()
 878  
 879              if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0:
 880                  if self.propagation_node == True: self.flush_queues()
 881  
 882              if self.processing_count % LXMRouter.JOB_ROTATE_INTERVAL == 0:
 883                  if self.propagation_node == True: self.rotate_peers()
 884  
 885              if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
 886                  if self.propagation_node == True: self.sync_peers()
 887                  self.clean_throttled_peers()
 888  
 889      def jobloop(self):
 890          while (True):
 891              # TODO: Improve this to scheduling, so manual
 892              # triggers can delay next run
 893              try:
 894                  self.jobs()
 895              except Exception as e:
 896                  RNS.log("An error ocurred while running LXMF Router jobs.", RNS.LOG_ERROR)
 897                  RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
 898                  RNS.trace_exception(e)
 899              time.sleep(LXMRouter.PROCESSING_INTERVAL)
 900  
 901      def flush_queues(self):
 902          if len(self.peers) > 0:
 903              self.flush_peer_distribution_queue()
 904              RNS.log("Calculating peer distribution queue mappings...", RNS.LOG_DEBUG); st = time.time()
 905              for peer_id in self.peers.copy():
 906                  if peer_id in self.peers:
 907                      peer = self.peers[peer_id]
 908                      if peer.queued_items():
 909                          peer.process_queues()
 910  
 911              RNS.log(f"Distribution queue mapping completed in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG)
 912  
 913      def clean_links(self):
 914          closed_links = []
 915          for link_hash in self.direct_links:
 916              link = self.direct_links[link_hash]
 917              inactive_time = link.no_data_for()
 918  
 919              if inactive_time > LXMRouter.LINK_MAX_INACTIVITY:
 920                  link.teardown()
 921                  closed_links.append(link_hash)
 922                  if link.link_id in self.validated_peer_links:
 923                      self.validated_peer_links.pop(link.link_id)
 924  
 925          for link_hash in closed_links:
 926              cleaned_link = self.direct_links.pop(link_hash)
 927              RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG)
 928  
 929          try:
 930              inactive_links = []
 931              for link in self.active_propagation_links:
 932                  if link.no_data_for() > LXMRouter.P_LINK_MAX_INACTIVITY:
 933                      inactive_links.append(link)
 934  
 935              for link in inactive_links:
 936                  self.active_propagation_links.remove(link)
 937                  link.teardown()
 938          
 939          except Exception as e:
 940              RNS.log("An error occurred while cleaning inbound propagation links. The contained exception was: "+str(e), RNS.LOG_ERROR)
 941  
 942          if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED:
 943              self.outbound_propagation_link = None
 944              if self.propagation_transfer_state == LXMRouter.PR_COMPLETE:
 945                  self.acknowledge_sync_completion()
 946              elif self.propagation_transfer_state < LXMRouter.PR_LINK_ESTABLISHED:
 947                  self.acknowledge_sync_completion(failure_state=LXMRouter.PR_LINK_FAILED)
 948              elif self.propagation_transfer_state >= LXMRouter.PR_LINK_ESTABLISHED and self.propagation_transfer_state < LXMRouter.PR_COMPLETE:
 949                  self.acknowledge_sync_completion(failure_state=LXMRouter.PR_TRANSFER_FAILED)
 950              else:
 951                  RNS.log(f"Unknown propagation transfer state on link cleaning: {self.propagation_transfer_state}", RNS.LOG_DEBUG)
 952                  self.acknowledge_sync_completion()
 953  
 954              RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG)
 955  
 956      def clean_transient_id_caches(self):
 957          now = time.time()
 958          removed_entries = []
 959          for transient_id in self.locally_delivered_transient_ids:
 960              timestamp = self.locally_delivered_transient_ids[transient_id]
 961              if now > timestamp+LXMRouter.MESSAGE_EXPIRY*6.0:
 962                  removed_entries.append(transient_id)
 963  
 964          for transient_id in removed_entries:
 965              self.locally_delivered_transient_ids.pop(transient_id)
 966              RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG)
 967  
 968          removed_entries = []
 969          for transient_id in self.locally_processed_transient_ids:
 970              timestamp = self.locally_processed_transient_ids[transient_id]
 971              if now > timestamp+LXMRouter.MESSAGE_EXPIRY*6.0:
 972                  removed_entries.append(transient_id)
 973  
 974          for transient_id in removed_entries:
 975              self.locally_processed_transient_ids.pop(transient_id)
 976              RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from locally processed cache", RNS.LOG_DEBUG)
 977  
 978      def update_stamp_cost(self, destination_hash, stamp_cost):
 979          RNS.log(f"Updating outbound stamp cost for {RNS.prettyhexrep(destination_hash)} to {stamp_cost}", RNS.LOG_DEBUG)
 980          self.outbound_stamp_costs[destination_hash] = [time.time(), stamp_cost]
 981          
 982          def job(): self.save_outbound_stamp_costs()
 983          threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start()
 984  
 985      def get_announce_app_data(self, destination_hash):
 986          if destination_hash in self.delivery_destinations:
 987              delivery_destination = self.delivery_destinations[destination_hash]
 988              
 989              display_name = None
 990              if delivery_destination.display_name != None:
 991                  display_name = delivery_destination.display_name.encode("utf-8")
 992  
 993              stamp_cost = None
 994              if delivery_destination.stamp_cost != None and type(delivery_destination.stamp_cost) == int:
 995                  if delivery_destination.stamp_cost > 0 and delivery_destination.stamp_cost < 255:
 996                      stamp_cost = delivery_destination.stamp_cost
 997  
 998              peer_data = [display_name, stamp_cost]
 999  
1000              return msgpack.packb(peer_data)
1001  
1002      def get_size(self, transient_id):
1003          lxm_size = self.propagation_entries[transient_id][3]
1004          return lxm_size
1005  
1006      def get_weight(self, transient_id):
1007          dst_hash   = self.propagation_entries[transient_id][0]
1008          lxm_rcvd   = self.propagation_entries[transient_id][2]
1009          lxm_size   = self.propagation_entries[transient_id][3]
1010  
1011          now        = time.time()
1012          age_weight = max(1, (now - lxm_rcvd)/60/60/24/4)
1013  
1014          if dst_hash in self.prioritised_list: priority_weight = 0.1
1015          else:                                 priority_weight = 1.0
1016          
1017          return priority_weight * age_weight * lxm_size
1018  
1019      def get_stamp_value(self, transient_id):
1020          if not transient_id in self.propagation_entries: return None
1021          else:                                            return self.propagation_entries[transient_id][6]
1022  
1023      def generate_ticket(self, destination_hash, expiry=LXMessage.TICKET_EXPIRY):
1024          now = time.time()
1025          ticket = None
1026          if destination_hash in self.available_tickets["last_deliveries"]:
1027              last_delivery = self.available_tickets["last_deliveries"][destination_hash]
1028              elapsed = now - last_delivery
1029              if elapsed < LXMessage.TICKET_INTERVAL:
1030                  RNS.log(f"A ticket for {RNS.prettyhexrep(destination_hash)} was already delivered {RNS.prettytime(elapsed)} ago, not including another ticket yet", RNS.LOG_DEBUG)
1031                  return None
1032  
1033          if destination_hash in self.available_tickets["inbound"]:
1034              for ticket in self.available_tickets["inbound"][destination_hash]:
1035                  ticket_entry = self.available_tickets["inbound"][destination_hash][ticket]
1036                  expires = ticket_entry[0]; validity_left = expires - now
1037                  if validity_left > LXMessage.TICKET_RENEW:
1038                      RNS.log(f"Found generated ticket for {RNS.prettyhexrep(destination_hash)} with {RNS.prettytime(validity_left)} of validity left, re-using this one", RNS.LOG_DEBUG)
1039                      return [expires, ticket]
1040          
1041          else:
1042              self.available_tickets["inbound"][destination_hash] = {}
1043  
1044          RNS.log(f"No generated tickets for {RNS.prettyhexrep(destination_hash)} with enough validity found, generating a new one", RNS.LOG_DEBUG)
1045          expires = now+expiry
1046          ticket  = os.urandom(LXMessage.TICKET_LENGTH)
1047          self.available_tickets["inbound"][destination_hash][ticket] = [expires]
1048          self.save_available_tickets()
1049  
1050          return [expires, ticket]
1051  
1052      def remember_ticket(self, destination_hash, ticket_entry):
1053          expires = ticket_entry[0]-time.time()
1054          RNS.log(f"Remembering ticket for {RNS.prettyhexrep(destination_hash)}, expires in {RNS.prettytime(expires)}", RNS.LOG_DEBUG)
1055          self.available_tickets["outbound"][destination_hash] = [ticket_entry[0], ticket_entry[1]]
1056  
1057      def get_outbound_ticket(self, destination_hash):
1058          if destination_hash in self.available_tickets["outbound"]:
1059              entry = self.available_tickets["outbound"][destination_hash]
1060              if entry[0] > time.time():
1061                  return entry[1]
1062  
1063          return None
1064  
1065      def get_outbound_ticket_expiry(self, destination_hash):
1066          if destination_hash in self.available_tickets["outbound"]:
1067              entry = self.available_tickets["outbound"][destination_hash]
1068              if entry[0] > time.time():
1069                  return entry[0]
1070  
1071          return None
1072  
1073      def get_inbound_tickets(self, destination_hash):
1074          now = time.time()
1075          available_tickets = []
1076          if destination_hash in self.available_tickets["inbound"]:
1077              for inbound_ticket in self.available_tickets["inbound"][destination_hash]:
1078                  if now < self.available_tickets["inbound"][destination_hash][inbound_ticket][0]:
1079                      available_tickets.append(inbound_ticket)
1080  
1081          if len(available_tickets) == 0:
1082              return None
1083          else:
1084              return available_tickets
1085  
1086      def clean_throttled_peers(self):
1087          expired_entries = []
1088          now = time.time()
1089          for peer_hash in self.throttled_peers:
1090              if now > self.throttled_peers[peer_hash]: expired_entries.append(peer_hash)
1091  
1092          for peer_hash in expired_entries: self.throttled_peers.pop(peer_hash)
1093  
    def clean_message_store(self):
        """
        Cleans the propagation message store in two phases.

        First, every indexed entry whose file name is invalid, or
        whose timestamp is older than ``LXMRouter.MESSAGE_EXPIRY``, is
        removed from the index and from disk. Then, if a message
        storage limit is set and the store still exceeds it, entries
        are removed in order of descending weight (see get_weight())
        until enough bytes have been freed.

        Failures to remove individual entries are logged and do not
        abort the cleaning run.
        """
        RNS.log("Cleaning message store", RNS.LOG_VERBOSE)
        # Check and remove expired messages
        now = time.time()
        removed_entries = {}
        for transient_id in self.propagation_entries.copy():
            entry       = self.propagation_entries[transient_id]
            filepath    = entry[1]
            stamp_value = entry[6]
            filename    = os.path.split(filepath)[-1]
            components  = filename.split("_")

            # A valid file name has three "_"-separated components: a hex
            # string of the expected hash length, a positive timestamp, and
            # a stamp value matching the one stored in the index entry.
            if len(components) == 3 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2 and int(components[2]) == stamp_value:
                timestamp = float(components[1])
                if now > timestamp+LXMRouter.MESSAGE_EXPIRY:
                    RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_EXTREME)
                    removed_entries[transient_id] = filepath
            else:
                RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING)
                removed_entries[transient_id] = filepath

        # Drop the collected entries from the index, and delete their
        # files if they still exist on disk.
        removed_count = 0
        for transient_id in removed_entries:
            try:
                filepath = removed_entries[transient_id]
                self.propagation_entries.pop(transient_id)
                if os.path.isfile(filepath):
                        os.unlink(filepath)
                removed_count += 1
            except Exception as e:
                RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)

        if removed_count > 0:
            RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_VERBOSE)

        # Check size of message store and cull if needed
        try:
            # message_storage_size() returns None when this router is
            # not running as a propagation node
            message_storage_size = self.message_storage_size()
            if message_storage_size != None:
                if self.message_storage_limit != None and message_storage_size > self.message_storage_limit:
                    # Clean the message storage according to priorities
                    bytes_needed = message_storage_size - self.message_storage_limit
                    bytes_cleaned = 0

                    # Each item is [index entry, weight, transient ID]
                    weighted_entries = []
                    for transient_id in self.propagation_entries.copy():
                        weighted_entries.append([
                            self.propagation_entries[transient_id],
                            self.get_weight(transient_id),
                            transient_id
                        ])

                    # Entries with the highest weight are removed first
                    weighted_entries.sort(key=lambda we: we[1], reverse=True)

                    i = 0
                    while i < len(weighted_entries) and bytes_cleaned < bytes_needed:
                        try:
                            w = weighted_entries[i]
                            entry = w[0]
                            transient_id = w[2]
                            filepath = entry[1]

                            if os.path.isfile(filepath):
                                os.unlink(filepath)

                            self.propagation_entries.pop(transient_id)
                            bytes_cleaned += entry[3]

                            RNS.log("Removed "+RNS.prettyhexrep(transient_id)+" with weight "+str(w[1])+" to clear up "+RNS.prettysize(entry[3])+", now cleaned "+RNS.prettysize(bytes_cleaned)+" out of "+RNS.prettysize(bytes_needed)+" needed", RNS.LOG_EXTREME)

                        except Exception as e:
                            RNS.log("Error while cleaning LXMF message from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)

                        finally:
                            # Always advance, so a failing entry is
                            # skipped instead of being retried forever
                            i += 1

                    RNS.log("LXMF message store size is now "+RNS.prettysize(self.message_storage_size())+" for "+str(len(self.propagation_entries))+" items", RNS.LOG_EXTREME)


        except Exception as e:
            RNS.log("Could not clean the LXMF message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
1175  
1176      def save_locally_delivered_transient_ids(self):
1177          try:
1178              if len(self.locally_delivered_transient_ids) > 0:
1179                  if not os.path.isdir(self.storagepath):
1180                      os.makedirs(self.storagepath)
1181  
1182                  with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file:
1183                      locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids))
1184  
1185          except Exception as e:
1186              RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
1187  
1188      def save_locally_processed_transient_ids(self):
1189          try:
1190              if len(self.locally_processed_transient_ids) > 0:
1191                  if not os.path.isdir(self.storagepath):
1192                      os.makedirs(self.storagepath)
1193  
1194                  with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file:
1195                      locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids))
1196  
1197          except Exception as e:
1198              RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
1199  
1200      def save_node_stats(self):
1201          try:
1202              if not os.path.isdir(self.storagepath):
1203                  os.makedirs(self.storagepath)
1204  
1205              with open(self.storagepath+"/node_stats", "wb") as stats_file:
1206                  node_stats = {
1207                      "client_propagation_messages_received": self.client_propagation_messages_received,
1208                      "client_propagation_messages_served": self.client_propagation_messages_served,
1209                      "unpeered_propagation_incoming": self.unpeered_propagation_incoming,
1210                      "unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
1211                  }
1212                  stats_file.write(msgpack.packb(node_stats))
1213  
1214          except Exception as e:
1215              RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
1216          
1217  
1218      def clean_outbound_stamp_costs(self):
1219          try:
1220              expired = []
1221              for destination_hash in self.outbound_stamp_costs:
1222                  entry = self.outbound_stamp_costs[destination_hash]
1223                  if time.time() > entry[0] + LXMRouter.STAMP_COST_EXPIRY:
1224                      expired.append(destination_hash)
1225  
1226              for destination_hash in expired:
1227                  self.outbound_stamp_costs.pop(destination_hash)
1228          
1229          except Exception as e:
1230              RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR)
1231              RNS.trace_exception(e)
1232  
1233      def save_outbound_stamp_costs(self):
1234          with self.cost_file_lock:
1235              try:
1236                  if not os.path.isdir(self.storagepath):
1237                          os.makedirs(self.storagepath)
1238  
1239                  outbound_stamp_costs_file = open(self.storagepath+"/outbound_stamp_costs", "wb")
1240                  outbound_stamp_costs_file.write(msgpack.packb(self.outbound_stamp_costs.copy()))
1241                  outbound_stamp_costs_file.close()
1242  
1243              except Exception as e:
1244                  RNS.log("Could not save outbound stamp costs to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
1245  
1246      def clean_available_tickets(self):
1247          try:
1248              # Clean outbound tickets
1249              expired_outbound = []
1250              for destination_hash in self.available_tickets["outbound"]:
1251                  entry = self.available_tickets["outbound"][destination_hash]
1252                  if time.time() > entry[0]:
1253                      expired_outbound.append(destination_hash)
1254  
1255              for destination_hash in expired_outbound:
1256                  self.available_tickets["outbound"].pop(destination_hash)
1257  
1258              # Clean inbound tickets
1259              for destination_hash in self.available_tickets["inbound"]:
1260                  expired_inbound = []
1261                  for inbound_ticket in self.available_tickets["inbound"][destination_hash]:
1262                      entry = self.available_tickets["inbound"][destination_hash][inbound_ticket]
1263                      ticket_expiry = entry[0]
1264                      if time.time() > ticket_expiry+LXMessage.TICKET_GRACE:
1265                          expired_inbound.append(inbound_ticket)
1266  
1267                  for inbound_ticket in expired_inbound:
1268                      self.available_tickets["inbound"][destination_hash].pop(inbound_ticket)
1269          
1270          except Exception as e:
1271              RNS.log(f"Error while cleaning available tickets. The contained exception was: {e}", RNS.LOG_ERROR)
1272              RNS.trace_exception(e)
1273  
1274      def save_available_tickets(self):
1275          with self.ticket_file_lock:
1276              try:
1277                  if not os.path.isdir(self.storagepath):
1278                          os.makedirs(self.storagepath)
1279  
1280                  available_tickets_file = open(self.storagepath+"/available_tickets", "wb")
1281                  available_tickets_file.write(msgpack.packb(self.available_tickets))
1282                  available_tickets_file.close()
1283  
1284              except Exception as e:
1285                  RNS.log("Could not save available tickets to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
1286  
1287      def reload_available_tickets(self):
1288          RNS.log("Reloading available tickets from storage", RNS.LOG_DEBUG)
1289          try:
1290              with self.ticket_file_lock:
1291                  with open(self.storagepath+"/available_tickets", "rb") as available_tickets_file:
1292                      data = available_tickets_file.read()
1293                      self.available_tickets = msgpack.unpackb(data)
1294                      if not type(self.available_tickets) == dict:
1295                          RNS.log("Invalid data format for loaded available tickets, recreating...", RNS.LOG_ERROR)
1296                          self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
1297                      if not "outbound" in self.available_tickets:
1298                          RNS.log("Missing outbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
1299                          self.available_tickets["outbound"] = {}
1300                      if not "inbound" in self.available_tickets:
1301                          RNS.log("Missing inbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
1302                          self.available_tickets["inbound"] = {}
1303                      if not "last_deliveries" in self.available_tickets:
1304                          RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
1305                          self.available_tickets["last_deliveries"] = {}
1306          
1307          except Exception as e:
1308              RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR)
1309  
1310      def exit_handler(self):
1311          if self.exit_handler_running:
1312              return
1313  
1314          self.exit_handler_running = True
1315  
1316          RNS.log("Tearing down delivery destinations...", RNS.LOG_NOTICE)
1317          for destination_hash in self.delivery_destinations:
1318              delivery_destination = self.delivery_destinations[destination_hash]
1319              delivery_destination.set_packet_callback(None)
1320              delivery_destination.set_link_established_callback(None)
1321              for link in delivery_destination.links:
1322                  try:
1323                      if link.status == RNS.Link.ACTIVE:
1324                          link.teardown()
1325                  except Exception as e:
1326                      RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
1327  
1328          if self.propagation_node:
1329              RNS.log("Tearing down propagation node destination...", RNS.LOG_NOTICE)
1330              self.propagation_destination.set_link_established_callback(None)
1331              self.propagation_destination.set_packet_callback(None)
1332              self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH)
1333              self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH)
1334              self.propagation_destination.deregister_request_handler(LXMRouter.STATS_GET_PATH)
1335              self.propagation_destination.deregister_request_handler(LXMRouter.SYNC_REQUEST_PATH)
1336              self.propagation_destination.deregister_request_handler(LXMRouter.UNPEER_REQUEST_PATH)
1337              for link in self.active_propagation_links:
1338                  try:
1339                      if link.status == RNS.Link.ACTIVE:
1340                          link.teardown()
1341                  except Exception as e:
1342                      RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
1343  
1344          RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE)
1345          self.flush_queues()
1346          if self.propagation_node:
1347              try:
1348                  st = time.time(); RNS.log(f"Saving {len(self.peers)} peer synchronisation states to storage...", RNS.LOG_NOTICE)
1349                  serialised_peers = []
1350                  peer_dict = self.peers.copy()
1351                  for peer_id in peer_dict:
1352                      peer = self.peers[peer_id]
1353                      serialised_peers.append(peer.to_bytes())
1354  
1355                  peers_file = open(self.storagepath+"/peers", "wb")
1356                  peers_file.write(msgpack.packb(serialised_peers))
1357                  peers_file.close()
1358  
1359                  RNS.log(f"Saved {len(serialised_peers)} peers to storage in {RNS.prettyshorttime(time.time()-st)}", RNS.LOG_NOTICE)
1360  
1361              except Exception as e:
1362                  RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
1363  
1364          self.save_locally_delivered_transient_ids()
1365          self.save_locally_processed_transient_ids()
1366          self.save_node_stats()
1367  
1368      def sigint_handler(self, signal, frame):
1369          if threading.current_thread() != threading.main_thread():
1370              RNS.log(f"SIGINT on non-main thread {threading.current_thread()}, exiting immediately", RNS.LOG_WARNING)
1371              os._exit(0)
1372          else:
1373              if not self.exit_handler_running:
1374                  RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING)
1375                  self.exit_handler()
1376                  RNS.exit(0)
1377              else:
1378                  RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
1379  
1380      def sigterm_handler(self, signal, frame):
1381          if threading.current_thread() != threading.main_thread():
1382              RNS.log(f"SIGTERM on non-main thread {threading.current_thread()}, exiting immediately", RNS.LOG_WARNING)
1383              os._exit(0)
1384          else:
1385              if not self.exit_handler_running:
1386                  RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING)
1387                  self.exit_handler()
1388                  RNS.exit(0)
1389              else:
1390                  RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
1391  
1392      def __str__(self):
1393          return "<LXMRouter "+RNS.hexrep(self.identity.hash, delimit=False)+">"
1394  
1395  
1396      ### Message Download ##################################
1397      #######################################################
1398      
1399      def request_messages_path_job(self):
1400          job_thread = threading.Thread(target=self.__request_messages_path_job)
1401          job_thread.setDaemon(True)
1402          job_thread.start()
1403  
1404      def __request_messages_path_job(self):
1405          path_timeout = self.wants_download_on_path_available_timeout
1406          while not RNS.Transport.has_path(self.wants_download_on_path_available_from) and time.time() < path_timeout:
1407              time.sleep(0.1)
1408  
1409          if RNS.Transport.has_path(self.wants_download_on_path_available_from):
1410              self.request_messages_from_propagation_node(self.wants_download_on_path_available_to, self.propagation_transfer_max_messages)
1411          else:
1412              RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG)
1413              self.acknowledge_sync_completion(failure_state=LXMRouter.PR_NO_PATH)
1414      
1415      def identity_allowed(self, identity):
1416          if self.auth_required:
1417              if identity.hash in self.allowed_list:
1418                  return True
1419              else:
1420                  return False
1421          
1422          else:
1423              return True
1424  
1425      def message_get_request(self, path, data, request_id, remote_identity, requested_at):
1426          if remote_identity == None:                      return LXMPeer.ERROR_NO_IDENTITY
1427          elif not self.identity_allowed(remote_identity): return LXMPeer.ERROR_NO_ACCESS
1428          else:
1429              try:
1430                  remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery")
1431  
1432                  # If both want and have fields are empty, send a list of
1433                  # available messages.
1434                  if data[0] == None and data[1] == None:
1435                      available_messages = []
1436                      for transient_id in self.propagation_entries:
1437                          message_entry = self.propagation_entries[transient_id]
1438                          if message_entry[0] == remote_destination.hash:
1439                              message_size = os.path.getsize(message_entry[1])
1440                              available_entry = [transient_id, message_size]
1441                              available_messages.append(available_entry)
1442  
1443                      available_messages.sort(key=lambda e: e[1], reverse=False)
1444  
1445                      transient_ids = []
1446                      for available_entry in available_messages: transient_ids.append(available_entry[0])
1447                      return transient_ids
1448  
1449                  else:
1450                      # Process messages the client already have
1451                      if data[1] != None and len(data[1]) > 0:
1452                          for transient_id in data[1]:
1453                              if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash:
1454                                  try:
1455                                      filepath = self.propagation_entries[transient_id][1]
1456                                      self.propagation_entries.pop(transient_id)
1457                                      os.unlink(filepath)
1458                                      # TODO: Remove debug
1459                                      # RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG)
1460                                  
1461                                  except Exception as e:
1462                                      RNS.log("Error while processing message purge request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
1463  
1464  
1465                      # Process wanted messages
1466                      response_messages = []
1467                      if data[0] != None and len(data[0]) > 0:
1468                          client_transfer_limit = None
1469                          if len(data) >= 3:
1470                              try:
1471                                  client_transfer_limit = float(data[2])*1000
1472                                  RNS.log("Client indicates transfer limit of "+RNS.prettysize(client_transfer_limit), RNS.LOG_DEBUG)
1473                              except: pass
1474  
1475                          per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
1476                          cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
1477                          for transient_id in data[0]:
1478                              if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash:
1479                                  try:
1480                                      filepath = self.propagation_entries[transient_id][1]
1481                                      RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" requested message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG)
1482  
1483                                      message_file = open(filepath, "rb")
1484                                      lxmf_data = message_file.read()
1485                                      message_file.close()
1486  
1487                                      lxm_size = len(lxmf_data)
1488                                      next_size = cumulative_size + (lxm_size+per_message_overhead)
1489  
1490                                      if client_transfer_limit != None and next_size > client_transfer_limit: pass
1491                                      else:
1492                                          response_messages.append(lxmf_data[:-LXStamper.STAMP_SIZE])
1493                                          cumulative_size += (lxm_size+per_message_overhead)
1494  
1495                                  except Exception as e:
1496                                      RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
1497  
1498                      self.client_propagation_messages_served += len(response_messages)
1499                      return response_messages
1500  
1501              except Exception as e:
1502                  RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
1503                  return None
1504  
1505      def message_list_response(self, request_receipt):
1506          if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
1507              RNS.log("Propagation node indicated missing identification on list request, tearing down link.", RNS.LOG_DEBUG)
1508              if self.outbound_propagation_link != None:
1509                  self.outbound_propagation_link.teardown()
1510              self.propagation_transfer_state = LXMRouter.PR_NO_IDENTITY_RCVD
1511  
1512          elif request_receipt.response == LXMPeer.ERROR_NO_ACCESS:
1513              RNS.log("Propagation node did not allow list request, tearing down link.", RNS.LOG_DEBUG)
1514              if self.outbound_propagation_link != None:
1515                  self.outbound_propagation_link.teardown()
1516              self.propagation_transfer_state = LXMRouter.PR_NO_ACCESS
1517  
1518          else:
1519              if request_receipt.response != None and isinstance(request_receipt.response, list):
1520                  haves = []
1521                  wants = []
1522                  if len(request_receipt.response) > 0:
1523                      for transient_id in request_receipt.response:
1524                          if self.has_message(transient_id):
1525                              if not self.retain_synced_on_node:
1526                                  haves.append(transient_id)
1527                          else:
1528                              if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages:
1529                                  wants.append(transient_id)
1530  
1531                      ms = "" if len(wants) == 1 else "s"
1532                      RNS.log(f"Requesting {len(wants)} message{ms} from propagation node", RNS.LOG_DEBUG)
1533                      request_receipt.link.request(
1534                          LXMPeer.MESSAGE_GET_PATH,
1535                          [wants, haves, self.delivery_per_transfer_limit],
1536                          response_callback=self.message_get_response,
1537                          failed_callback=self.message_get_failed,
1538                          progress_callback=self.message_get_progress)
1539  
1540                  else:
1541                      self.propagation_transfer_state = LXMRouter.PR_COMPLETE
1542                      self.propagation_transfer_progress = 1.0
1543                      self.propagation_transfer_last_result = 0
1544  
1545              else:
1546                  RNS.log("Invalid message list data received from propagation node", RNS.LOG_DEBUG)
1547                  if self.outbound_propagation_link != None:
1548                      self.outbound_propagation_link.teardown()
1549  
1550      def message_get_response(self, request_receipt):
1551          if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
1552              RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG)
1553              if self.outbound_propagation_link != None:
1554                  self.outbound_propagation_link.teardown()
1555              self.propagation_transfer_state = LXMRouter.PR_NO_IDENTITY_RCVD
1556  
1557          elif request_receipt.response == LXMPeer.ERROR_NO_ACCESS:
1558              RNS.log("Propagation node did not allow get request, tearing down link.", RNS.LOG_DEBUG)
1559              if self.outbound_propagation_link != None:
1560                  self.outbound_propagation_link.teardown()
1561              self.propagation_transfer_state = LXMRouter.PR_NO_ACCESS
1562  
1563          else:
1564              duplicates = 0
1565              if request_receipt.response != None and len(request_receipt.response) > 0:
1566                  haves = []
1567                  for lxmf_data in request_receipt.response:
1568                      result = self.lxmf_propagation(lxmf_data, signal_duplicate=LXMRouter.DUPLICATE_SIGNAL)
1569                      if result == LXMRouter.DUPLICATE_SIGNAL: duplicates += 1
1570                      haves.append(RNS.Identity.full_hash(lxmf_data))
1571  
1572                  # Return a list of successfully received messages to the node.
1573                  # This deletes the messages on the propagation node.
1574                  # TODO: Add option to keep messages on node.
1575                  request_receipt.link.request(
1576                      LXMPeer.MESSAGE_GET_PATH,
1577                      [None, haves],
1578                      # response_callback=self.message_syncfinal_response,
1579                      failed_callback=self.message_get_failed,
1580                      # progress_callback=self.message_get_progress
1581                  )
1582  
1583              self.propagation_transfer_state = LXMRouter.PR_COMPLETE
1584              self.propagation_transfer_progress = 1.0
1585              self.propagation_transfer_last_duplicates = duplicates
1586              self.propagation_transfer_last_result = len(request_receipt.response)
1587              self.save_locally_delivered_transient_ids()
1588  
1589      def message_get_progress(self, request_receipt):
1590          self.propagation_transfer_state = LXMRouter.PR_RECEIVING
1591          self.propagation_transfer_progress = request_receipt.get_progress()
1592  
1593      def message_get_failed(self, request_receipt):
1594          RNS.log("Message list/get request failed", RNS.LOG_DEBUG)
1595          if self.outbound_propagation_link != None:
1596              self.outbound_propagation_link.teardown()
1597  
1598      def acknowledge_sync_completion(self, reset_state=False, failure_state=None):
1599          self.propagation_transfer_last_result = None
1600          if reset_state or self.propagation_transfer_state <= LXMRouter.PR_COMPLETE:
1601              if failure_state == None:
1602                  self.propagation_transfer_state = LXMRouter.PR_IDLE
1603              else:
1604                  self.propagation_transfer_state = failure_state
1605  
1606          self.propagation_transfer_progress = 0.0
1607          self.wants_download_on_path_available_from = None
1608          self.wants_download_on_path_available_to = None
1609  
1610      def has_message(self, transient_id):
1611          if transient_id in self.locally_delivered_transient_ids:
1612              return True
1613          else:
1614              return False
1615      
1616      def cancel_outbound(self, message_id, cancel_state=LXMessage.CANCELLED):
1617          try:
1618              if message_id in self.pending_deferred_stamps:
1619                  lxm = self.pending_deferred_stamps[message_id]
1620                  RNS.log(f"Cancelling deferred stamp generation for {lxm}", RNS.LOG_DEBUG)
1621                  lxm.state = cancel_state
1622                  LXStamper.cancel_work(message_id)
1623  
1624              lxmessage = None
1625              for lxm in self.pending_outbound:
1626                  if lxm.message_id == message_id:
1627                      lxmessage = lxm
1628  
1629              if lxmessage != None:
1630                  lxmessage.state = cancel_state
1631                  if lxmessage in self.pending_outbound:
1632                      RNS.log(f"Cancelling {lxmessage} in outbound queue", RNS.LOG_DEBUG)
1633                      if lxmessage.representation == LXMessage.RESOURCE:
1634                          if lxmessage.resource_representation != None:
1635                              lxmessage.resource_representation.cancel()
1636  
1637                      self.process_outbound()
1638  
1639          except Exception as e:
1640              RNS.log(f"An error occurred while cancelling {lxmessage}: {e}", RNS.LOG_ERROR)
1641              RNS.trace_exception(e)
1642  
    def handle_outbound(self, lxmessage):
        """Prepare an outbound message and queue it for delivery.

        The message is marked as outbound, given a stamp cost and an
        outbound ticket where available, packed if necessary, and then
        placed either in self.pending_outbound or, if stamp generation
        is deferred, in self.pending_deferred_stamps.
        """
        destination_hash = lxmessage.get_destination().hash

        # If the message carries no stamp cost, use the one recorded
        # for this destination, if any.
        if lxmessage.stamp_cost == None:
            if destination_hash in self.outbound_stamp_costs:
                stamp_cost = self.outbound_stamp_costs[destination_hash][1]
                lxmessage.stamp_cost = stamp_cost
                RNS.log(f"No stamp cost set on LXM to {RNS.prettyhexrep(destination_hash)}, autoconfigured to {stamp_cost}, as required by latest announce", RNS.LOG_DEBUG)

        lxmessage.state = LXMessage.OUTBOUND

        # If an outbound ticket is available for this
        # destination, attach it to the message.
        lxmessage.outbound_ticket = self.get_outbound_ticket(destination_hash)
        if lxmessage.outbound_ticket != None and lxmessage.defer_stamp:
            RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but outbound ticket was applied, processing immediately", RNS.LOG_DEBUG)
            lxmessage.defer_stamp = False

        # If requested, include a ticket to allow the
        # destination to reply without generating a stamp.
        if lxmessage.include_ticket:
            ticket = self.generate_ticket(lxmessage.destination_hash)
            if ticket: lxmessage.fields[FIELD_TICKET] = ticket

        # Packing happens after the ticket field has been set above.
        if not lxmessage.packed: lxmessage.pack()

        # For opportunistic messages without a known path, request the
        # path now and postpone the first delivery attempt.
        unknown_path_requested = False
        if not RNS.Transport.has_path(destination_hash) and lxmessage.method == LXMessage.OPPORTUNISTIC:
            RNS.log(f"Pre-emptively requesting unknown path for opportunistic {lxmessage}", RNS.LOG_DEBUG)
            RNS.Transport.request_path(destination_hash)
            lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
            unknown_path_requested = True

        lxmessage.determine_transport_encryption()

        # Deferring stamp generation is pointless if no stamp is needed.
        if lxmessage.defer_stamp and lxmessage.stamp_cost == None:
            RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but no stamp is required, processing immediately", RNS.LOG_DEBUG)
            lxmessage.defer_stamp = False

        if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp):
            # Unless a path was just requested, wait until the outbound
            # processing lock is free before queueing the message.
            while not unknown_path_requested and self.outbound_processing_lock.locked(): time.sleep(0.05)

            self.pending_outbound.append(lxmessage)
            # Outbound processing is started on a daemon thread right away,
            # except when a path was requested and the attempt was postponed.
            if not unknown_path_requested: threading.Thread(target=self.process_outbound, daemon=True).start()

        # Otherwise, hold the message until its stamp has been generated.
        else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage
1689  
1690      def get_outbound_progress(self, lxm_hash):
1691          for lxm in self.pending_outbound:
1692              if lxm.hash == lxm_hash:
1693                  return lxm.progress
1694  
1695          for lxm_id in self.pending_deferred_stamps:
1696              if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
1697                  return self.pending_deferred_stamps[lxm_id].progress
1698          
1699          return None
1700  
1701      def get_outbound_lxm_stamp_cost(self, lxm_hash):
1702          for lxm in self.pending_outbound:
1703              if lxm.hash == lxm_hash:
1704                  if lxm.outbound_ticket: return None
1705                  else:                   return lxm.stamp_cost
1706  
1707          for lxm_id in self.pending_deferred_stamps:
1708              if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
1709                  lxm = self.pending_deferred_stamps[lxm_id]
1710                  if lxm.outbound_ticket: return None
1711                  else:                   return lxm.stamp_cost
1712          
1713          return None
1714  
1715      def get_outbound_lxm_propagation_stamp_cost(self, lxm_hash):
1716          for lxm in self.pending_outbound:
1717              if lxm.hash == lxm_hash:
1718                  return lxm.propagation_target_cost
1719  
1720          for lxm_id in self.pending_deferred_stamps:
1721              if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
1722                  return self.pending_deferred_stamps[lxm_id].propagation_target_cost
1723          
1724          return None
1725  
1726  
1727      ### Message Routing & Delivery ########################
1728      #######################################################
1729  
    def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False, allow_duplicate=False):
        """
        Unpack received LXMF data, run local acceptance checks on the
        resulting message, and hand it to the registered delivery callback.

        :param lxmf_data: Packed message data, passed to ``LXMessage.unpack_from_bytes``.
        :param destination_type: One of the ``RNS.Destination`` type constants; used only to fill in the transport encryption description on the message.
        :param phy_stats: Optional dict that may contain ``"rssi"``, ``"snr"`` and ``"q"`` keys, which are copied onto the message.
        :param ratchet_id: Set on the message if the unpacked message has none of its own.
        :param method: If truthy, overrides the ``method`` attribute of the message.
        :param no_stamp_enforcement: If ``True``, a message with an invalid stamp is accepted even when stamp enforcement is enabled.
        :param allow_duplicate: If ``True``, skips the already-received check.
        :returns: ``True`` if the message was accepted (the delivery callback, when set, has been invoked), ``False`` if it was dropped, ignored, a duplicate, or if any exception was raised while handling it.
        """
        try:
            message = LXMessage.unpack_from_bytes(lxmf_data)
            if ratchet_id and not message.ratchet_id:
                message.ratchet_id = ratchet_id

            if method:
                message.method = method

            # Tickets are only remembered from messages with a validated
            # signature. The ticket field must be a list of at least two
            # elements: an expiry timestamp followed by the ticket bytes.
            if message.signature_validated and FIELD_TICKET in message.fields:
                ticket_entry = message.fields[FIELD_TICKET]
                if type(ticket_entry) == list and len(ticket_entry) > 1:
                    expires = ticket_entry[0]
                    ticket  = ticket_entry[1]

                    if time.time() < expires:
                        if type(ticket) == bytes and len(ticket) == LXMessage.TICKET_LENGTH:
                            self.remember_ticket(message.source_hash, ticket_entry)
                            # Saving is pushed to a background thread so
                            # it does not hold up message delivery.
                            def save_job():
                                self.save_available_tickets()
                            threading.Thread(target=save_job, daemon=True).start()

            # NOTE: A destination hash that is not present in
            # delivery_destinations raises KeyError here, which is handled
            # by the catch-all at the bottom of this method.
            required_stamp_cost = self.delivery_destinations[message.destination_hash].stamp_cost
            if required_stamp_cost != None:
                destination_tickets = self.get_inbound_tickets(message.source_hash)
                if message.validate_stamp(required_stamp_cost, tickets=destination_tickets):
                    message.stamp_valid = True
                    message.stamp_checked = True
                else:
                    message.stamp_valid = False
                    message.stamp_checked = True

                # An invalid stamp only causes a drop when enforcement is
                # enabled and has not been overridden for this call.
                if not message.stamp_valid:
                    if no_stamp_enforcement:
                        RNS.log(f"Received {message} with invalid stamp, but allowing anyway, since stamp enforcement was temporarily disabled", RNS.LOG_NOTICE)
                    else:
                        if self._enforce_stamps:
                            RNS.log(f"Dropping {message} with invalid stamp", RNS.LOG_NOTICE)
                            return False
                        else:
                            RNS.log(f"Received {message} with invalid stamp, but allowing anyway, since stamp enforcement is disabled", RNS.LOG_NOTICE)
                else:
                    RNS.log(f"Received {message} with valid stamp", RNS.LOG_DEBUG)

            if phy_stats != None:
                if "rssi" in phy_stats: message.rssi = phy_stats["rssi"]
                if "snr" in phy_stats: message.snr = phy_stats["snr"]
                if "q" in phy_stats: message.q = phy_stats["q"]

            # TODO: Update these descriptions to account for ratchets
            if destination_type == RNS.Destination.SINGLE:
                message.transport_encrypted = True
                message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
            elif destination_type == RNS.Destination.GROUP:
                message.transport_encrypted = True
                message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES
            elif destination_type == RNS.Destination.LINK:
                message.transport_encrypted = True
                message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
            else:
                message.transport_encrypted = False
                message.transport_encryption = None

            if message.source_hash in self.ignored_list:
                RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
                return False

            if not allow_duplicate and self.has_message(message.hash):
                RNS.log(str(self)+" ignored already received message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
                return False
            else:
                # Record the time this message hash was delivered locally
                self.locally_delivered_transient_ids[message.hash] = time.time()

            # Errors raised by the application callback are logged, but do
            # not change the return value of this method.
            if self.__delivery_callback != None and callable(self.__delivery_callback):
                try:
                    self.__delivery_callback(message)
                except Exception as e:
                    RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
                    RNS.trace_exception(e)

            return True

        except Exception as e:
            # Catch-all: every exception raised above (not only unpacking
            # failures) is reported with this message.
            RNS.log("Could not assemble LXMF message from received data", RNS.LOG_NOTICE)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
            return False
1816  
1817      def delivery_packet(self, data, packet):
1818          packet.prove()
1819          try:
1820              method = None
1821              if packet.destination_type != RNS.Destination.LINK:
1822                  method = LXMessage.OPPORTUNISTIC
1823                  lxmf_data  = b""
1824                  lxmf_data += packet.destination.hash
1825                  lxmf_data += data
1826              else:
1827                  method = LXMessage.DIRECT
1828                  lxmf_data = data
1829  
1830              try:
1831                  reticulum = RNS.Reticulum.get_instance()
1832                  if packet.rssi == None: packet.rssi = reticulum.get_packet_rssi(packet.packet_hash)
1833                  if packet.snr  == None: packet.snr  = reticulum.get_packet_snr(packet.packet_hash)
1834                  if packet.q    == None: packet.q    = reticulum.get_packet_q(packet.packet_hash)
1835              except Exception as e:
1836                  RNS.log("Error while retrieving physical link stats for LXMF delivery packet: "+str(e), RNS.LOG_ERROR)
1837  
1838              phy_stats = {"rssi": packet.rssi, "snr": packet.snr, "q": packet.q}
1839  
1840              self.lxmf_delivery(lxmf_data, packet.destination_type, phy_stats=phy_stats, ratchet_id=packet.ratchet_id, method=method)
1841  
1842          except Exception as e:
1843              RNS.log("Exception occurred while parsing incoming LXMF data.", RNS.LOG_ERROR)
1844              RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
1845  
    def delivery_link_established(self, link):
        """
        Configure a newly established delivery link.

        Enables physical layer stats tracking on the link and registers
        this router's handlers for packets, resource transfers and remote
        identification.

        :param link: The link instance to configure.
        """
        link.track_phy_stats(True)
        link.set_packet_callback(self.delivery_packet)
        # With the ACCEPT_APP strategy set here, delivery_resource_advertised
        # is registered as the callback for advertised resources.
        link.set_resource_strategy(RNS.Link.ACCEPT_APP)
        link.set_resource_callback(self.delivery_resource_advertised)
        link.set_resource_started_callback(self.resource_transfer_began)
        link.set_resource_concluded_callback(self.delivery_resource_concluded)
        link.set_remote_identified_callback(self.delivery_remote_identified)
1854  
    def delivery_link_closed(self, link):
        """
        Handler for closed delivery links. Currently does nothing.

        :param link: The link that was closed.
        """
        pass
1857  
    def resource_transfer_began(self, resource):
        """
        Log, at debug level, that a resource transfer has started.

        :param resource: The resource whose transfer began.
        """
        RNS.log("Transfer began for LXMF delivery resource "+str(resource), RNS.LOG_DEBUG)
1860  
1861      def delivery_resource_advertised(self, resource):
1862          size = resource.get_data_size()
1863          limit = self.delivery_per_transfer_limit*1000
1864          if limit != None and size > limit:
1865              RNS.log("Rejecting "+RNS.prettysize(size)+" incoming LXMF delivery resource, since it exceeds the limit of "+RNS.prettysize(limit), RNS.LOG_DEBUG)
1866              return False
1867          else:
1868              return True
1869  
1870      def delivery_resource_concluded(self, resource):
1871          RNS.log("Transfer concluded for LXMF delivery resource "+str(resource), RNS.LOG_DEBUG)
1872          if resource.status == RNS.Resource.COMPLETE:
1873              ratchet_id = None
1874              # Set ratchet ID to link ID if available
1875              if resource.link and hasattr(resource.link, "link_id"):
1876                  ratchet_id = resource.link.link_id
1877              phy_stats = {"rssi": resource.link.rssi, "snr": resource.link.snr, "q": resource.link.q}
1878              self.lxmf_delivery(resource.data.read(), resource.link.type, phy_stats=phy_stats, ratchet_id=ratchet_id, method=LXMessage.DIRECT)
1879  
    def delivery_remote_identified(self, link, identity):
        """
        Remote-identified callback for delivery links.

        Derives the ``lxmf.delivery`` destination hash for the identity
        and stores the link under that hash in ``backchannel_links``,
        replacing any link previously stored for the same hash.

        :param link: The link on which the remote peer identified.
        :param identity: The identity presented by the remote peer.
        """
        destination_hash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", identity)
        self.backchannel_links[destination_hash] = link
        RNS.log(f"Backchannel became available for {RNS.prettyhexrep(destination_hash)} on delivery link {link}", RNS.LOG_DEBUG)
1884  
1885  
1886      ### Peer Sync & Propagation ###########################
1887      #######################################################
1888  
1889      def peer(self, destination_hash, timestamp, propagation_transfer_limit, propagation_sync_limit, propagation_stamp_cost, propagation_stamp_cost_flexibility, peering_cost, metadata):
1890          if peering_cost > self.max_peering_cost:
1891              if destination_hash in self.peers:
1892                  RNS.log(f"Peer {RNS.prettyhexrep(destination_hash)} increased peering cost beyond local accepted maximum, breaking peering...", RNS.LOG_NOTICE)
1893                  self.unpeer(destination_hash, timestamp)
1894              else:
1895                  RNS.log(f"Not peering with {RNS.prettyhexrep(destination_hash)}, since its peering cost of {peering_cost} exceeds local maximum of {self.max_peering_cost}", RNS.LOG_NOTICE)
1896  
1897          else:
1898              if destination_hash in self.peers:
1899                  peer = self.peers[destination_hash]
1900                  if timestamp > peer.peering_timebase:
1901                      peer.alive = True
1902                      peer.metadata = metadata
1903                      peer.sync_backoff = 0
1904                      peer.next_sync_attempt = 0
1905                      peer.peering_timebase = timestamp
1906                      peer.last_heard = time.time()
1907                      peer.propagation_stamp_cost = propagation_stamp_cost
1908                      peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility
1909                      peer.peering_cost = peering_cost
1910                      peer.propagation_transfer_limit = propagation_transfer_limit
1911                      if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit
1912                      else:                              peer.propagation_sync_limit = propagation_transfer_limit
1913                      
1914                      RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE)
1915                  
1916              else:
1917                  if len(self.peers) >= self.max_peers: RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
1918                  else:
1919                      peer = LXMPeer(self, destination_hash, sync_strategy=self.default_sync_strategy)
1920                      peer.alive = True
1921                      peer.metadata = metadata
1922                      peer.last_heard = time.time()
1923                      peer.propagation_stamp_cost = propagation_stamp_cost
1924                      peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility
1925                      peer.peering_cost = peering_cost
1926                      peer.propagation_transfer_limit = propagation_transfer_limit
1927                      if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit
1928                      else:                              peer.propagation_sync_limit = propagation_transfer_limit
1929                      
1930                      self.peers[destination_hash] = peer
1931                      RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE)
1932  
1933  
1934      def unpeer(self, destination_hash, timestamp = None):
1935          if timestamp == None:
1936              timestamp = int(time.time())
1937  
1938          if destination_hash in self.peers:
1939              peer = self.peers[destination_hash]
1940  
1941              if timestamp >= peer.peering_timebase:
1942                  self.peers.pop(destination_hash)
1943                  RNS.log("Broke peering with "+str(peer.destination))
1944  
    def rotate_peers(self):
        """
        Drop low acceptance rate peers to keep headroom for new peerings.

        Rotation only happens when the number of peers exceeds the
        maximum minus the rotation headroom, and is postponed while the
        number of peers that have not yet had a sync attempt is at or
        above the headroom. Only non-static, idle peers are considered
        for removal. Any exception raised during rotation is logged and
        not propagated.
        """
        try:
            # Headroom is a percentage of max_peers, but at least 1
            rotation_headroom = max(1, math.floor(self.max_peers*(LXMRouter.ROTATION_HEADROOM_PCT/100.0)))
            required_drops = len(self.peers) - (self.max_peers - rotation_headroom)
            # Never rotate down to a single remaining peer or fewer
            if required_drops > 0 and len(self.peers) - required_drops > 1:
                peers = self.peers.copy()
                untested_peers = []
                # NOTE(review): this loop iterates self.peers directly,
                # not the snapshot taken on the line above.
                for peer_id in self.peers:
                    peer = self.peers[peer_id]
                    if peer.last_sync_attempt == 0:
                        untested_peers.append(peer)

                if len(untested_peers) >= rotation_headroom:
                    RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG)
                    return
                
                # If any peers have no unhandled messages, only those
                # peers are used as the basis for the rotation pool.
                fully_synced_peers = {}
                for peer_id in peers:
                    peer = peers[peer_id]
                    if peer.unhandled_message_count == 0:
                        fully_synced_peers[peer_id] = peer

                if len(fully_synced_peers) > 0:
                    peers = fully_synced_peers
                    ms = "" if len(fully_synced_peers) == 1 else "s"
                    RNS.log(f"Found {len(fully_synced_peers)} fully synced peer{ms}, using as peer rotation pool basis", RNS.LOG_DEBUG)

                # NOTE(review): culled_peers is never used in this method
                culled_peers  = []
                waiting_peers = []
                unresponsive_peers = []
                for peer_id in peers:
                    peer = peers[peer_id]
                    if not peer_id in self.static_peers and peer.state == LXMPeer.IDLE:
                        if peer.alive:
                            if peer.offered == 0:
                                # Don't consider for unpeering until at
                                # least one message has been offered
                                pass
                            else:
                                waiting_peers.append(peer)
                        else:
                            unresponsive_peers.append(peer)

                # Unresponsive peers always enter the drop pool. Alive
                # peers are left out of it only when there are
                # unresponsive peers and prioritising those is enabled.
                drop_pool = []
                if len(unresponsive_peers) > 0:
                    drop_pool.extend(unresponsive_peers)
                    if not self.prioritise_rotating_unreachable_peers:
                        drop_pool.extend(waiting_peers)

                else:
                    drop_pool.extend(waiting_peers)

                if len(drop_pool) > 0:
                    drop_count = min(required_drops, len(drop_pool))
                    # Sort ascending by acceptance rate (outgoing/offered)
                    # and take the lowest drop_count candidates
                    low_acceptance_rate_peers = sorted(
                        drop_pool,
                        key=lambda p: ( 0 if p.offered == 0 else (p.outgoing/p.offered) ),
                        reverse=False
                    )[0:drop_count]

                    dropped_peers = 0
                    for peer in low_acceptance_rate_peers:
                        # Acceptance rate as a percentage; candidates at
                        # or above the ROTATION_AR_MAX threshold are kept
                        ar = 0 if peer.offered == 0 else round((peer.outgoing/peer.offered)*100, 2)
                        if ar < LXMRouter.ROTATION_AR_MAX*100:
                            reachable_str = "reachable" if peer.alive else "unreachable"
                            RNS.log(f"Acceptance rate for {reachable_str} peer {RNS.prettyhexrep(peer.destination_hash)} was: {ar}% ({peer.outgoing}/{peer.offered}, {peer.unhandled_message_count} unhandled messages)", RNS.LOG_DEBUG)
                            self.unpeer(peer.destination_hash)
                            dropped_peers += 1

                    ms = "" if dropped_peers == 1 else "s"
                    RNS.log(f"Dropped {dropped_peers} low acceptance rate peer{ms} to increase peering headroom", RNS.LOG_DEBUG)

        except Exception as e:
            RNS.log(f"An error occurred during peer rotation: {e}", RNS.LOG_ERROR)
            RNS.trace_exception(e)
2020  
    def sync_peers(self):
        """
        Select one peer with unhandled messages and start a sync with it,
        then remove peers that have been unreachable for too long.

        Alive peers are preferred. Peers that are not alive are only
        considered when no alive peer is waiting, and only once their
        next sync attempt time has passed.
        """
        culled_peers  = []
        waiting_peers = []
        unresponsive_peers = []
        # Work on a snapshot of the peers dict
        peers = self.peers.copy()
        for peer_id in peers:
            peer = peers[peer_id]
            # Peers not heard from within MAX_UNREACHABLE are marked for
            # removal, unless they are static peers
            if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
                if not peer_id in self.static_peers: culled_peers.append(peer_id)
            
            else:
                if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0:
                    if peer.alive: waiting_peers.append(peer)
                    else:
                        if hasattr(peer, "next_sync_attempt") and time.time() > peer.next_sync_attempt: unresponsive_peers.append(peer)
                        else: pass # RNS.log("Not adding peer "+str(peer)+" since it is in sync backoff", RNS.LOG_DEBUG)

        peer_pool = []
        if len(waiting_peers) > 0:
            # The pool consists of the waiting peers with the highest
            # sync transfer rate...
            fastest_peers = sorted(
                waiting_peers,
                key=lambda p: p.sync_transfer_rate,
                reverse=True
            )[0:min(LXMRouter.FASTEST_N_RANDOM_POOL, len(waiting_peers))]
            peer_pool.extend(fastest_peers)
            
            # ...plus at most the same number of peers with a transfer
            # rate of 0. A peer can end up in the pool twice if it is
            # also among the fastest peers.
            unknown_speed_peers = [p for p in waiting_peers if p.sync_transfer_rate == 0]
            if len(unknown_speed_peers) > 0:
                peer_pool.extend(
                    unknown_speed_peers[
                        0:min(
                            len(unknown_speed_peers),
                            len(fastest_peers)
                        )]
                )

            RNS.log("Selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG)
            
        elif len(unresponsive_peers) > 0:
            RNS.log("No active peers available, randomly selecting peer to sync from "+str(len(unresponsive_peers))+" unresponsive peers.", RNS.LOG_DEBUG)
            peer_pool = unresponsive_peers
        
        # Pick one peer from the pool at random and sync with it
        if len(peer_pool) > 0:
            selected_index = random.randint(0,len(peer_pool)-1)
            selected_peer = peer_pool[selected_index]
            RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG)
            selected_peer.sync()

        # Removal is done against the live peers dict, not the snapshot
        for peer_id in culled_peers:
            RNS.log("Removing peer "+RNS.prettyhexrep(peer_id)+" due to excessive unreachability", RNS.LOG_WARNING)
            try:
                if peer_id in self.peers:
                    self.peers.pop(peer_id)
            except Exception as e:
                RNS.log("Error while removing peer "+RNS.prettyhexrep(peer_id)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
2076  
    def propagation_link_established(self, link):
        """
        Configure a newly established propagation link.

        Registers this router's handlers for packets and resource
        transfers on the link, and adds the link to the list of active
        propagation links.

        :param link: The link instance to configure.
        """
        link.set_packet_callback(self.propagation_packet)
        # With the ACCEPT_APP strategy set here, propagation_resource_advertised
        # is registered as the callback for advertised resources.
        link.set_resource_strategy(RNS.Link.ACCEPT_APP)
        link.set_resource_callback(self.propagation_resource_advertised)
        link.set_resource_started_callback(self.resource_transfer_began)
        link.set_resource_concluded_callback(self.propagation_resource_concluded)
        self.active_propagation_links.append(link)
2084  
2085      def propagation_resource_advertised(self, resource):
2086          if self.from_static_only:
2087              remote_identity = resource.link.get_remote_identity()
2088              if remote_identity == None:
2089                  RNS.log(f"Rejecting propagation resource from unidentified peer", RNS.LOG_DEBUG)
2090                  return False
2091              else:
2092                  remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
2093                  remote_hash = remote_destination.hash
2094                  remote_str  = RNS.prettyhexrep(remote_hash)
2095                  if not remote_hash in self.static_peers:
2096                      RNS.log(f"Rejecting propagation resource from {remote_str} not in static peers list", RNS.LOG_DEBUG)
2097                      return False
2098  
2099          size = resource.get_data_size()
2100          limit = self.propagation_per_sync_limit*1000
2101          if limit != None and size > limit:
2102              RNS.log(f"Rejecting {RNS.prettysize(size)} incoming propagation resource, since it exceeds the limit of {RNS.prettysize(limit)}", RNS.LOG_DEBUG)
2103              return False
2104          else:
2105              return True
2106  
2107      def propagation_packet(self, data, packet):
2108          try:
2109              if packet.destination_type != RNS.Destination.LINK: return
2110              else:
2111                  data               = msgpack.unpackb(data)
2112                  remote_timebase    = data[0]
2113                  messages           = data[1]
2114  
2115                  min_accepted_cost  = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
2116                  validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost)
2117  
2118                  for validated_entry in validated_messages:
2119                      lxmf_data    = validated_entry[1]
2120                      stamp_value  = validated_entry[2]
2121                      stamp_data   = validated_entry[3]
2122                      self.lxmf_propagation(lxmf_data, stamp_value=stamp_value, stamp_data=stamp_data)
2123                      self.client_propagation_messages_received += 1
2124  
2125                  if len(validated_messages) == len(messages):
2126                      ms = "" if len(messages) == 1 else "s"
2127                      RNS.log(f"Received {len(messages)} propagation message{ms} from client with valid stamp{ms}", RNS.LOG_DEBUG)
2128                      packet.prove()
2129                  else:
2130                      RNS.log("Propagation transfer from client contained messages with invalid stamps", RNS.LOG_NOTICE)
2131                      reject_data = msgpack.packb([LXMPeer.ERROR_INVALID_STAMP])
2132                      RNS.Packet(packet.link, reject_data).send()
2133                      packet.link.teardown()
2134  
2135          except Exception as e:
2136              RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR)
2137              RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
2138  
2139      def offer_request(self, path, data, request_id, link_id, remote_identity, requested_at):
2140          if remote_identity == None:
2141              return LXMPeer.ERROR_NO_IDENTITY
2142          else:
2143              remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
2144              remote_hash = remote_destination.hash
2145              remote_str  = RNS.prettyhexrep(remote_hash)
2146  
2147              if remote_hash in self.throttled_peers:
2148                  throttle_remaining = self.throttled_peers[remote_hash]-time.time()
2149                  if throttle_remaining > 0:
2150                      RNS.log(f"Propagation offer from node {remote_str} rejected, throttled for {RNS.prettytime(throttle_remaining)} more", RNS.LOG_NOTICE)
2151                      return LXMPeer.ERROR_THROTTLED
2152                  else: self.throttled_peers.pop(remote_hash)
2153  
2154              if self.from_static_only:
2155                  if not remote_hash in self.static_peers:
2156                      RNS.log(f"Rejecting propagation request from {remote_str} not in static peers list", RNS.LOG_DEBUG)
2157                      return LXMPeer.ERROR_NO_ACCESS
2158  
2159              try:
2160                  if type(data) != list and len(data) < 2: return LXMPeer.ERROR_INVALID_DATA
2161  
2162                  peering_id         = self.identity.hash+remote_identity.hash
2163                  target_cost        = self.peering_cost
2164                  peering_key        = data[0]
2165                  transient_ids      = data[1]
2166                  wanted_ids         = []
2167  
2168                  ts                 = time.time()
2169                  peering_key_valid  = LXStamper.validate_peering_key(peering_id, peering_key, target_cost)
2170                  td                 = time.time() - ts
2171  
2172                  if not peering_key_valid:
2173                      RNS.log(f"Invalid peering key for incoming sync offer", RNS.LOG_DEBUG)
2174                      return LXMPeer.ERROR_INVALID_KEY
2175  
2176                  else:
2177                      RNS.log(f"Peering key validated for incoming offer in {RNS.prettytime(td)}", RNS.LOG_DEBUG)
2178                      self.validated_peer_links[link_id] = True
2179                      for transient_id in transient_ids:
2180                          if not transient_id in self.propagation_entries: wanted_ids.append(transient_id)
2181  
2182                      if len(wanted_ids)   == 0:                  return False
2183                      elif len(wanted_ids) == len(transient_ids): return True
2184                      else:                                       return wanted_ids
2185  
2186              except Exception as e:
2187                  RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
2188                  RNS.trace_exception(e)
2189                  return None
2190  
2191      def propagation_resource_concluded(self, resource):
2192          if resource.status == RNS.Resource.COMPLETE:
2193              try:
2194                  data = msgpack.unpackb(resource.data.read())
2195  
2196                  if type(data) == list and len(data) == 2 and type(data[0] == float) and type(data[1]) == list:
2197                      # This is a series of propagation messages from a peer or originator
2198                      remote_identity = resource.link.get_remote_identity()
2199                      remote_timebase = data[0]
2200                      messages        = data[1]
2201                      remote_hash     = None
2202                      remote_str      = "unknown client"
2203  
2204                      if remote_identity != None:
2205                          remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
2206                          remote_hash        = remote_destination.hash
2207                          remote_app_data    = RNS.Identity.recall_app_data(remote_hash)
2208                          remote_str         = RNS.prettyhexrep(remote_hash)
2209  
2210                          if remote_hash in self.peers: remote_str = f"peer {remote_str}"
2211                          else:
2212                              if pn_announce_data_is_valid(remote_app_data):
2213                                  # 1: Current node timebase
2214                                  # 2: Boolean flag signalling propagation node state
2215                                  # 3: Per-transfer limit for message propagation in kilobytes
2216                                  # 4: Limit for incoming propagation node syncs
2217                                  # 5: Propagation stamp costs for this node
2218                                  # 6: Node metadata
2219                                  pn_config = msgpack.unpackb(remote_app_data)
2220                                  if pn_config[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
2221                                      remote_timebase       = pn_config[1]
2222                                      remote_transfer_limit = pn_config[3]
2223                                      remote_sync_limit     = pn_config[4]
2224                                      remote_stamp_cost     = pn_config[5][0]
2225                                      remote_stamp_flex     = pn_config[5][1]
2226                                      remote_peering_cost   = pn_config[5][2]
2227                                      remote_metadata       = pn_config[6]
2228  
2229                                      RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug
2230                                      self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata)
2231  
2232                      peering_key_valid = False
2233                      if remote_identity != None:
2234                          if resource.link.link_id in self.validated_peer_links and self.validated_peer_links[resource.link.link_id] == True:
2235                              peering_key_valid = True
2236  
2237                      if not peering_key_valid and len(messages) > 1:
2238                          resource.link.teardown()
2239                          RNS.log(f"Received multiple propagation messages from {remote_str} without valid peering key presentation. This is not supposed to happen, ignoring.", RNS.LOG_WARNING)
2240                          RNS.log(f"Clients and peers without a valid peering key can only deliver 1 message per transfer.", RNS.LOG_WARNING)
2241                      else:
2242                          ms = "" if len(messages) == 1 else "s"
2243                          RNS.log(f"Received {len(messages)} message{ms} from {remote_str}, validating stamps...", RNS.LOG_VERBOSE)
2244  
2245                          min_accepted_cost  = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
2246                          validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost)
2247                          invalid_stamps     = len(messages)-len(validated_messages)
2248                          ms                 = "" if invalid_stamps == 1 else "s"
2249                          if len(validated_messages) == len(messages): RNS.log(f"All message stamps validated from {remote_str}", RNS.LOG_VERBOSE)
2250                          else:                                        RNS.log(f"Transfer from {remote_str} contained {invalid_stamps} invalid stamp{ms}", RNS.LOG_WARNING)
2251  
2252                          for validated_entry in validated_messages:
2253                              transient_id = validated_entry[0]
2254                              lxmf_data    = validated_entry[1]
2255                              stamp_value  = validated_entry[2]
2256                              stamp_data   = validated_entry[3]
2257                              peer         = None
2258                              
2259                              if remote_hash != None and remote_hash in self.peers:
2260                                  peer = self.peers[remote_hash]
2261                                  peer.incoming += 1
2262                                  peer.rx_bytes += len(lxmf_data)
2263                              else:
2264                                  if remote_identity != None:
2265                                      self.unpeered_propagation_incoming += 1
2266                                      self.unpeered_propagation_rx_bytes += len(lxmf_data)
2267                                  else:
2268                                      self.client_propagation_messages_received += 1
2269  
2270                              self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value, stamp_data=stamp_data)
2271                              if peer != None: peer.queue_handled_message(transient_id)
2272  
2273                          invalid_message_count = len(messages) - len(validated_messages)
2274                          if invalid_message_count > 0:
2275                              resource.link.teardown()
2276                              if remote_identity != None:
2277                                  throttle_time = LXMRouter.PN_STAMP_THROTTLE
2278                                  self.throttled_peers[remote_hash] = time.time()+throttle_time
2279                                  ms = "" if invalid_message_count == 1 else "s"
2280                                  RNS.log(f"Propagation transfer from {remote_str} contained {invalid_message_count} message{ms} with invalid stamps, throttled for {RNS.prettytime(throttle_time)}", RNS.LOG_NOTICE)
2281  
2282                  else:
2283                      RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
2284      
2285              except Exception as e:
2286                  RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
2287                  RNS.trace_exception(e)
2288  
2289      def enqueue_peer_distribution(self, transient_id, from_peer):
2290          self.peer_distribution_queue.append([transient_id, from_peer])
2291  
2292      def flush_peer_distribution_queue(self):
2293          if len(self.peer_distribution_queue) > 0:
2294              entries = []
2295              while len(self.peer_distribution_queue) > 0:
2296                  entries.append(self.peer_distribution_queue.pop())
2297  
2298              for peer_id in self.peers.copy():
2299                  if peer_id in self.peers:
2300                      peer = self.peers[peer_id]
2301                      for entry in entries:
2302                          transient_id = entry[0]
2303                          from_peer = entry[1]
2304                          if peer != from_peer:
2305                              peer.queue_unhandled_message(transient_id)
2306  
2307      def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False,
2308                           from_peer=None, stamp_value=None, stamp_data=None):
2309          if is_paper_message: no_stamp_enforcement = True
2310          else:                no_stamp_enforcement = False
2311  
2312          try:
2313              if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD:
2314                  transient_id = RNS.Identity.full_hash(lxmf_data)
2315  
2316                  if (not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids) or allow_duplicate == True:
2317                      received = time.time()
2318                      destination_hash  = lxmf_data[:LXMessage.DESTINATION_LENGTH]
2319  
2320                      self.locally_processed_transient_ids[transient_id] = received
2321  
2322                      if destination_hash in self.delivery_destinations:
2323                          delivery_destination = self.delivery_destinations[destination_hash]
2324                          encrypted_lxmf_data = lxmf_data[LXMessage.DESTINATION_LENGTH:]
2325                          decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data)
2326                          if decrypted_lxmf_data != None:
2327                              delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data
2328                              self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement, allow_duplicate=allow_duplicate)
2329                              self.locally_delivered_transient_ids[transient_id] = time.time()
2330  
2331                              if signal_local_delivery != None:
2332                                  return signal_local_delivery
2333  
2334                      else:
2335                          if self.propagation_node:
2336                              stamped_data    = lxmf_data+stamp_data
2337                              value_component = f"_{stamp_value}" if stamp_value and stamp_value > 0 else ""
2338                              file_path       = f"{self.messagepath}/{RNS.hexrep(transient_id, delimit=False)}_{received}{value_component}"
2339                              msg_file        = open(file_path, "wb")
2340                              msg_file.write(stamped_data); msg_file.close()
2341  
2342                              RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)} with stamp value {stamp_value}, adding to peer distribution queues...", RNS.LOG_EXTREME)
2343                              self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(stamped_data), [], [], stamp_value]
2344                              self.enqueue_peer_distribution(transient_id, from_peer)
2345  
2346                          else:
2347                              # TODO: Add message to sneakernet queues when implemented
2348                              RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)}, but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG)
2349  
2350                      return True
2351  
2352                  else:
2353                      if signal_duplicate != None:
2354                          return signal_duplicate
2355  
2356                      else:
2357                          return False
2358  
2359              return False
2360  
2361          except Exception as e:
2362              RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_DEBUG)
2363              RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
2364              RNS.trace_exception(e)
2365              return False
2366  
2367      def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False):
2368          try:
2369              if not uri.lower().startswith(LXMessage.URI_SCHEMA+"://"):
2370                  RNS.log("Cannot ingest LXM, invalid URI provided.", RNS.LOG_ERROR)
2371                  return False
2372  
2373              else:
2374                  lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==")
2375                  transient_id = RNS.Identity.full_hash(lxmf_data)
2376                  
2377                  router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True)
2378                  if router_propagation_result != False:
2379                      RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG)
2380                      return router_propagation_result
2381                  else:
2382                      RNS.log("No valid LXM could be ingested from the provided URI", RNS.LOG_DEBUG)
2383                      return False
2384  
2385          except Exception as e:
2386              RNS.log("Error while decoding URI-encoded LXMF message. The contained exception was: "+str(e), RNS.LOG_ERROR)
2387              return False
2388  
2389      def fail_message(self, lxmessage):
2390          RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)
2391  
2392          lxmessage.progress = 0.0
2393          if lxmessage in self.pending_outbound:
2394              self.pending_outbound.remove(lxmessage)
2395  
2396          self.failed_outbound.append(lxmessage)
2397  
2398          if lxmessage.state != LXMessage.REJECTED:
2399              lxmessage.state = LXMessage.FAILED
2400  
2401          if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
2402              lxmessage.failed_callback(lxmessage)
2403  
2404      def process_deferred_stamps(self):
2405          if len(self.pending_deferred_stamps) > 0:
2406  
2407              if self.stamp_gen_lock.locked():
2408                  return
2409  
2410              else:
2411                  with self.stamp_gen_lock:
2412                      selected_lxm = None
2413                      selected_message_id = None
2414                      for message_id in self.pending_deferred_stamps:
2415                          lxmessage = self.pending_deferred_stamps[message_id]
2416                          if selected_lxm == None:
2417                              selected_lxm = lxmessage
2418                              selected_message_id = message_id
2419  
2420                      if selected_lxm != None:
2421                          if selected_lxm.state == LXMessage.CANCELLED:
2422                              RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG)
2423                              selected_lxm.stamp_generation_failed = True
2424                              self.pending_deferred_stamps.pop(selected_message_id)
2425                              if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
2426                                  selected_lxm.failed_callback(lxmessage)
2427                              
2428                              return
2429  
2430                          if selected_lxm.defer_stamp:
2431                              if selected_lxm.stamp == None: stamp_generation_success = False
2432                              else:                          stamp_generation_success = True
2433                          else:                              stamp_generation_success = True
2434  
2435                          if selected_lxm.desired_method == LXMessage.PROPAGATED:
2436                              if selected_lxm.propagation_stamp == None: propagation_stamp_generation_success = False
2437                              else:                                      propagation_stamp_generation_success = True
2438                          else:                                          propagation_stamp_generation_success = True
2439  
2440                          if stamp_generation_success == False:
2441                              RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG)
2442                              generated_stamp = selected_lxm.get_stamp()
2443                              if generated_stamp:
2444                                  selected_lxm.stamp = generated_stamp
2445                                  selected_lxm.defer_stamp = False
2446                                  selected_lxm.packed = None
2447                                  selected_lxm.pack(payload_updated=True)
2448                                  stamp_generation_success = True
2449                                  RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
2450                              else:
2451                                  if selected_lxm.state == LXMessage.CANCELLED:
2452                                      RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG)
2453                                      selected_lxm.stamp_generation_failed = True
2454                                      self.pending_deferred_stamps.pop(selected_message_id)
2455                                      if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
2456                                          selected_lxm.failed_callback(lxmessage)
2457                                  else:
2458                                      RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR)
2459                                      selected_lxm.stamp_generation_failed = True
2460                                      self.pending_deferred_stamps.pop(selected_message_id)
2461                                      self.fail_message(selected_lxm)
2462  
2463                          if propagation_stamp_generation_success == False:
2464                              RNS.log(f"Starting propagation stamp generation for {selected_lxm}...", RNS.LOG_DEBUG)
2465                              pn_target_cost = self.get_outbound_propagation_cost()
2466                              if pn_target_cost == None:
2467                                  RNS.log("Failed to get propagation node stamp cost, cannot generate propagation stamp", RNS.LOG_ERROR)
2468                                  selected_lxm.stamp_generation_failed = True
2469                                  self.pending_deferred_stamps.pop(selected_message_id)
2470                                  self.fail_message(selected_lxm)
2471  
2472                              else:
2473                                  propagation_stamp = selected_lxm.get_propagation_stamp(target_cost=pn_target_cost)
2474                                  if propagation_stamp:
2475                                      selected_lxm.propagation_stamp = propagation_stamp
2476                                      selected_lxm.defer_propagation_stamp = False
2477                                      selected_lxm.packed = None
2478                                      selected_lxm.pack()
2479                                      propagation_stamp_generation_success = True
2480                                      RNS.log(f"Propagation stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
2481                                  else:
2482                                      if selected_lxm.state == LXMessage.CANCELLED:
2483                                          RNS.log(f"Message cancelled during deferred propagation stamp generation for {selected_lxm}.", RNS.LOG_DEBUG)
2484                                          selected_lxm.stamp_generation_failed = True
2485                                          self.pending_deferred_stamps.pop(selected_message_id)
2486                                          if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
2487                                              selected_lxm.failed_callback(lxmessage)
2488                                      else:
2489                                          RNS.log(f"Deferred propagation stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR)
2490                                          selected_lxm.stamp_generation_failed = True
2491                                          self.pending_deferred_stamps.pop(selected_message_id)
2492                                          self.fail_message(selected_lxm)
2493  
2494                          if stamp_generation_success and propagation_stamp_generation_success:
2495                              self.pending_deferred_stamps.pop(selected_message_id)
2496                              self.pending_outbound.append(selected_lxm)
2497  
2498      def propagation_transfer_signalling_packet(self, data, packet):
2499          try:
2500              unpacked = msgpack.unpackb(data)
2501              if type(unpacked) == list and len(unpacked) >= 1:
2502                  signal = unpacked[0]
2503                  if signal == LXMPeer.ERROR_INVALID_STAMP:
2504                      RNS.log("Message rejected by propagation node", RNS.LOG_ERROR)
2505                      if hasattr(packet, "link") and hasattr(packet.link, "for_lxmessage"):
2506                          lxm = packet.link.for_lxmessage
2507                          RNS.log(f"Invalid propagation stamp on {lxm}", RNS.LOG_ERROR)
2508                          self.cancel_outbound(lxm.message_id, cancel_state=LXMessage.REJECTED)
2509  
2510          except Exception as e:
2511              RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR)
2512  
2513      def process_outbound(self, sender = None):
2514          if self.outbound_processing_lock.locked(): return
2515          with self.outbound_processing_lock:
2516              for lxmessage in self.pending_outbound:
2517                  if lxmessage.state == LXMessage.DELIVERED:
2518                      RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
2519                      self.pending_outbound.remove(lxmessage)
2520  
2521                      # Udate ticket delivery stats
2522                      if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields:
2523                          RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG)
2524                          self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time()
2525                          self.save_available_tickets()
2526  
2527                      # Prepare link for backchannel communications
2528                      delivery_destination_hash = lxmessage.get_destination().hash
2529                      if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links:
2530                          direct_link = self.direct_links[delivery_destination_hash]
2531                          if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
2532                              if direct_link.initiator == True:
2533                                  source_destination_hash = lxmessage.get_source().hash
2534                                  if source_destination_hash in self.delivery_destinations:
2535                                      backchannel_identity = self.delivery_destinations[source_destination_hash].identity
2536                                      backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity)
2537                                      direct_link.identify(backchannel_identity)
2538                                      direct_link.backchannel_identified = True
2539                                      self.delivery_link_established(direct_link)
2540                                      RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG)
2541  
2542                  elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT:
2543                      RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
2544                      self.pending_outbound.remove(lxmessage)
2545  
2546                  elif lxmessage.state == LXMessage.CANCELLED:
2547                      RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
2548                      self.pending_outbound.remove(lxmessage)
2549                      if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
2550                          lxmessage.failed_callback(lxmessage)
2551  
2552                  elif lxmessage.state == LXMessage.REJECTED:
2553                      RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
2554                      if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage)
2555                      if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
2556                          lxmessage.failed_callback(lxmessage)
2557  
2558                  else:
2559                      RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
2560  
2561                      if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01
2562  
2563                      # Outbound handling for opportunistic messages
2564                      if lxmessage.method == LXMessage.OPPORTUNISTIC:
2565                          if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
2566                              if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
2567                                  RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
2568                                  lxmessage.delivery_attempts += 1
2569                                  RNS.Transport.request_path(lxmessage.get_destination().hash)
2570                                  lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
2571                                  lxmessage.progress = 0.01
2572                              elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash):
2573                                  RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
2574                                  lxmessage.delivery_attempts += 1
2575                                  RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash)
2576                                  def rediscover_job():
2577                                      time.sleep(0.5)
2578                                      RNS.Transport.request_path(lxmessage.get_destination().hash)
2579                                  threading.Thread(target=rediscover_job, daemon=True).start()
2580                                  lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
2581                                  lxmessage.progress = 0.01
2582                              else:
2583                                  if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
2584                                      lxmessage.delivery_attempts += 1
2585                                      lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
2586                                      RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
2587                                      lxmessage.send()
2588                          else:
2589                              RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
2590                              self.fail_message(lxmessage)
2591  
2592                      # Outbound handling for messages transferred
2593                      # over a direct link to the final recipient
2594                      elif lxmessage.method == LXMessage.DIRECT:
2595                          if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
2596                              delivery_destination_hash = lxmessage.get_destination().hash
2597                              direct_link = None
2598                              
2599                              if delivery_destination_hash in self.direct_links:
2600                                  # An established direct link already exists to
2601                                  # the destination, so we'll try to use it for
2602                                  # delivering the message
2603                                  direct_link = self.direct_links[delivery_destination_hash]
2604                                  RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
2605  
2606                              elif delivery_destination_hash in self.backchannel_links:
2607                                  # An established backchannel link exists to
2608                                  # the destination, so we'll try to use it for
2609                                  # delivering the message
2610                                  direct_link = self.backchannel_links[delivery_destination_hash]
2611                                  RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
2612  
2613                              if direct_link != None:
2614                                  if direct_link.status == RNS.Link.ACTIVE:
2615                                      if lxmessage.progress == None or lxmessage.progress < 0.05:
2616                                          lxmessage.progress = 0.05
2617                                      if lxmessage.state != LXMessage.SENDING:
2618                                          RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
2619                                          lxmessage.set_delivery_destination(direct_link)
2620                                          lxmessage.send()
2621                                      else:
2622                                          if lxmessage.representation == LXMessage.RESOURCE:
2623                                              RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
2624                                          else:
2625                                              RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
2626                                  elif direct_link.status == RNS.Link.CLOSED:
2627                                      if direct_link.activated_at != None:
2628                                          RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG)
2629                                          RNS.Transport.request_path(lxmessage.get_destination().hash)
2630                                      else:
2631                                          if not hasattr(lxmessage, "path_request_retried"):
2632                                              RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
2633                                              RNS.Transport.request_path(lxmessage.get_destination().hash)
2634                                              lxmessage.path_request_retried = True
2635                                          else:
2636                                              RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG)
2637  
2638                                          lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
2639  
2640                                      lxmessage.set_delivery_destination(None)
2641                                      if delivery_destination_hash in self.direct_links:
2642                                          self.direct_links.pop(delivery_destination_hash)
2643                                      if delivery_destination_hash in self.backchannel_links:
2644                                          self.backchannel_links.pop(delivery_destination_hash)
2645                                      lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
2646                                  else:
2647                                      # Simply wait for the link to become active or close
2648                                      RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
2649                              else:
2650                                  # No link exists, so we'll try to establish one, but
2651                                  # only if we've never tried before, or the retry wait
2652                                  # period has elapsed.
2653                                  if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
2654                                      lxmessage.delivery_attempts += 1
2655                                      lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
2656  
2657                                      if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
2658                                          if RNS.Transport.has_path(lxmessage.get_destination().hash):
2659                                              RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
2660                                              delivery_link = RNS.Link(lxmessage.get_destination())
2661                                              delivery_link.set_link_established_callback(self.process_outbound)
2662                                              self.direct_links[delivery_destination_hash] = delivery_link
2663                                              lxmessage.progress = 0.03
2664                                          else:
2665                                              RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
2666                                              RNS.Transport.request_path(lxmessage.get_destination().hash)
2667                                              lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
2668                                              lxmessage.progress = 0.01
2669                          else:
2670                              RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
2671                              self.fail_message(lxmessage)
2672  
2673                      # Outbound handling for messages transported via
2674                      # propagation to a LXMF router network.
2675                      elif lxmessage.method == LXMessage.PROPAGATED:
2676                          RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
2677  
2678                          if self.outbound_propagation_node == None:
2679                              RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
2680                              self.fail_message(lxmessage)
2681                          else:
2682                              if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
2683  
2684                                  if self.outbound_propagation_link != None:
2685                                      # A link already exists, so we'll try to use it
2686                                      # to deliver the message
2687                                      if self.outbound_propagation_link.status == RNS.Link.ACTIVE:
2688                                          if lxmessage.state != LXMessage.SENDING:
2689                                              RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG)
2690                                              lxmessage.set_delivery_destination(self.outbound_propagation_link)
2691                                              lxmessage.send()
2692                                          else:
2693                                              if lxmessage.representation == LXMessage.RESOURCE:
2694                                                  RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
2695                                              else:
2696                                                  RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
2697                                      elif self.outbound_propagation_link.status == RNS.Link.CLOSED:
2698                                          RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG)
2699                                          self.outbound_propagation_link = None
2700                                          lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
2701                                      else:
2702                                          # Simply wait for the link to become
2703                                          # active or close
2704                                          RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
2705                                  else:
2706                                      # No link exists, so we'll try to establish one, but
2707                                      # only if we've never tried before, or the retry wait
2708                                      # period has elapsed.
2709                                      if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
2710                                          lxmessage.delivery_attempts += 1
2711                                          lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
2712  
2713                                          if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
2714                                              if RNS.Transport.has_path(self.outbound_propagation_node):
2715                                                  RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
2716                                                  propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
2717                                                  propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
2718                                                  self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound)
2719                                                  self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet)
2720                                                  self.outbound_propagation_link.for_lxmessage = lxmessage
2721                                              else:
2722                                                  RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
2723                                                  RNS.Transport.request_path(self.outbound_propagation_node)
2724                                                  lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
2725  
2726                              else:
2727                                  RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
2728                                  self.fail_message(lxmessage)