/ RNS / Resource.py
Resource.py
   1  # Reticulum License
   2  #
   3  # Copyright (c) 2016-2025 Mark Qvist
   4  #
   5  # Permission is hereby granted, free of charge, to any person obtaining a copy
   6  # of this software and associated documentation files (the "Software"), to deal
   7  # in the Software without restriction, including without limitation the rights
   8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
   9  # copies of the Software, and to permit persons to whom the Software is
  10  # furnished to do so, subject to the following conditions:
  11  #
  12  # - The Software shall not be used in any kind of system which includes amongst
  13  #   its functions the ability to purposefully do harm to human beings.
  14  #
  15  # - The Software shall not be used, directly or indirectly, in the creation of
  16  #   an artificial intelligence, machine learning or language model training
  17  #   dataset, including but not limited to any use that contributes to the
  18  #   training or development of such a model or algorithm.
  19  #
  20  # - The above copyright notice and this permission notice shall be included in
  21  #   all copies or substantial portions of the Software.
  22  #
  23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  24  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  25  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  26  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  27  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  28  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  29  # SOFTWARE.
  30  
  31  import RNS
  32  import os
  33  import bz2
  34  import math
  35  import time
  36  import struct
  37  import tempfile
  38  import threading
  39  from threading import Lock
  40  from .vendor import umsgpack as umsgpack
  41  from time import sleep
  42  
  43  class Resource:
  44      """
  45      The Resource class allows transferring arbitrary amounts
  46      of data over a link. It will automatically handle sequencing,
  47      compression, coordination and checksumming.
  48  
  49      :param data: The data to be transferred. Can be *bytes* or an open *file handle*. See the :ref:`Filetransfer Example<example-filetransfer>` for details.
  50      :param link: The :ref:`RNS.Link<api-link>` instance on which to transfer the data.
  51      :param advertise: Optional. Whether to automatically advertise the resource. Can be *True* or *False*.
  52      :param auto_compress: Optional. Whether to auto-compress the resource. Can be *True* or *False*.
  53      :param callback: An optional *callable* with the signature *callback(resource)*. Will be called when the resource transfer concludes.
  54      :param progress_callback: An optional *callable* with the signature *callback(resource)*. Will be called whenever the resource transfer progress is updated.
  55      """
  56  
  57      # The initial window size at beginning of transfer
  58      WINDOW               = 4
  59  
  60      # Absolute minimum window size during transfer
  61      WINDOW_MIN           = 2
  62  
  63      # The maximum window size for transfers on slow links
  64      WINDOW_MAX_SLOW      = 10
  65  
  66      # The maximum window size for transfers on very slow links
  67      WINDOW_MAX_VERY_SLOW = 4
  68  
  69      # The maximum window size for transfers on fast links
  70      WINDOW_MAX_FAST      = 75
  71      
  72      # For calculating maps and guard segments, this
  73      # must be set to the global maximum window.
  74      WINDOW_MAX           = WINDOW_MAX_FAST
  75      
  76      # If the fast rate is sustained for this many request
  77      # rounds, the fast link window size will be allowed.
  78      FAST_RATE_THRESHOLD  = WINDOW_MAX_SLOW - WINDOW - 2
  79  
  80      # If the very slow rate is sustained for this many request
  81      # rounds, window will be capped to the very slow limit.
  82      VERY_SLOW_RATE_THRESHOLD = 2
  83  
  84      # If the RTT rate is higher than this value,
  85      # the max window size for fast links will be used.
  86      # The default is 50 Kbps (the value is stored in
  87      # bytes per second, hence the "/ 8").
  88      RATE_FAST            = (50*1000) / 8
  89  
  90      # If the RTT rate is lower than this value,
  91      # the window size will be capped at .
  92      # The default is 50 Kbps (the value is stored in
  93      # bytes per second, hence the "/ 8").
  94      RATE_VERY_SLOW       = (2*1000) / 8
  95  
  96      # The minimum allowed flexibility of the window size.
  97      # The difference between window_max and window_min
  98      # will never be smaller than this value.
  99      WINDOW_FLEXIBILITY   = 4
 100  
 101      # Number of bytes in a map hash
 102      MAPHASH_LEN          = 4
 103      SDU                  = RNS.Packet.MDU
 104      RANDOM_HASH_SIZE     = 4
 105  
 106      # This is an indication of what the
 107      # maximum size a resource should be, if
 108      # it is to be handled within reasonable
 109      # time constraint, even on small systems.
 110      #
 111      # This constant will be used when determining
 112      # how to sequence the sending of large resources.
 113      #
 114      # Capped at 16777215 (0xFFFFFF) per segment to
 115      # fit in 3 bytes in resource advertisements.
 116      MAX_EFFICIENT_SIZE      = 1 * 1024 * 1024 - 1
 117      RESPONSE_MAX_GRACE_TIME = 10
 118  
 119      # Max metadata size is 16777215 (0xFFFFFF) bytes
 120      METADATA_MAX_SIZE       = 16 * 1024 * 1024 - 1
 121      
 122      # The maximum size to auto-compress with
 123      # bz2 before sending.
 124      AUTO_COMPRESS_MAX_SIZE = 64 * 1024 * 1024
 125  
 126      PART_TIMEOUT_FACTOR           = 4
 127      PART_TIMEOUT_FACTOR_AFTER_RTT = 2
 128      PROOF_TIMEOUT_FACTOR          = 3
 129      MAX_RETRIES                   = 16
 130      MAX_ADV_RETRIES               = 4
 131      SENDER_GRACE_TIME             = 10.0
 132      PROCESSING_GRACE              = 1.0
 133      RETRY_GRACE_TIME              = 0.25
 134      PER_RETRY_DELAY               = 0.5
 135  
 136      WATCHDOG_MAX_SLEEP            = 1
 137  
 138      HASHMAP_IS_NOT_EXHAUSTED = 0x00
 139      HASHMAP_IS_EXHAUSTED = 0xFF
 140  
 141      # Status constants
 142      NONE            = 0x00
 143      QUEUED          = 0x01
 144      ADVERTISED      = 0x02
 145      TRANSFERRING    = 0x03
 146      AWAITING_PROOF  = 0x04
 147      ASSEMBLING      = 0x05
 148      COMPLETE        = 0x06
 149      FAILED          = 0x07
 150      CORRUPT         = 0x08
 151      REJECTED        = 0x00
 152  
 153      @staticmethod
 154      def reject(advertisement_packet):
 155          try:
 156              adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
 157              resource_hash = adv.h
 158              reject_packet = RNS.Packet(advertisement_packet.link, resource_hash, context=RNS.Packet.RESOURCE_RCL)
 159              reject_packet.send()
 160  
 161          except Exception as e:
 162              RNS.log(f"An error ocurred while rejecting advertised resource: {e}", RNS.LOG_ERROR)
 163              RNS.trace_exception(e)
 164  
 165      @staticmethod
 166      def accept(advertisement_packet, callback=None, progress_callback = None, request_id = None):
 167          try:
 168              adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
 169  
 170              resource = Resource(None, advertisement_packet.link, request_id = request_id)
 171              resource.status = Resource.TRANSFERRING
 172  
 173              resource.flags                = adv.f
 174              resource.size                 = adv.t
 175              resource.total_size           = adv.d
 176              resource.uncompressed_size    = adv.d
 177              resource.hash                 = adv.h
 178              resource.original_hash        = adv.o
 179              resource.random_hash          = adv.r
 180              resource.hashmap_raw          = adv.m
 181              resource.encrypted            = True if resource.flags & 0x01 else False
 182              resource.compressed           = True if resource.flags >> 1 & 0x01 else False
 183              resource.initiator            = False
 184              resource.callback             = callback
 185              resource.__progress_callback  = progress_callback
 186              resource.total_parts          = int(math.ceil(resource.size/float(resource.sdu)))
 187              resource.received_count       = 0
 188              resource.outstanding_parts    = 0
 189              resource.parts                = [None] * resource.total_parts
 190              resource.window               = Resource.WINDOW
 191              resource.window_max           = Resource.WINDOW_MAX_SLOW
 192              resource.window_min           = Resource.WINDOW_MIN
 193              resource.window_flexibility   = Resource.WINDOW_FLEXIBILITY
 194              resource.last_activity        = time.time()
 195              resource.started_transferring = resource.last_activity
 196  
 197              resource.storagepath          = RNS.Reticulum.resourcepath+"/"+resource.original_hash.hex()
 198              resource.meta_storagepath     = resource.storagepath+".meta"
 199              resource.segment_index        = adv.i
 200              resource.total_segments       = adv.l
 201              
 202              if adv.l > 1: resource.split = True
 203              else: resource.split = False
 204  
 205              if adv.x: resource.has_metadata = True
 206              else:     resource.has_metadata = False
 207  
 208              resource.hashmap = [None] * resource.total_parts
 209              resource.hashmap_height = 0
 210              resource.waiting_for_hmu = False
 211              resource.receiving_part = False
 212              resource.consecutive_completed_height = -1
 213  
 214              previous_window = resource.link.get_last_resource_window()
 215              previous_eifr   = resource.link.get_last_resource_eifr()
 216              if previous_window:
 217                  resource.window = previous_window
 218              if previous_eifr:
 219                  resource.previous_eifr = previous_eifr
 220              
 221              if not resource.link.has_incoming_resource(resource):
 222                  resource.link.register_incoming_resource(resource)
 223  
 224                  RNS.log(f"Accepting resource advertisement for {RNS.prettyhexrep(resource.hash)}. Transfer size is {RNS.prettysize(resource.size)} in {resource.total_parts} parts.", RNS.LOG_DEBUG)
 225                  if resource.link.callbacks.resource_started != None:
 226                      try:
 227                          resource.link.callbacks.resource_started(resource)
 228                      except Exception as e:
 229                          RNS.log("Error while executing resource started callback from "+str(resource)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
 230  
 231                  resource.hashmap_update(0, resource.hashmap_raw)
 232                  resource.watchdog_job()
 233                  return resource
 234  
 235              else:
 236                  RNS.log("Ignoring resource advertisement for "+RNS.prettyhexrep(resource.hash)+", resource already transferring", RNS.LOG_DEBUG)
 237                  return None
 238  
 239          except Exception as e:
 240              RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG)
 241              return None
 242  
 243      # Create a resource for transmission to a remote destination
 244      # The data passed can be either a bytes-array or a file opened
 245      # in binary read mode.
 246      def __init__(self, data, link, metadata=None, advertise=True, auto_compress=True, callback=None, progress_callback=None,
 247                   timeout = None, segment_index = 1, original_hash = None, request_id = None, is_response = False, sent_metadata_size=0):
 248          
 249          data_size = None
 250          resource_data = None
 251          self.assembly_lock = False
 252          self.preparing_next_segment = False
 253          self.next_segment = None
 254          self.metadata = None
 255          self.has_metadata = False
 256          self.metadata_size = sent_metadata_size
 257  
 258          if metadata != None:
 259              packed_metadata = umsgpack.packb(metadata)
 260              metadata_size   = len(packed_metadata)
 261              if metadata_size > Resource.METADATA_MAX_SIZE:
 262                  raise SystemError("Resource metadata size exceeded")
 263              else:
 264                  self.metadata = struct.pack(">I", metadata_size)[1:] + packed_metadata
 265                  self.metadata_size = len(self.metadata)
 266                  self.has_metadata = True
 267          else:
 268              self.metadata = b""
 269              if sent_metadata_size > 0: self.has_metadata = True
 270  
 271          if data != None:
 272              if not hasattr(data, "read") and self.metadata_size + len(data) > Resource.MAX_EFFICIENT_SIZE:
 273                  original_data = data
 274                  data_size = len(original_data)
 275                  data = tempfile.TemporaryFile()
 276                  data.write(original_data)
 277                  del original_data
 278  
 279          if hasattr(data, "read"):
 280              if data_size == None: data_size = os.stat(data.name).st_size
 281              self.total_size = data_size + self.metadata_size
 282  
 283              if self.total_size <= Resource.MAX_EFFICIENT_SIZE:
 284                  self.total_segments = 1
 285                  self.segment_index  = 1
 286                  self.split          = False
 287                  resource_data = data.read()
 288                  data.close()
 289  
 290              else:
 291                  # self.total_segments = ((data_size-1)//Resource.MAX_EFFICIENT_SIZE)+1
 292                  # self.segment_index  = segment_index
 293                  # self.split          = True
 294                  # seek_index          = segment_index-1
 295                  # seek_position       = seek_index*Resource.MAX_EFFICIENT_SIZE
 296  
 297                  self.total_segments = ((self.total_size-1)//Resource.MAX_EFFICIENT_SIZE)+1
 298                  self.segment_index  = segment_index
 299                  self.split          = True
 300                  seek_index          = segment_index-1
 301                  first_read_size     = Resource.MAX_EFFICIENT_SIZE - self.metadata_size
 302  
 303                  if segment_index == 1:
 304                      seek_position     = 0
 305                      segment_read_size = first_read_size
 306                  else:
 307                      seek_position     = first_read_size + ((seek_index-1)*Resource.MAX_EFFICIENT_SIZE)
 308                      segment_read_size = Resource.MAX_EFFICIENT_SIZE
 309  
 310                  data.seek(seek_position)
 311                  resource_data = data.read(segment_read_size)
 312                  self.input_file = data
 313  
 314          elif isinstance(data, bytes):
 315              data_size = len(data)
 316              self.total_size = data_size + self.metadata_size
 317              
 318              resource_data = data
 319              self.total_segments = 1
 320              self.segment_index  = 1
 321              self.split          = False
 322  
 323          elif data == None:
 324              pass
 325  
 326          else:
 327              raise TypeError("Invalid data instance type passed to resource initialisation")
 328  
 329          if resource_data:
 330              if self.has_metadata: data = self.metadata + resource_data
 331              else:                 data = resource_data
 332  
 333          self.status = Resource.NONE
 334          self.link = link
 335          if self.link.mtu:
 336              self.sdu = self.link.mtu - RNS.Reticulum.HEADER_MAXSIZE - RNS.Reticulum.IFAC_MIN_SIZE
 337          else:
 338              self.sdu = link.mdu or Resource.SDU
 339          self.max_retries = Resource.MAX_RETRIES
 340          self.max_adv_retries = Resource.MAX_ADV_RETRIES
 341          self.retries_left = self.max_retries
 342          self.timeout_factor = self.link.traffic_timeout_factor
 343          self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR
 344          self.sender_grace_time = Resource.SENDER_GRACE_TIME
 345          self.hmu_retry_ok = False
 346          self.watchdog_lock = False
 347          self.__watchdog_job_id = 0
 348          self.__progress_callback = progress_callback
 349          self.rtt = None
 350          self.rtt_rxd_bytes = 0
 351          self.req_sent = 0
 352          self.req_resp_rtt_rate = 0
 353          self.rtt_rxd_bytes_at_part_req = 0
 354          self.req_data_rtt_rate = 0
 355          self.eifr = None
 356          self.previous_eifr = None
 357          self.fast_rate_rounds = 0
 358          self.very_slow_rate_rounds = 0
 359          self.request_id = request_id
 360          self.started_transferring = None
 361          self.is_response = is_response
 362          self.auto_compress_limit = Resource.AUTO_COMPRESS_MAX_SIZE
 363          self.auto_compress_option = auto_compress
 364  
 365          if type(auto_compress) == bool:
 366              self.auto_compress = auto_compress
 367          elif type(auto_compress) == int:
 368              self.auto_compress = True
 369              self.auto_compress_limit = auto_compress
 370          else:
 371              raise TypeError(f"Invalid type {type(auto_compress)} for auto_compress option")
 372  
 373          self.req_hashlist = []
 374          self.receiver_min_consecutive_height = 0
 375  
 376          if timeout != None:
 377              self.timeout = timeout
 378          else:
 379              self.timeout = self.link.rtt * self.link.traffic_timeout_factor
 380  
 381          if data != None:
 382              self.initiator         = True
 383              self.callback          = callback
 384              self.uncompressed_data = data
 385  
 386              compression_began = time.time()
 387              if self.auto_compress and data_size <= self.auto_compress_limit:
 388                  RNS.log("Compressing resource data...", RNS.LOG_EXTREME)
 389                  self.compressed_data   = bz2.compress(self.uncompressed_data)
 390                  RNS.log("Compression completed in "+str(round(time.time()-compression_began, 3))+" seconds", RNS.LOG_EXTREME)
 391              else:
 392                  self.compressed_data   = self.uncompressed_data
 393  
 394              self.uncompressed_size = len(self.uncompressed_data)
 395              self.compressed_size   = len(self.compressed_data)
 396  
 397              if (self.compressed_size < self.uncompressed_size and auto_compress):
 398                  saved_bytes = len(self.uncompressed_data) - len(self.compressed_data)
 399                  RNS.log("Compression saved "+str(saved_bytes)+" bytes, sending compressed", RNS.LOG_EXTREME)
 400  
 401                  self.data  = b""
 402                  self.data += RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE]
 403                  self.data += self.compressed_data
 404                  
 405                  self.compressed = True
 406  
 407              else:
 408                  self.data  = b""
 409                  self.data += RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE]
 410                  self.data += self.uncompressed_data
 411  
 412                  self.compressed = False
 413                  self.compressed_data = None
 414                  if self.auto_compress and data_size <= self.auto_compress_limit:
 415                      RNS.log("Compression did not decrease size, sending uncompressed", RNS.LOG_EXTREME)
 416  
 417              self.compressed_data = None
 418              self.uncompressed_data = None
 419  
 420              # Resources handle encryption directly to
 421              # make optimal use of packet MTU on an entire
 422              # encrypted stream. The Resource instance will
 423              # use it's underlying link directly to encrypt.
 424              self.data = self.link.encrypt(self.data)
 425              self.encrypted = True
 426  
 427              self.size = len(self.data)
 428              self.sent_parts = 0
 429              hashmap_entries = int(math.ceil(self.size/float(self.sdu)))
 430              self.total_parts = hashmap_entries
 431                  
 432              hashmap_ok = False
 433              while not hashmap_ok:
 434                  hashmap_computation_began = time.time()
 435                  RNS.log("Starting resource hashmap computation with "+str(hashmap_entries)+" entries...", RNS.LOG_EXTREME)
 436  
 437                  self.random_hash       = RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE]
 438                  self.hash = RNS.Identity.full_hash(data+self.random_hash)
 439                  self.truncated_hash = RNS.Identity.truncated_hash(data+self.random_hash)
 440                  self.expected_proof = RNS.Identity.full_hash(data+self.hash)
 441  
 442                  if original_hash == None:
 443                      self.original_hash = self.hash
 444                  else:
 445                      self.original_hash = original_hash
 446  
 447                  self.parts  = []
 448                  self.hashmap = b""
 449                  collision_guard_list = []
 450                  for i in range(0,hashmap_entries):
 451                      data = self.data[i*self.sdu:(i+1)*self.sdu]
 452                      map_hash = self.get_map_hash(data)
 453  
 454                      if map_hash in collision_guard_list:
 455                          RNS.log("Found hash collision in resource map, remapping...", RNS.LOG_DEBUG)
 456                          hashmap_ok = False
 457                          break
 458                      else:
 459                          hashmap_ok = True
 460                          collision_guard_list.append(map_hash)
 461                          if len(collision_guard_list) > ResourceAdvertisement.COLLISION_GUARD_SIZE:
 462                              collision_guard_list.pop(0)
 463  
 464                          part = RNS.Packet(link, data, context=RNS.Packet.RESOURCE)
 465                          part.pack()
 466                          part.map_hash = map_hash
 467  
 468                          self.hashmap += part.map_hash
 469                          self.parts.append(part)
 470  
 471                  RNS.log("Hashmap computation concluded in "+str(round(time.time()-hashmap_computation_began, 3))+" seconds", RNS.LOG_EXTREME)
 472  
 473              self.data = None
 474              if advertise:
 475                  self.advertise()
 476          else:
 477              self.receive_lock = Lock()
 478              
 479  
 480      def hashmap_update_packet(self, plaintext):
 481          if not self.status == Resource.FAILED:
 482              self.last_activity = time.time()
 483              self.retries_left = self.max_retries
 484  
 485              update = umsgpack.unpackb(plaintext[RNS.Identity.HASHLENGTH//8:])
 486              self.hashmap_update(update[0], update[1])
 487  
 488  
 489      def hashmap_update(self, segment, hashmap):
 490          if not self.status == Resource.FAILED:
 491              self.status = Resource.TRANSFERRING
 492              seg_len = ResourceAdvertisement.HASHMAP_MAX_LEN
 493              hashes = len(hashmap)//Resource.MAPHASH_LEN
 494              for i in range(0,hashes):
 495                  if self.hashmap[i+segment*seg_len] == None:
 496                      self.hashmap_height += 1
 497                  self.hashmap[i+segment*seg_len] = hashmap[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN]
 498  
 499              self.waiting_for_hmu = False
 500              self.request_next()
 501  
 502      def get_map_hash(self, data):
 503          return RNS.Identity.full_hash(data+self.random_hash)[:Resource.MAPHASH_LEN]
 504  
 505      def advertise(self):
 506          """
 507          Advertise the resource. If the other end of the link accepts
 508          the resource advertisement it will begin transferring.
 509          """
 510          thread = threading.Thread(target=self.__advertise_job, daemon=True)
 511          thread.start()
 512  
 513          if self.segment_index < self.total_segments:
 514              prepare_thread = threading.Thread(target=self.__prepare_next_segment, daemon=True)
 515              prepare_thread.start()
 516  
 517      def __advertise_job(self):
 518          self.advertisement_packet = RNS.Packet(self.link, ResourceAdvertisement(self).pack(), context=RNS.Packet.RESOURCE_ADV)
 519          while not self.link.ready_for_new_resource():
 520              self.status = Resource.QUEUED
 521              sleep(0.25)
 522  
 523          try:
 524              self.advertisement_packet.send()
 525              self.last_activity = time.time()
 526              self.started_transferring = self.last_activity
 527              self.adv_sent = self.last_activity
 528              self.rtt = None
 529              self.status = Resource.ADVERTISED
 530              self.retries_left = self.max_adv_retries
 531              self.link.register_outgoing_resource(self)
 532              RNS.log("Sent resource advertisement for "+RNS.prettyhexrep(self.hash), RNS.LOG_EXTREME)
 533          except Exception as e:
 534              RNS.log("Could not advertise resource, the contained exception was: "+str(e), RNS.LOG_ERROR)
 535              self.cancel()
 536              return
 537  
 538          self.watchdog_job()
 539  
 540      def update_eifr(self):
 541          if self.rtt == None:
 542              rtt = self.link.rtt
 543          else:
 544              rtt = self.rtt
 545  
 546          if self.req_data_rtt_rate != 0:
 547              expected_inflight_rate = self.req_data_rtt_rate*8
 548          else:
 549              if self.previous_eifr != None:
 550                  expected_inflight_rate = self.previous_eifr
 551              else:
 552                  expected_inflight_rate = self.link.establishment_cost*8 / rtt
 553  
 554          self.eifr = expected_inflight_rate
 555          if self.link: self.link.expected_rate = self.eifr
 556  
 557      def watchdog_job(self):
 558          thread = threading.Thread(target=self.__watchdog_job, daemon=True)
 559          thread.start()
 560  
 561      def __watchdog_job(self):
 562          self.__watchdog_job_id += 1
 563          this_job_id = self.__watchdog_job_id
 564  
 565          while self.status < Resource.ASSEMBLING and this_job_id == self.__watchdog_job_id:
 566              while self.watchdog_lock:
 567                  sleep(0.025)
 568  
 569              sleep_time = None
 570              if self.status == Resource.ADVERTISED:
 571                  sleep_time = (self.adv_sent+self.timeout+Resource.PROCESSING_GRACE)-time.time()
 572                  if sleep_time < 0:
 573                      if self.retries_left <= 0:
 574                          RNS.log("Resource transfer timeout after sending advertisement", RNS.LOG_DEBUG)
 575                          self.cancel()
 576                          sleep_time = 0.001
 577                      else:
 578                          try:
 579                              RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG)
 580                              self.retries_left -= 1
 581                              self.advertisement_packet = RNS.Packet(self.link, ResourceAdvertisement(self).pack(), context=RNS.Packet.RESOURCE_ADV)
 582                              self.advertisement_packet.send()
 583                              self.last_activity = time.time()
 584                              self.adv_sent = self.last_activity
 585                              sleep_time = 0.001
 586                          except Exception as e:
 587                              RNS.log("Could not resend advertisement packet, cancelling resource. The contained exception was: "+str(e), RNS.LOG_VERBOSE)
 588                              self.cancel()
 589                      
 590  
 591              elif self.status == Resource.TRANSFERRING:
 592                  if not self.initiator:
 593                      retries_used = self.max_retries - self.retries_left
 594                      extra_wait = retries_used * Resource.PER_RETRY_DELAY
 595  
 596                      self.update_eifr()
 597                      expected_tof_remaining = (self.outstanding_parts*self.sdu*8)/self.eifr
 598  
 599                      if self.req_resp_rtt_rate != 0:
 600                          sleep_time = self.last_activity + self.part_timeout_factor*expected_tof_remaining + Resource.RETRY_GRACE_TIME + extra_wait - time.time()
 601                      else:
 602                          sleep_time = self.last_activity + self.part_timeout_factor*((3*self.sdu)/self.eifr) + Resource.RETRY_GRACE_TIME + extra_wait - time.time()
 603                      
 604                      # TODO: Remove debug at some point
 605                      # RNS.log(f"EIFR {RNS.prettyspeed(self.eifr)}, ETOF {RNS.prettyshorttime(expected_tof_remaining)} ", RNS.LOG_DEBUG, pt=True)
 606                      # RNS.log(f"Resource ST {RNS.prettyshorttime(sleep_time)}, RTT {RNS.prettyshorttime(self.rtt or self.link.rtt)}, {self.outstanding_parts} left", RNS.LOG_DEBUG, pt=True)
 607                      
 608                      if sleep_time < 0:
 609                          if self.retries_left > 0:
 610                              ms = "" if self.outstanding_parts == 1 else "s"
 611                              RNS.log(f"Timed out waiting for {self.outstanding_parts} part{ms}, requesting retry on {self}", RNS.LOG_DEBUG)
 612                              if self.window > self.window_min:
 613                                  self.window -= 1
 614                                  if self.window_max > self.window_min:
 615                                      self.window_max -= 1
 616                                      if (self.window_max - self.window) > (self.window_flexibility-1):
 617                                          self.window_max -= 1
 618  
 619                              sleep_time = 0.001
 620                              self.retries_left -= 1
 621                              self.waiting_for_hmu = False
 622                              self.request_next()
 623                          else:
 624                              self.cancel()
 625                              sleep_time = 0.001
 626                  else:
 627                      max_extra_wait = sum([(r+1) * Resource.PER_RETRY_DELAY for r in range(self.MAX_RETRIES)])
 628                      max_wait = self.rtt * self.timeout_factor * self.max_retries + self.sender_grace_time + max_extra_wait
 629                      sleep_time = self.last_activity + max_wait - time.time()
 630                      if sleep_time < 0:
 631                          RNS.log("Resource timed out waiting for part requests", RNS.LOG_DEBUG)
 632                          self.cancel()
 633                          sleep_time = 0.001
 634  
 635              elif self.status == Resource.AWAITING_PROOF:
 636                  # Decrease timeout factor since proof packets are
 637                  # significantly smaller than full req/resp roundtrip
 638                  self.timeout_factor = Resource.PROOF_TIMEOUT_FACTOR
 639  
 640                  sleep_time = self.last_part_sent + (self.rtt*self.timeout_factor+self.sender_grace_time) - time.time()
 641                  if sleep_time < 0:
 642                      if self.retries_left <= 0:
 643                          RNS.log("Resource timed out waiting for proof", RNS.LOG_DEBUG)
 644                          self.cancel()
 645                          sleep_time = 0.001
 646                      else:
 647                          RNS.log("All parts sent, but no resource proof received, querying network cache...", RNS.LOG_DEBUG)
 648                          self.retries_left -= 1
 649                          expected_data = self.hash + self.expected_proof
 650                          expected_proof_packet = RNS.Packet(self.link, expected_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF)
 651                          expected_proof_packet.pack()
 652                          RNS.Transport.cache_request(expected_proof_packet.packet_hash, self.link)
 653                          self.last_part_sent = time.time()
 654                          sleep_time = 0.001
 655  
 656              elif self.status == Resource.REJECTED:
 657                  sleep_time = 0.001
 658  
 659              if sleep_time == 0:
 660                  RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_DEBUG)
 661              if sleep_time == None or sleep_time < 0:
 662                  RNS.log("Timing error, cancelling resource transfer.", RNS.LOG_ERROR)
 663                  self.cancel()
 664              
 665              if sleep_time != None:
 666                  sleep(min(sleep_time, Resource.WATCHDOG_MAX_SLEEP))
 667  
 668      def assemble(self):
 669          if not self.status == Resource.FAILED:
 670              try:
 671                  self.status = Resource.ASSEMBLING
 672                  stream = b"".join(self.parts)
 673  
 674                  if self.encrypted: data = self.link.decrypt(stream)
 675                  else: data = stream
 676  
 677                  # Strip off random hash
 678                  data = data[Resource.RANDOM_HASH_SIZE:]
 679  
 680                  if self.compressed: self.data = bz2.decompress(data)
 681                  else: self.data = data
 682  
 683                  calculated_hash = RNS.Identity.full_hash(self.data+self.random_hash)
 684                  if calculated_hash == self.hash:
 685                      if self.has_metadata and self.segment_index == 1:
 686                          # TODO: Add early metadata_ready callback
 687                          metadata_size = self.data[0] << 16 | self.data[1] << 8 | self.data[2]
 688                          packed_metadata = self.data[3:3+metadata_size]
 689                          metadata_file = open(self.meta_storagepath, "wb")
 690                          metadata_file.write(packed_metadata)
 691                          metadata_file.close()
 692                          del packed_metadata
 693                          data = self.data[3+metadata_size:]
 694                      else:
 695                          data = self.data
 696  
 697                      self.file = open(self.storagepath, "ab")
 698                      self.file.write(data)
 699                      self.file.close()
 700                      self.status = Resource.COMPLETE
 701                      del data
 702                      self.prove()
 703                  
 704                  else: self.status = Resource.CORRUPT
 705  
 706  
 707              except Exception as e:
 708                  RNS.log("Error while assembling received resource.", RNS.LOG_ERROR)
 709                  RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
 710                  self.status = Resource.CORRUPT
 711  
 712              self.link.resource_concluded(self)
 713  
 714              if self.segment_index == self.total_segments:
 715                  if self.callback != None:
 716                      if not os.path.isfile(self.meta_storagepath):
 717                          self.metadata = None
 718                      else:
 719                          metadata_file = open(self.meta_storagepath, "rb")
 720                          self.metadata = umsgpack.unpackb(metadata_file.read())
 721                          metadata_file.close()
 722                          try: os.unlink(self.meta_storagepath)
 723                          except Exception as e:
 724                              RNS.log(f"Error while cleaning up resource metadata file, the contained exception was: {e}", RNS.LOG_ERROR)
 725  
 726                      self.data = open(self.storagepath, "rb")
 727                      try: self.callback(self)
 728                      except Exception as e:
 729                          RNS.log("Error while executing resource assembled callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
 730  
 731                  try:
 732                      if hasattr(self.data, "close") and callable(self.data.close): self.data.close()
 733                      if os.path.isfile(self.storagepath): os.unlink(self.storagepath)
 734  
 735                  except Exception as e:
 736                      RNS.log(f"Error while cleaning up resource files, the contained exception was: {e}", RNS.LOG_ERROR)
 737              else:
 738                  RNS.log("Resource segment "+str(self.segment_index)+" of "+str(self.total_segments)+" received, waiting for next segment to be announced", RNS.LOG_DEBUG)
 739  
 740  
 741      def prove(self):
 742          if not self.status == Resource.FAILED:
 743              try:
 744                  proof = RNS.Identity.full_hash(self.data+self.hash)
 745                  proof_data = self.hash+proof
 746                  proof_packet = RNS.Packet(self.link, proof_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF)
 747                  proof_packet.send()
 748                  RNS.Transport.cache(proof_packet, force_cache=True)
 749              except Exception as e:
 750                  RNS.log("Could not send proof packet, cancelling resource", RNS.LOG_DEBUG)
 751                  RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
 752                  self.cancel()
 753  
 754      def __prepare_next_segment(self):
 755          # Prepare the next segment for advertisement
 756          RNS.log(f"Preparing segment {self.segment_index+1} of {self.total_segments} for resource {self}", RNS.LOG_DEBUG)
 757          self.preparing_next_segment = True
 758          self.next_segment = Resource(
 759              self.input_file, self.link,
 760              callback = self.callback,
 761              segment_index = self.segment_index+1,
 762              original_hash=self.original_hash,
 763              progress_callback = self.__progress_callback,
 764              request_id = self.request_id,
 765              is_response = self.is_response,
 766              advertise = False,
 767              auto_compress = self.auto_compress_option,
 768              sent_metadata_size = self.metadata_size,
 769          )
 770  
 771      def validate_proof(self, proof_data):
 772          if not self.status == Resource.FAILED:
 773              if len(proof_data) == RNS.Identity.HASHLENGTH//8*2:
 774                  if proof_data[RNS.Identity.HASHLENGTH//8:] == self.expected_proof:
 775                      self.status = Resource.COMPLETE
 776                      self.link.resource_concluded(self)
 777                      if self.segment_index == self.total_segments:
 778                          # If all segments were processed, we'll
 779                          # signal that the resource sending concluded
 780                          if self.callback != None:
 781                              try: self.callback(self)
 782                              except Exception as e: RNS.log("Error while executing resource concluded callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
 783                              finally:
 784                                  try:
 785                                      if hasattr(self, "input_file"):
 786                                          if hasattr(self.input_file, "close") and callable(self.input_file.close): self.input_file.close()
 787                                  except Exception as e: RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR)
 788                          else:
 789                              try:
 790                                  if hasattr(self, "input_file"):
 791                                      if hasattr(self.input_file, "close") and callable(self.input_file.close): self.input_file.close()
 792                              except Exception as e: RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR)
 793                      else:
 794                          # Otherwise we'll recursively create the
 795                          # next segment of the resource
 796                          if not self.preparing_next_segment:
 797                              RNS.log(f"Next segment preparation for resource {self} was not started yet, manually preparing now. This will cause transfer slowdown.", RNS.LOG_WARNING)
 798                              self.__prepare_next_segment()
 799  
 800                          while self.next_segment == None: time.sleep(0.05)
 801  
 802                          self.data = None
 803                          self.metadata = None
 804                          self.parts = None
 805                          self.input_file = None
 806                          self.link = None
 807                          self.req_hashlist = None
 808                          self.hashmap = None
 809  
 810                          self.next_segment.advertise()
 811                  else:
 812                      pass
 813              else:
 814                  pass
 815  
 816  
 817      def receive_part(self, packet):
 818          with self.receive_lock:
 819  
 820              self.receiving_part = True
 821              self.last_activity = time.time()
 822              self.retries_left = self.max_retries
 823  
 824              if self.req_resp == None:
 825                  self.req_resp = self.last_activity
 826                  rtt = self.req_resp-self.req_sent
 827                  
 828                  self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT
 829                  if self.rtt == None:
 830                      self.rtt = self.link.rtt
 831                      self.watchdog_job()
 832                  elif rtt < self.rtt:
 833                      self.rtt = max(self.rtt - self.rtt*0.05, rtt)
 834                  elif rtt > self.rtt:
 835                      self.rtt = min(self.rtt + self.rtt*0.05, rtt)
 836  
 837                  if rtt > 0:
 838                      req_resp_cost = len(packet.raw)+self.req_sent_bytes
 839                      self.req_resp_rtt_rate = req_resp_cost / rtt
 840  
 841                      if self.req_resp_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD:
 842                          self.fast_rate_rounds += 1
 843  
 844                          if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD:
 845                              self.window_max = Resource.WINDOW_MAX_FAST
 846  
 847              if not self.status == Resource.FAILED:
 848                  self.status = Resource.TRANSFERRING
 849                  part_data = packet.data
 850                  part_hash = self.get_map_hash(part_data)
 851  
 852                  consecutive_index = self.consecutive_completed_height if self.consecutive_completed_height >= 0 else 0
 853                  i = consecutive_index
 854                  for map_hash in self.hashmap[consecutive_index:consecutive_index+self.window]:
 855                      if map_hash == part_hash:
 856                          if self.parts[i] == None:
 857  
 858                              # Insert data into parts list
 859                              self.parts[i] = part_data
 860                              self.rtt_rxd_bytes += len(part_data)
 861                              self.received_count += 1
 862                              self.outstanding_parts -= 1
 863  
 864                              # Update consecutive completed pointer
 865                              if i == self.consecutive_completed_height + 1:
 866                                  self.consecutive_completed_height = i
 867                              
 868                              cp = self.consecutive_completed_height + 1
 869                              while cp < len(self.parts) and self.parts[cp] != None:
 870                                  self.consecutive_completed_height = cp
 871                                  cp += 1
 872  
 873                              if self.__progress_callback != None:
 874                                  try:
 875                                      self.__progress_callback(self)
 876                                  except Exception as e:
 877                                      RNS.log("Error while executing progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
 878  
 879                      i += 1
 880  
 881                  self.receiving_part = False
 882  
 883                  if self.received_count == self.total_parts and not self.assembly_lock:
 884                      self.assembly_lock = True
 885                      threading.Thread(target=self.assemble, daemon=True).start()
 886                  elif self.outstanding_parts == 0:
 887                      # TODO: Figure out if there is a mathematically
 888                      # optimal way to adjust windows
 889                      if self.window < self.window_max:
 890                          self.window += 1
 891                          if (self.window - self.window_min) > (self.window_flexibility-1):
 892                              self.window_min += 1
 893  
 894                      if self.req_sent != 0:
 895                          rtt = time.time()-self.req_sent
 896                          req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req
 897  
 898                          if rtt != 0:
 899                              self.req_data_rtt_rate = req_transferred/rtt
 900                              self.update_eifr()
 901                              self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes
 902  
 903                              if self.req_data_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD:
 904                                  self.fast_rate_rounds += 1
 905  
 906                                  if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD:
 907                                      self.window_max = Resource.WINDOW_MAX_FAST
 908  
 909                              if self.fast_rate_rounds == 0 and self.req_data_rtt_rate < Resource.RATE_VERY_SLOW and self.very_slow_rate_rounds < Resource.VERY_SLOW_RATE_THRESHOLD:
 910                                  self.very_slow_rate_rounds += 1
 911  
 912                                  if self.very_slow_rate_rounds == Resource.VERY_SLOW_RATE_THRESHOLD:
 913                                      self.window_max = Resource.WINDOW_MAX_VERY_SLOW
 914  
 915                      self.request_next()
 916              else:
 917                  self.receiving_part = False
 918  
 919      # Called on incoming resource to send a request for more data
 920      def request_next(self):
 921          while self.receiving_part:
 922              sleep(0.001)
 923  
 924          if not self.status == Resource.FAILED:
 925              if not self.waiting_for_hmu:
 926                  self.outstanding_parts = 0
 927                  hashmap_exhausted = Resource.HASHMAP_IS_NOT_EXHAUSTED
 928                  requested_hashes = b""
 929  
 930                  i = 0; pn = self.consecutive_completed_height+1
 931                  search_start = pn
 932                  search_size = self.window
 933                  
 934                  for part in self.parts[search_start:search_start+search_size]:
 935                      if part == None:
 936                          part_hash = self.hashmap[pn]
 937                          if part_hash != None:
 938                              requested_hashes += part_hash
 939                              self.outstanding_parts += 1
 940                              i += 1
 941                          else:
 942                              hashmap_exhausted = Resource.HASHMAP_IS_EXHAUSTED
 943  
 944                      pn += 1
 945                      if i >= self.window or hashmap_exhausted == Resource.HASHMAP_IS_EXHAUSTED:
 946                          break
 947  
 948                  hmu_part = bytes([hashmap_exhausted])
 949                  if hashmap_exhausted == Resource.HASHMAP_IS_EXHAUSTED:
 950                      last_map_hash = self.hashmap[self.hashmap_height-1]
 951                      hmu_part += last_map_hash
 952                      self.waiting_for_hmu = True
 953  
 954                  request_data = hmu_part + self.hash + requested_hashes
 955                  request_packet = RNS.Packet(self.link, request_data, context = RNS.Packet.RESOURCE_REQ)
 956  
 957                  try:
 958                      request_packet.send()
 959                      self.last_activity = time.time()
 960                      self.req_sent = self.last_activity
 961                      self.req_sent_bytes = len(request_packet.raw)
 962                      self.req_resp = None
 963  
 964                  except Exception as e:
 965                      RNS.log("Could not send resource request packet, cancelling resource", RNS.LOG_DEBUG)
 966                      RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
 967                      self.cancel()
 968  
 969      # Called on outgoing resource to make it send more data
 970      def request(self, request_data):
 971          if not self.status == Resource.FAILED:
 972              rtt = time.time() - self.adv_sent
 973              if self.rtt == None:
 974                  self.rtt = rtt
 975  
 976              if self.status != Resource.TRANSFERRING:
 977                  self.status = Resource.TRANSFERRING
 978                  self.watchdog_job()
 979  
 980              self.retries_left = self.max_retries
 981  
 982              wants_more_hashmap = True if request_data[0] == Resource.HASHMAP_IS_EXHAUSTED else False
 983              pad = 1+Resource.MAPHASH_LEN if wants_more_hashmap else 1
 984  
 985              requested_hashes = request_data[pad+RNS.Identity.HASHLENGTH//8:]
 986  
 987              # Define the search scope
 988              search_start = self.receiver_min_consecutive_height
 989              search_end   = self.receiver_min_consecutive_height+ResourceAdvertisement.COLLISION_GUARD_SIZE
 990  
 991              map_hashes = []
 992              for i in range(0,len(requested_hashes)//Resource.MAPHASH_LEN):
 993                  map_hash = requested_hashes[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN]
 994                  map_hashes.append(map_hash)
 995  
 996              search_scope = self.parts[search_start:search_end]
 997              requested_parts = list(filter(lambda part: part.map_hash in map_hashes, search_scope))
 998  
 999              for part in requested_parts:
1000                  try:
1001                      if not part.sent:
1002                          part.send()
1003                          self.sent_parts += 1
1004                      else:
1005                          part.resend()
1006  
1007                      self.last_activity = time.time()
1008                      self.last_part_sent = self.last_activity
1009  
1010                  except Exception as e:
1011                      RNS.log("Resource could not send parts, cancelling transfer!", RNS.LOG_DEBUG)
1012                      RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
1013                      self.cancel()
1014              
1015              if wants_more_hashmap:
1016                  last_map_hash = request_data[1:Resource.MAPHASH_LEN+1]
1017                  
1018                  part_index   = self.receiver_min_consecutive_height
1019                  search_start = part_index
1020                  search_end   = self.receiver_min_consecutive_height+ResourceAdvertisement.COLLISION_GUARD_SIZE
1021                  for part in self.parts[search_start:search_end]:
1022                      part_index += 1
1023                      if part.map_hash == last_map_hash:
1024                          break
1025  
1026                  self.receiver_min_consecutive_height = max(part_index-1-Resource.WINDOW_MAX, 0)
1027  
1028                  if part_index % ResourceAdvertisement.HASHMAP_MAX_LEN != 0:
1029                      RNS.log("Resource sequencing error, cancelling transfer!", RNS.LOG_ERROR)
1030                      self.cancel()
1031                      return
1032                  else:
1033                      segment = part_index // ResourceAdvertisement.HASHMAP_MAX_LEN
1034  
1035                  
1036                  hashmap_start = segment*ResourceAdvertisement.HASHMAP_MAX_LEN
1037                  hashmap_end   = min((segment+1)*ResourceAdvertisement.HASHMAP_MAX_LEN, len(self.parts))
1038  
1039                  hashmap = b""
1040                  for i in range(hashmap_start,hashmap_end):
1041                      hashmap += self.hashmap[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN]
1042  
1043                  hmu = self.hash+umsgpack.packb([segment, hashmap])
1044                  hmu_packet = RNS.Packet(self.link, hmu, context = RNS.Packet.RESOURCE_HMU)
1045  
1046                  try:
1047                      hmu_packet.send()
1048                      self.last_activity = time.time()
1049                  except Exception as e:
1050                      RNS.log("Could not send resource HMU packet, cancelling resource", RNS.LOG_DEBUG)
1051                      RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
1052                      self.cancel()
1053  
1054              if self.sent_parts == len(self.parts):
1055                  self.status = Resource.AWAITING_PROOF
1056                  self.retries_left = 3
1057  
1058              if self.__progress_callback != None:
1059                  try:
1060                      self.__progress_callback(self)
1061                  except Exception as e:
1062                      RNS.log("Error while executing progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
1063  
1064      def cancel(self):
1065          """
1066          Cancels transferring the resource.
1067          """
1068          if self.status < Resource.COMPLETE:
1069              self.status = Resource.FAILED
1070              if self.initiator:
1071                  if self.link.status == RNS.Link.ACTIVE:
1072                      try:
1073                          cancel_packet = RNS.Packet(self.link, self.hash, context=RNS.Packet.RESOURCE_ICL)
1074                          cancel_packet.send()
1075                      except Exception as e:
1076                          RNS.log("Could not send resource cancel packet, the contained exception was: "+str(e), RNS.LOG_ERROR)
1077                  self.link.cancel_outgoing_resource(self)
1078              else:
1079                  self.link.cancel_incoming_resource(self)
1080              
1081              if self.callback != None:
1082                  try:
1083                      self.link.resource_concluded(self)
1084                      self.callback(self)
1085                  except Exception as e:
1086                      RNS.log("Error while executing callbacks on resource cancel from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
1087  
1088      def _rejected(self):
1089          if self.status < Resource.COMPLETE:
1090              if self.initiator:
1091                  self.status = Resource.REJECTED
1092                  self.link.cancel_outgoing_resource(self)
1093                  if self.callback != None:
1094                      try:
1095                          self.link.resource_concluded(self)
1096                          def job(): self.callback(self)
1097                          threading.Thread(target=job, daemon=True).start()
1098                      except Exception as e:
1099                          RNS.log("Error while executing callbacks on resource reject from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
1100  
1101      def set_callback(self, callback):
1102          self.callback = callback
1103  
1104      def progress_callback(self, callback):
1105          self.__progress_callback = callback
1106  
1107      def get_progress(self):
1108          """
1109          :returns: The current progress of the resource transfer as a *float* between 0.0 and 1.0.
1110          """
1111          if self.status == RNS.Resource.COMPLETE and self.segment_index == self.total_segments:
1112              return 1.0
1113          
1114          elif self.initiator:
1115              if not self.split:
1116                  self.processed_parts = self.sent_parts
1117                  self.progress_total_parts = float(self.total_parts)
1118  
1119              else:
1120                  is_last_segment = self.segment_index != self.total_segments
1121                  total_segments = self.total_segments
1122                  processed_segments = self.segment_index-1
1123  
1124                  current_segment_parts = self.total_parts
1125                  max_parts_per_segment = math.ceil(Resource.MAX_EFFICIENT_SIZE/self.sdu)
1126  
1127                  previously_processed_parts = processed_segments*max_parts_per_segment
1128  
1129                  if current_segment_parts < max_parts_per_segment:
1130                      current_segment_factor = max_parts_per_segment / current_segment_parts
1131                  else:
1132                      current_segment_factor = 1
1133  
1134                  self.processed_parts = previously_processed_parts + self.sent_parts*current_segment_factor
1135                  self.progress_total_parts = self.total_segments*max_parts_per_segment
1136  
1137          else:
1138              if not self.split:
1139                  self.processed_parts = self.received_count
1140                  self.progress_total_parts = float(self.total_parts)
1141  
1142              else:
1143                  is_last_segment = self.segment_index != self.total_segments
1144                  total_segments = self.total_segments
1145                  processed_segments = self.segment_index-1
1146  
1147                  current_segment_parts = self.total_parts
1148                  max_parts_per_segment = math.ceil(Resource.MAX_EFFICIENT_SIZE/self.sdu)
1149  
1150                  previously_processed_parts = processed_segments*max_parts_per_segment
1151  
1152                  if current_segment_parts < max_parts_per_segment:
1153                      current_segment_factor = max_parts_per_segment / current_segment_parts
1154                  else:
1155                      current_segment_factor = 1
1156  
1157                  self.processed_parts = previously_processed_parts + self.received_count*current_segment_factor
1158                  self.progress_total_parts = self.total_segments*max_parts_per_segment
1159  
1160  
1161          progress = min(1.0, self.processed_parts / self.progress_total_parts)
1162          return progress
1163  
1164      def get_segment_progress(self):
1165          if self.status == RNS.Resource.COMPLETE and self.segment_index == self.total_segments:
1166              return 1.0
1167          elif self.initiator:
1168              processed_parts = self.sent_parts
1169          else:
1170              processed_parts = self.received_count
1171  
1172          progress = min(1.0, processed_parts / self.total_parts)
1173          return progress
1174  
1175      def get_transfer_size(self):
1176          """
1177          :returns: The number of bytes needed to transfer the resource.
1178          """
1179          return self.size
1180  
1181      def get_data_size(self):
1182          """
1183          :returns: The total data size of the resource.
1184          """
1185          return self.total_size
1186  
1187      def get_parts(self):
1188          """
1189          :returns: The number of parts the resource will be transferred in.
1190          """
1191          return self.total_parts
1192  
1193      def get_segments(self):
1194          """
1195          :returns: The number of segments the resource is divided into.
1196          """
1197          return self.total_segments
1198  
1199      def get_hash(self):
1200          """
1201          :returns: The hash of the resource.
1202          """
1203          return self.hash
1204  
1205      def is_compressed(self):
1206          """
1207          :returns: Whether the resource is compressed.
1208          """
1209          return self.compressed
1210  
1211      def __str__(self):
1212          return "<"+RNS.hexrep(self.hash,delimit=False)+"/"+RNS.hexrep(self.link.link_id,delimit=False)+">"
1213  
1214  
1215  class ResourceAdvertisement:
1216      OVERHEAD             = 134
1217      HASHMAP_MAX_LEN      = math.floor((RNS.Link.MDU-OVERHEAD)/Resource.MAPHASH_LEN)
1218      COLLISION_GUARD_SIZE = 2*Resource.WINDOW_MAX+HASHMAP_MAX_LEN
1219  
1220      assert HASHMAP_MAX_LEN > 0, "The configured MTU is too small to include any map hashes in resource advertisments"
1221  
1222      @staticmethod
1223      def is_request(advertisement_packet):
1224          adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
1225          if adv.q != None and adv.u:
1226              return True
1227          else:
1228              return False
1229  
1230  
1231      @staticmethod
1232      def is_response(advertisement_packet):
1233          adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
1234  
1235          if adv.q != None and adv.p:
1236              return True
1237          else:
1238              return False
1239  
1240  
1241      @staticmethod
1242      def read_request_id(advertisement_packet):
1243          adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
1244          return adv.q
1245  
1246  
1247      @staticmethod
1248      def read_transfer_size(advertisement_packet):
1249          adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
1250          return adv.t
1251  
1252  
1253      @staticmethod
1254      def read_size(advertisement_packet):
1255          adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
1256          return adv.d
1257  
1258  
1259      def __init__(self, resource=None, request_id=None, is_response=False):
1260          self.link = None
1261          if resource != None:
1262              self.t = resource.size              # Transfer size
1263              self.d = resource.total_size        # Total uncompressed data size
1264              self.n = len(resource.parts)        # Number of parts
1265              self.h = resource.hash              # Resource hash
1266              self.r = resource.random_hash       # Resource random hash
1267              self.o = resource.original_hash     # First-segment hash
1268              self.m = resource.hashmap           # Resource hashmap
1269              self.c = resource.compressed        # Compression flag
1270              self.e = resource.encrypted         # Encryption flag
1271              self.s = resource.split             # Split flag
1272              self.x = resource.has_metadata      # Metadata flag
1273              self.i = resource.segment_index     # Segment index
1274              self.l = resource.total_segments    # Total segments
1275              self.q = resource.request_id        # ID of associated request
1276              self.u = False                      # Is request flag
1277              self.p = False                      # Is response flag
1278  
1279              if self.q != None:
1280                  if not resource.is_response:
1281                      self.u = True
1282                      self.p = False
1283                  else:
1284                      self.u = False
1285                      self.p = True
1286  
1287              # Flags
1288              self.f = 0x00 | self.x << 5 | self.p << 4 | self.u << 3 | self.s << 2 | self.c << 1 | self.e
1289  
1290      def get_transfer_size(self):
1291          return self.t
1292  
1293      def get_data_size(self):
1294          return self.d
1295  
1296      def get_parts(self):
1297          return self.n
1298  
1299      def get_segments(self):
1300          return self.l
1301  
1302      def get_hash(self):
1303          return self.h
1304  
1305      def is_compressed(self):
1306          return self.c
1307  
1308      def has_metadata(self):
1309          return self.x
1310  
1311      def get_link(self):
1312          return self.link
1313  
1314      def pack(self, segment=0):
1315          hashmap_start = segment*ResourceAdvertisement.HASHMAP_MAX_LEN
1316          hashmap_end   = min((segment+1)*(ResourceAdvertisement.HASHMAP_MAX_LEN), self.n)
1317  
1318          hashmap = b""
1319          for i in range(hashmap_start,hashmap_end):
1320              hashmap += self.m[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN]
1321  
1322          dictionary = {
1323              "t": self.t,    # Transfer size
1324              "d": self.d,    # Data size
1325              "n": self.n,    # Number of parts
1326              "h": self.h,    # Resource hash
1327              "r": self.r,    # Resource random hash
1328              "o": self.o,    # Original hash
1329              "i": self.i,    # Segment index
1330              "l": self.l,    # Total segments
1331              "q": self.q,    # Request ID
1332              "f": self.f,    # Resource flags
1333              "m": hashmap
1334          }
1335  
1336          return umsgpack.packb(dictionary)
1337  
1338  
1339      @staticmethod
1340      def unpack(data):
1341          dictionary = umsgpack.unpackb(data)
1342          
1343          adv   = ResourceAdvertisement()
1344          adv.t = dictionary["t"]
1345          adv.d = dictionary["d"]
1346          adv.n = dictionary["n"]
1347          adv.h = dictionary["h"]
1348          adv.r = dictionary["r"]
1349          adv.o = dictionary["o"]
1350          adv.m = dictionary["m"]
1351          adv.f = dictionary["f"]
1352          adv.i = dictionary["i"]
1353          adv.l = dictionary["l"]
1354          adv.q = dictionary["q"]
1355          adv.e = True if (adv.f & 0x01) == 0x01 else False
1356          adv.c = True if ((adv.f >> 1) & 0x01) == 0x01 else False
1357          adv.s = True if ((adv.f >> 2) & 0x01) == 0x01 else False
1358          adv.u = True if ((adv.f >> 3) & 0x01) == 0x01 else False
1359          adv.p = True if ((adv.f >> 4) & 0x01) == 0x01 else False
1360          adv.x = True if ((adv.f >> 5) & 0x01) == 0x01 else False
1361  
1362          return adv