Resource.py
1 # Reticulum License 2 # 3 # Copyright (c) 2016-2025 Mark Qvist 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # - The Software shall not be used in any kind of system which includes amongst 13 # its functions the ability to purposefully do harm to human beings. 14 # 15 # - The Software shall not be used, directly or indirectly, in the creation of 16 # an artificial intelligence, machine learning or language model training 17 # dataset, including but not limited to any use that contributes to the 18 # training or development of such a model or algorithm. 19 # 20 # - The above copyright notice and this permission notice shall be included in 21 # all copies or substantial portions of the Software. 22 # 23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 24 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 25 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 26 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 27 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 28 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 29 # SOFTWARE. 30 31 import RNS 32 import os 33 import bz2 34 import math 35 import time 36 import struct 37 import tempfile 38 import threading 39 from threading import Lock 40 from .vendor import umsgpack as umsgpack 41 from time import sleep 42 43 class Resource: 44 """ 45 The Resource class allows transferring arbitrary amounts 46 of data over a link. It will automatically handle sequencing, 47 compression, coordination and checksumming. 48 49 :param data: The data to be transferred. Can be *bytes* or an open *file handle*. See the :ref:`Filetransfer Example<example-filetransfer>` for details. 50 :param link: The :ref:`RNS.Link<api-link>` instance on which to transfer the data. 51 :param advertise: Optional. Whether to automatically advertise the resource. Can be *True* or *False*. 52 :param auto_compress: Optional. Whether to auto-compress the resource. Can be *True* or *False*. 53 :param callback: An optional *callable* with the signature *callback(resource)*. Will be called when the resource transfer concludes. 54 :param progress_callback: An optional *callable* with the signature *callback(resource)*. Will be called whenever the resource transfer progress is updated. 55 """ 56 57 # The initial window size at beginning of transfer 58 WINDOW = 4 59 60 # Absolute minimum window size during transfer 61 WINDOW_MIN = 2 62 63 # The maximum window size for transfers on slow links 64 WINDOW_MAX_SLOW = 10 65 66 # The maximum window size for transfers on very slow links 67 WINDOW_MAX_VERY_SLOW = 4 68 69 # The maximum window size for transfers on fast links 70 WINDOW_MAX_FAST = 75 71 72 # For calculating maps and guard segments, this 73 # must be set to the global maximum window. 74 WINDOW_MAX = WINDOW_MAX_FAST 75 76 # If the fast rate is sustained for this many request 77 # rounds, the fast link window size will be allowed. 78 FAST_RATE_THRESHOLD = WINDOW_MAX_SLOW - WINDOW - 2 79 80 # If the very slow rate is sustained for this many request 81 # rounds, window will be capped to the very slow limit. 82 VERY_SLOW_RATE_THRESHOLD = 2 83 84 # If the RTT rate is higher than this value, 85 # the max window size for fast links will be used. 86 # The default is 50 Kbps (the value is stored in 87 # bytes per second, hence the "/ 8"). 88 RATE_FAST = (50*1000) / 8 89 90 # If the RTT rate is lower than this value, 91 # the window size will be capped at . 92 # The default is 50 Kbps (the value is stored in 93 # bytes per second, hence the "/ 8"). 94 RATE_VERY_SLOW = (2*1000) / 8 95 96 # The minimum allowed flexibility of the window size. 97 # The difference between window_max and window_min 98 # will never be smaller than this value. 99 WINDOW_FLEXIBILITY = 4 100 101 # Number of bytes in a map hash 102 MAPHASH_LEN = 4 103 SDU = RNS.Packet.MDU 104 RANDOM_HASH_SIZE = 4 105 106 # This is an indication of what the 107 # maximum size a resource should be, if 108 # it is to be handled within reasonable 109 # time constraint, even on small systems. 110 # 111 # This constant will be used when determining 112 # how to sequence the sending of large resources. 113 # 114 # Capped at 16777215 (0xFFFFFF) per segment to 115 # fit in 3 bytes in resource advertisements. 116 MAX_EFFICIENT_SIZE = 1 * 1024 * 1024 - 1 117 RESPONSE_MAX_GRACE_TIME = 10 118 119 # Max metadata size is 16777215 (0xFFFFFF) bytes 120 METADATA_MAX_SIZE = 16 * 1024 * 1024 - 1 121 122 # The maximum size to auto-compress with 123 # bz2 before sending. 124 AUTO_COMPRESS_MAX_SIZE = 64 * 1024 * 1024 125 126 PART_TIMEOUT_FACTOR = 4 127 PART_TIMEOUT_FACTOR_AFTER_RTT = 2 128 PROOF_TIMEOUT_FACTOR = 3 129 MAX_RETRIES = 16 130 MAX_ADV_RETRIES = 4 131 SENDER_GRACE_TIME = 10.0 132 PROCESSING_GRACE = 1.0 133 RETRY_GRACE_TIME = 0.25 134 PER_RETRY_DELAY = 0.5 135 136 WATCHDOG_MAX_SLEEP = 1 137 138 HASHMAP_IS_NOT_EXHAUSTED = 0x00 139 HASHMAP_IS_EXHAUSTED = 0xFF 140 141 # Status constants 142 NONE = 0x00 143 QUEUED = 0x01 144 ADVERTISED = 0x02 145 TRANSFERRING = 0x03 146 AWAITING_PROOF = 0x04 147 ASSEMBLING = 0x05 148 COMPLETE = 0x06 149 FAILED = 0x07 150 CORRUPT = 0x08 151 REJECTED = 0x00 152 153 @staticmethod 154 def reject(advertisement_packet): 155 try: 156 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 157 resource_hash = adv.h 158 reject_packet = RNS.Packet(advertisement_packet.link, resource_hash, context=RNS.Packet.RESOURCE_RCL) 159 reject_packet.send() 160 161 except Exception as e: 162 RNS.log(f"An error ocurred while rejecting advertised resource: {e}", RNS.LOG_ERROR) 163 RNS.trace_exception(e) 164 165 @staticmethod 166 def accept(advertisement_packet, callback=None, progress_callback = None, request_id = None): 167 try: 168 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 169 170 resource = Resource(None, advertisement_packet.link, request_id = request_id) 171 resource.status = Resource.TRANSFERRING 172 173 resource.flags = adv.f 174 resource.size = adv.t 175 resource.total_size = adv.d 176 resource.uncompressed_size = adv.d 177 resource.hash = adv.h 178 resource.original_hash = adv.o 179 resource.random_hash = adv.r 180 resource.hashmap_raw = adv.m 181 resource.encrypted = True if resource.flags & 0x01 else False 182 resource.compressed = True if resource.flags >> 1 & 0x01 else False 183 resource.initiator = False 184 resource.callback = callback 185 resource.__progress_callback = progress_callback 186 resource.total_parts = int(math.ceil(resource.size/float(resource.sdu))) 187 resource.received_count = 0 188 resource.outstanding_parts = 0 189 resource.parts = [None] * resource.total_parts 190 resource.window = Resource.WINDOW 191 resource.window_max = Resource.WINDOW_MAX_SLOW 192 resource.window_min = Resource.WINDOW_MIN 193 resource.window_flexibility = Resource.WINDOW_FLEXIBILITY 194 resource.last_activity = time.time() 195 resource.started_transferring = resource.last_activity 196 197 resource.storagepath = RNS.Reticulum.resourcepath+"/"+resource.original_hash.hex() 198 resource.meta_storagepath = resource.storagepath+".meta" 199 resource.segment_index = adv.i 200 resource.total_segments = adv.l 201 202 if adv.l > 1: resource.split = True 203 else: resource.split = False 204 205 if adv.x: resource.has_metadata = True 206 else: resource.has_metadata = False 207 208 resource.hashmap = [None] * resource.total_parts 209 resource.hashmap_height = 0 210 resource.waiting_for_hmu = False 211 resource.receiving_part = False 212 resource.consecutive_completed_height = -1 213 214 previous_window = resource.link.get_last_resource_window() 215 previous_eifr = resource.link.get_last_resource_eifr() 216 if previous_window: 217 resource.window = previous_window 218 if previous_eifr: 219 resource.previous_eifr = previous_eifr 220 221 if not resource.link.has_incoming_resource(resource): 222 resource.link.register_incoming_resource(resource) 223 224 RNS.log(f"Accepting resource advertisement for {RNS.prettyhexrep(resource.hash)}. Transfer size is {RNS.prettysize(resource.size)} in {resource.total_parts} parts.", RNS.LOG_DEBUG) 225 if resource.link.callbacks.resource_started != None: 226 try: 227 resource.link.callbacks.resource_started(resource) 228 except Exception as e: 229 RNS.log("Error while executing resource started callback from "+str(resource)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 230 231 resource.hashmap_update(0, resource.hashmap_raw) 232 resource.watchdog_job() 233 return resource 234 235 else: 236 RNS.log("Ignoring resource advertisement for "+RNS.prettyhexrep(resource.hash)+", resource already transferring", RNS.LOG_DEBUG) 237 return None 238 239 except Exception as e: 240 RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG) 241 return None 242 243 # Create a resource for transmission to a remote destination 244 # The data passed can be either a bytes-array or a file opened 245 # in binary read mode. 246 def __init__(self, data, link, metadata=None, advertise=True, auto_compress=True, callback=None, progress_callback=None, 247 timeout = None, segment_index = 1, original_hash = None, request_id = None, is_response = False, sent_metadata_size=0): 248 249 data_size = None 250 resource_data = None 251 self.assembly_lock = False 252 self.preparing_next_segment = False 253 self.next_segment = None 254 self.metadata = None 255 self.has_metadata = False 256 self.metadata_size = sent_metadata_size 257 258 if metadata != None: 259 packed_metadata = umsgpack.packb(metadata) 260 metadata_size = len(packed_metadata) 261 if metadata_size > Resource.METADATA_MAX_SIZE: 262 raise SystemError("Resource metadata size exceeded") 263 else: 264 self.metadata = struct.pack(">I", metadata_size)[1:] + packed_metadata 265 self.metadata_size = len(self.metadata) 266 self.has_metadata = True 267 else: 268 self.metadata = b"" 269 if sent_metadata_size > 0: self.has_metadata = True 270 271 if data != None: 272 if not hasattr(data, "read") and self.metadata_size + len(data) > Resource.MAX_EFFICIENT_SIZE: 273 original_data = data 274 data_size = len(original_data) 275 data = tempfile.TemporaryFile() 276 data.write(original_data) 277 del original_data 278 279 if hasattr(data, "read"): 280 if data_size == None: data_size = os.stat(data.name).st_size 281 self.total_size = data_size + self.metadata_size 282 283 if self.total_size <= Resource.MAX_EFFICIENT_SIZE: 284 self.total_segments = 1 285 self.segment_index = 1 286 self.split = False 287 resource_data = data.read() 288 data.close() 289 290 else: 291 # self.total_segments = ((data_size-1)//Resource.MAX_EFFICIENT_SIZE)+1 292 # self.segment_index = segment_index 293 # self.split = True 294 # seek_index = segment_index-1 295 # seek_position = seek_index*Resource.MAX_EFFICIENT_SIZE 296 297 self.total_segments = ((self.total_size-1)//Resource.MAX_EFFICIENT_SIZE)+1 298 self.segment_index = segment_index 299 self.split = True 300 seek_index = segment_index-1 301 first_read_size = Resource.MAX_EFFICIENT_SIZE - self.metadata_size 302 303 if segment_index == 1: 304 seek_position = 0 305 segment_read_size = first_read_size 306 else: 307 seek_position = first_read_size + ((seek_index-1)*Resource.MAX_EFFICIENT_SIZE) 308 segment_read_size = Resource.MAX_EFFICIENT_SIZE 309 310 data.seek(seek_position) 311 resource_data = data.read(segment_read_size) 312 self.input_file = data 313 314 elif isinstance(data, bytes): 315 data_size = len(data) 316 self.total_size = data_size + self.metadata_size 317 318 resource_data = data 319 self.total_segments = 1 320 self.segment_index = 1 321 self.split = False 322 323 elif data == None: 324 pass 325 326 else: 327 raise TypeError("Invalid data instance type passed to resource initialisation") 328 329 if resource_data: 330 if self.has_metadata: data = self.metadata + resource_data 331 else: data = resource_data 332 333 self.status = Resource.NONE 334 self.link = link 335 if self.link.mtu: 336 self.sdu = self.link.mtu - RNS.Reticulum.HEADER_MAXSIZE - RNS.Reticulum.IFAC_MIN_SIZE 337 else: 338 self.sdu = link.mdu or Resource.SDU 339 self.max_retries = Resource.MAX_RETRIES 340 self.max_adv_retries = Resource.MAX_ADV_RETRIES 341 self.retries_left = self.max_retries 342 self.timeout_factor = self.link.traffic_timeout_factor 343 self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR 344 self.sender_grace_time = Resource.SENDER_GRACE_TIME 345 self.hmu_retry_ok = False 346 self.watchdog_lock = False 347 self.__watchdog_job_id = 0 348 self.__progress_callback = progress_callback 349 self.rtt = None 350 self.rtt_rxd_bytes = 0 351 self.req_sent = 0 352 self.req_resp_rtt_rate = 0 353 self.rtt_rxd_bytes_at_part_req = 0 354 self.req_data_rtt_rate = 0 355 self.eifr = None 356 self.previous_eifr = None 357 self.fast_rate_rounds = 0 358 self.very_slow_rate_rounds = 0 359 self.request_id = request_id 360 self.started_transferring = None 361 self.is_response = is_response 362 self.auto_compress_limit = Resource.AUTO_COMPRESS_MAX_SIZE 363 self.auto_compress_option = auto_compress 364 365 if type(auto_compress) == bool: 366 self.auto_compress = auto_compress 367 elif type(auto_compress) == int: 368 self.auto_compress = True 369 self.auto_compress_limit = auto_compress 370 else: 371 raise TypeError(f"Invalid type {type(auto_compress)} for auto_compress option") 372 373 self.req_hashlist = [] 374 self.receiver_min_consecutive_height = 0 375 376 if timeout != None: 377 self.timeout = timeout 378 else: 379 self.timeout = self.link.rtt * self.link.traffic_timeout_factor 380 381 if data != None: 382 self.initiator = True 383 self.callback = callback 384 self.uncompressed_data = data 385 386 compression_began = time.time() 387 if self.auto_compress and data_size <= self.auto_compress_limit: 388 RNS.log("Compressing resource data...", RNS.LOG_EXTREME) 389 self.compressed_data = bz2.compress(self.uncompressed_data) 390 RNS.log("Compression completed in "+str(round(time.time()-compression_began, 3))+" seconds", RNS.LOG_EXTREME) 391 else: 392 self.compressed_data = self.uncompressed_data 393 394 self.uncompressed_size = len(self.uncompressed_data) 395 self.compressed_size = len(self.compressed_data) 396 397 if (self.compressed_size < self.uncompressed_size and auto_compress): 398 saved_bytes = len(self.uncompressed_data) - len(self.compressed_data) 399 RNS.log("Compression saved "+str(saved_bytes)+" bytes, sending compressed", RNS.LOG_EXTREME) 400 401 self.data = b"" 402 self.data += RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE] 403 self.data += self.compressed_data 404 405 self.compressed = True 406 407 else: 408 self.data = b"" 409 self.data += RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE] 410 self.data += self.uncompressed_data 411 412 self.compressed = False 413 self.compressed_data = None 414 if self.auto_compress and data_size <= self.auto_compress_limit: 415 RNS.log("Compression did not decrease size, sending uncompressed", RNS.LOG_EXTREME) 416 417 self.compressed_data = None 418 self.uncompressed_data = None 419 420 # Resources handle encryption directly to 421 # make optimal use of packet MTU on an entire 422 # encrypted stream. The Resource instance will 423 # use it's underlying link directly to encrypt. 424 self.data = self.link.encrypt(self.data) 425 self.encrypted = True 426 427 self.size = len(self.data) 428 self.sent_parts = 0 429 hashmap_entries = int(math.ceil(self.size/float(self.sdu))) 430 self.total_parts = hashmap_entries 431 432 hashmap_ok = False 433 while not hashmap_ok: 434 hashmap_computation_began = time.time() 435 RNS.log("Starting resource hashmap computation with "+str(hashmap_entries)+" entries...", RNS.LOG_EXTREME) 436 437 self.random_hash = RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE] 438 self.hash = RNS.Identity.full_hash(data+self.random_hash) 439 self.truncated_hash = RNS.Identity.truncated_hash(data+self.random_hash) 440 self.expected_proof = RNS.Identity.full_hash(data+self.hash) 441 442 if original_hash == None: 443 self.original_hash = self.hash 444 else: 445 self.original_hash = original_hash 446 447 self.parts = [] 448 self.hashmap = b"" 449 collision_guard_list = [] 450 for i in range(0,hashmap_entries): 451 data = self.data[i*self.sdu:(i+1)*self.sdu] 452 map_hash = self.get_map_hash(data) 453 454 if map_hash in collision_guard_list: 455 RNS.log("Found hash collision in resource map, remapping...", RNS.LOG_DEBUG) 456 hashmap_ok = False 457 break 458 else: 459 hashmap_ok = True 460 collision_guard_list.append(map_hash) 461 if len(collision_guard_list) > ResourceAdvertisement.COLLISION_GUARD_SIZE: 462 collision_guard_list.pop(0) 463 464 part = RNS.Packet(link, data, context=RNS.Packet.RESOURCE) 465 part.pack() 466 part.map_hash = map_hash 467 468 self.hashmap += part.map_hash 469 self.parts.append(part) 470 471 RNS.log("Hashmap computation concluded in "+str(round(time.time()-hashmap_computation_began, 3))+" seconds", RNS.LOG_EXTREME) 472 473 self.data = None 474 if advertise: 475 self.advertise() 476 else: 477 self.receive_lock = Lock() 478 479 480 def hashmap_update_packet(self, plaintext): 481 if not self.status == Resource.FAILED: 482 self.last_activity = time.time() 483 self.retries_left = self.max_retries 484 485 update = umsgpack.unpackb(plaintext[RNS.Identity.HASHLENGTH//8:]) 486 self.hashmap_update(update[0], update[1]) 487 488 489 def hashmap_update(self, segment, hashmap): 490 if not self.status == Resource.FAILED: 491 self.status = Resource.TRANSFERRING 492 seg_len = ResourceAdvertisement.HASHMAP_MAX_LEN 493 hashes = len(hashmap)//Resource.MAPHASH_LEN 494 for i in range(0,hashes): 495 if self.hashmap[i+segment*seg_len] == None: 496 self.hashmap_height += 1 497 self.hashmap[i+segment*seg_len] = hashmap[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN] 498 499 self.waiting_for_hmu = False 500 self.request_next() 501 502 def get_map_hash(self, data): 503 return RNS.Identity.full_hash(data+self.random_hash)[:Resource.MAPHASH_LEN] 504 505 def advertise(self): 506 """ 507 Advertise the resource. If the other end of the link accepts 508 the resource advertisement it will begin transferring. 509 """ 510 thread = threading.Thread(target=self.__advertise_job, daemon=True) 511 thread.start() 512 513 if self.segment_index < self.total_segments: 514 prepare_thread = threading.Thread(target=self.__prepare_next_segment, daemon=True) 515 prepare_thread.start() 516 517 def __advertise_job(self): 518 self.advertisement_packet = RNS.Packet(self.link, ResourceAdvertisement(self).pack(), context=RNS.Packet.RESOURCE_ADV) 519 while not self.link.ready_for_new_resource(): 520 self.status = Resource.QUEUED 521 sleep(0.25) 522 523 try: 524 self.advertisement_packet.send() 525 self.last_activity = time.time() 526 self.started_transferring = self.last_activity 527 self.adv_sent = self.last_activity 528 self.rtt = None 529 self.status = Resource.ADVERTISED 530 self.retries_left = self.max_adv_retries 531 self.link.register_outgoing_resource(self) 532 RNS.log("Sent resource advertisement for "+RNS.prettyhexrep(self.hash), RNS.LOG_EXTREME) 533 except Exception as e: 534 RNS.log("Could not advertise resource, the contained exception was: "+str(e), RNS.LOG_ERROR) 535 self.cancel() 536 return 537 538 self.watchdog_job() 539 540 def update_eifr(self): 541 if self.rtt == None: 542 rtt = self.link.rtt 543 else: 544 rtt = self.rtt 545 546 if self.req_data_rtt_rate != 0: 547 expected_inflight_rate = self.req_data_rtt_rate*8 548 else: 549 if self.previous_eifr != None: 550 expected_inflight_rate = self.previous_eifr 551 else: 552 expected_inflight_rate = self.link.establishment_cost*8 / rtt 553 554 self.eifr = expected_inflight_rate 555 if self.link: self.link.expected_rate = self.eifr 556 557 def watchdog_job(self): 558 thread = threading.Thread(target=self.__watchdog_job, daemon=True) 559 thread.start() 560 561 def __watchdog_job(self): 562 self.__watchdog_job_id += 1 563 this_job_id = self.__watchdog_job_id 564 565 while self.status < Resource.ASSEMBLING and this_job_id == self.__watchdog_job_id: 566 while self.watchdog_lock: 567 sleep(0.025) 568 569 sleep_time = None 570 if self.status == Resource.ADVERTISED: 571 sleep_time = (self.adv_sent+self.timeout+Resource.PROCESSING_GRACE)-time.time() 572 if sleep_time < 0: 573 if self.retries_left <= 0: 574 RNS.log("Resource transfer timeout after sending advertisement", RNS.LOG_DEBUG) 575 self.cancel() 576 sleep_time = 0.001 577 else: 578 try: 579 RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG) 580 self.retries_left -= 1 581 self.advertisement_packet = RNS.Packet(self.link, ResourceAdvertisement(self).pack(), context=RNS.Packet.RESOURCE_ADV) 582 self.advertisement_packet.send() 583 self.last_activity = time.time() 584 self.adv_sent = self.last_activity 585 sleep_time = 0.001 586 except Exception as e: 587 RNS.log("Could not resend advertisement packet, cancelling resource. The contained exception was: "+str(e), RNS.LOG_VERBOSE) 588 self.cancel() 589 590 591 elif self.status == Resource.TRANSFERRING: 592 if not self.initiator: 593 retries_used = self.max_retries - self.retries_left 594 extra_wait = retries_used * Resource.PER_RETRY_DELAY 595 596 self.update_eifr() 597 expected_tof_remaining = (self.outstanding_parts*self.sdu*8)/self.eifr 598 599 if self.req_resp_rtt_rate != 0: 600 sleep_time = self.last_activity + self.part_timeout_factor*expected_tof_remaining + Resource.RETRY_GRACE_TIME + extra_wait - time.time() 601 else: 602 sleep_time = self.last_activity + self.part_timeout_factor*((3*self.sdu)/self.eifr) + Resource.RETRY_GRACE_TIME + extra_wait - time.time() 603 604 # TODO: Remove debug at some point 605 # RNS.log(f"EIFR {RNS.prettyspeed(self.eifr)}, ETOF {RNS.prettyshorttime(expected_tof_remaining)} ", RNS.LOG_DEBUG, pt=True) 606 # RNS.log(f"Resource ST {RNS.prettyshorttime(sleep_time)}, RTT {RNS.prettyshorttime(self.rtt or self.link.rtt)}, {self.outstanding_parts} left", RNS.LOG_DEBUG, pt=True) 607 608 if sleep_time < 0: 609 if self.retries_left > 0: 610 ms = "" if self.outstanding_parts == 1 else "s" 611 RNS.log(f"Timed out waiting for {self.outstanding_parts} part{ms}, requesting retry on {self}", RNS.LOG_DEBUG) 612 if self.window > self.window_min: 613 self.window -= 1 614 if self.window_max > self.window_min: 615 self.window_max -= 1 616 if (self.window_max - self.window) > (self.window_flexibility-1): 617 self.window_max -= 1 618 619 sleep_time = 0.001 620 self.retries_left -= 1 621 self.waiting_for_hmu = False 622 self.request_next() 623 else: 624 self.cancel() 625 sleep_time = 0.001 626 else: 627 max_extra_wait = sum([(r+1) * Resource.PER_RETRY_DELAY for r in range(self.MAX_RETRIES)]) 628 max_wait = self.rtt * self.timeout_factor * self.max_retries + self.sender_grace_time + max_extra_wait 629 sleep_time = self.last_activity + max_wait - time.time() 630 if sleep_time < 0: 631 RNS.log("Resource timed out waiting for part requests", RNS.LOG_DEBUG) 632 self.cancel() 633 sleep_time = 0.001 634 635 elif self.status == Resource.AWAITING_PROOF: 636 # Decrease timeout factor since proof packets are 637 # significantly smaller than full req/resp roundtrip 638 self.timeout_factor = Resource.PROOF_TIMEOUT_FACTOR 639 640 sleep_time = self.last_part_sent + (self.rtt*self.timeout_factor+self.sender_grace_time) - time.time() 641 if sleep_time < 0: 642 if self.retries_left <= 0: 643 RNS.log("Resource timed out waiting for proof", RNS.LOG_DEBUG) 644 self.cancel() 645 sleep_time = 0.001 646 else: 647 RNS.log("All parts sent, but no resource proof received, querying network cache...", RNS.LOG_DEBUG) 648 self.retries_left -= 1 649 expected_data = self.hash + self.expected_proof 650 expected_proof_packet = RNS.Packet(self.link, expected_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF) 651 expected_proof_packet.pack() 652 RNS.Transport.cache_request(expected_proof_packet.packet_hash, self.link) 653 self.last_part_sent = time.time() 654 sleep_time = 0.001 655 656 elif self.status == Resource.REJECTED: 657 sleep_time = 0.001 658 659 if sleep_time == 0: 660 RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_DEBUG) 661 if sleep_time == None or sleep_time < 0: 662 RNS.log("Timing error, cancelling resource transfer.", RNS.LOG_ERROR) 663 self.cancel() 664 665 if sleep_time != None: 666 sleep(min(sleep_time, Resource.WATCHDOG_MAX_SLEEP)) 667 668 def assemble(self): 669 if not self.status == Resource.FAILED: 670 try: 671 self.status = Resource.ASSEMBLING 672 stream = b"".join(self.parts) 673 674 if self.encrypted: data = self.link.decrypt(stream) 675 else: data = stream 676 677 # Strip off random hash 678 data = data[Resource.RANDOM_HASH_SIZE:] 679 680 if self.compressed: self.data = bz2.decompress(data) 681 else: self.data = data 682 683 calculated_hash = RNS.Identity.full_hash(self.data+self.random_hash) 684 if calculated_hash == self.hash: 685 if self.has_metadata and self.segment_index == 1: 686 # TODO: Add early metadata_ready callback 687 metadata_size = self.data[0] << 16 | self.data[1] << 8 | self.data[2] 688 packed_metadata = self.data[3:3+metadata_size] 689 metadata_file = open(self.meta_storagepath, "wb") 690 metadata_file.write(packed_metadata) 691 metadata_file.close() 692 del packed_metadata 693 data = self.data[3+metadata_size:] 694 else: 695 data = self.data 696 697 self.file = open(self.storagepath, "ab") 698 self.file.write(data) 699 self.file.close() 700 self.status = Resource.COMPLETE 701 del data 702 self.prove() 703 704 else: self.status = Resource.CORRUPT 705 706 707 except Exception as e: 708 RNS.log("Error while assembling received resource.", RNS.LOG_ERROR) 709 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) 710 self.status = Resource.CORRUPT 711 712 self.link.resource_concluded(self) 713 714 if self.segment_index == self.total_segments: 715 if self.callback != None: 716 if not os.path.isfile(self.meta_storagepath): 717 self.metadata = None 718 else: 719 metadata_file = open(self.meta_storagepath, "rb") 720 self.metadata = umsgpack.unpackb(metadata_file.read()) 721 metadata_file.close() 722 try: os.unlink(self.meta_storagepath) 723 except Exception as e: 724 RNS.log(f"Error while cleaning up resource metadata file, the contained exception was: {e}", RNS.LOG_ERROR) 725 726 self.data = open(self.storagepath, "rb") 727 try: self.callback(self) 728 except Exception as e: 729 RNS.log("Error while executing resource assembled callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 730 731 try: 732 if hasattr(self.data, "close") and callable(self.data.close): self.data.close() 733 if os.path.isfile(self.storagepath): os.unlink(self.storagepath) 734 735 except Exception as e: 736 RNS.log(f"Error while cleaning up resource files, the contained exception was: {e}", RNS.LOG_ERROR) 737 else: 738 RNS.log("Resource segment "+str(self.segment_index)+" of "+str(self.total_segments)+" received, waiting for next segment to be announced", RNS.LOG_DEBUG) 739 740 741 def prove(self): 742 if not self.status == Resource.FAILED: 743 try: 744 proof = RNS.Identity.full_hash(self.data+self.hash) 745 proof_data = self.hash+proof 746 proof_packet = RNS.Packet(self.link, proof_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF) 747 proof_packet.send() 748 RNS.Transport.cache(proof_packet, force_cache=True) 749 except Exception as e: 750 RNS.log("Could not send proof packet, cancelling resource", RNS.LOG_DEBUG) 751 RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) 752 self.cancel() 753 754 def __prepare_next_segment(self): 755 # Prepare the next segment for advertisement 756 RNS.log(f"Preparing segment {self.segment_index+1} of {self.total_segments} for resource {self}", RNS.LOG_DEBUG) 757 self.preparing_next_segment = True 758 self.next_segment = Resource( 759 self.input_file, self.link, 760 callback = self.callback, 761 segment_index = self.segment_index+1, 762 original_hash=self.original_hash, 763 progress_callback = self.__progress_callback, 764 request_id = self.request_id, 765 is_response = self.is_response, 766 advertise = False, 767 auto_compress = self.auto_compress_option, 768 sent_metadata_size = self.metadata_size, 769 ) 770 771 def validate_proof(self, proof_data): 772 if not self.status == Resource.FAILED: 773 if len(proof_data) == RNS.Identity.HASHLENGTH//8*2: 774 if proof_data[RNS.Identity.HASHLENGTH//8:] == self.expected_proof: 775 self.status = Resource.COMPLETE 776 self.link.resource_concluded(self) 777 if self.segment_index == self.total_segments: 778 # If all segments were processed, we'll 779 # signal that the resource sending concluded 780 if self.callback != None: 781 try: self.callback(self) 782 except Exception as e: RNS.log("Error while executing resource concluded callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 783 finally: 784 try: 785 if hasattr(self, "input_file"): 786 if hasattr(self.input_file, "close") and callable(self.input_file.close): self.input_file.close() 787 except Exception as e: RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR) 788 else: 789 try: 790 if hasattr(self, "input_file"): 791 if hasattr(self.input_file, "close") and callable(self.input_file.close): self.input_file.close() 792 except Exception as e: RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR) 793 else: 794 # Otherwise we'll recursively create the 795 # next segment of the resource 796 if not self.preparing_next_segment: 797 RNS.log(f"Next segment preparation for resource {self} was not started yet, manually preparing now. This will cause transfer slowdown.", RNS.LOG_WARNING) 798 self.__prepare_next_segment() 799 800 while self.next_segment == None: time.sleep(0.05) 801 802 self.data = None 803 self.metadata = None 804 self.parts = None 805 self.input_file = None 806 self.link = None 807 self.req_hashlist = None 808 self.hashmap = None 809 810 self.next_segment.advertise() 811 else: 812 pass 813 else: 814 pass 815 816 817 def receive_part(self, packet): 818 with self.receive_lock: 819 820 self.receiving_part = True 821 self.last_activity = time.time() 822 self.retries_left = self.max_retries 823 824 if self.req_resp == None: 825 self.req_resp = self.last_activity 826 rtt = self.req_resp-self.req_sent 827 828 self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT 829 if self.rtt == None: 830 self.rtt = self.link.rtt 831 self.watchdog_job() 832 elif rtt < self.rtt: 833 self.rtt = max(self.rtt - self.rtt*0.05, rtt) 834 elif rtt > self.rtt: 835 self.rtt = min(self.rtt + self.rtt*0.05, rtt) 836 837 if rtt > 0: 838 req_resp_cost = len(packet.raw)+self.req_sent_bytes 839 self.req_resp_rtt_rate = req_resp_cost / rtt 840 841 if self.req_resp_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD: 842 self.fast_rate_rounds += 1 843 844 if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD: 845 self.window_max = Resource.WINDOW_MAX_FAST 846 847 if not self.status == Resource.FAILED: 848 self.status = Resource.TRANSFERRING 849 part_data = packet.data 850 part_hash = self.get_map_hash(part_data) 851 852 consecutive_index = self.consecutive_completed_height if self.consecutive_completed_height >= 0 else 0 853 i = consecutive_index 854 for map_hash in self.hashmap[consecutive_index:consecutive_index+self.window]: 855 if map_hash == part_hash: 856 if self.parts[i] == None: 857 858 # Insert data into parts list 859 self.parts[i] = part_data 860 self.rtt_rxd_bytes += len(part_data) 861 self.received_count += 1 862 self.outstanding_parts -= 1 863 864 # Update consecutive completed pointer 865 if i == self.consecutive_completed_height + 1: 866 self.consecutive_completed_height = i 867 868 cp = self.consecutive_completed_height + 1 869 while cp < len(self.parts) and self.parts[cp] != None: 870 self.consecutive_completed_height = cp 871 cp += 1 872 873 if self.__progress_callback != None: 874 try: 875 self.__progress_callback(self) 876 except Exception as e: 877 RNS.log("Error while executing progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 878 879 i += 1 880 881 self.receiving_part = False 882 883 if self.received_count == self.total_parts and not self.assembly_lock: 884 self.assembly_lock = True 885 threading.Thread(target=self.assemble, daemon=True).start() 886 elif self.outstanding_parts == 0: 887 # TODO: Figure out if there is a mathematically 888 # optimal way to adjust windows 889 if self.window < self.window_max: 890 self.window += 1 891 if (self.window - self.window_min) > (self.window_flexibility-1): 892 self.window_min += 1 893 894 if self.req_sent != 0: 895 rtt = time.time()-self.req_sent 896 req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req 897 898 if rtt != 0: 899 self.req_data_rtt_rate = req_transferred/rtt 900 self.update_eifr() 901 self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes 902 903 if self.req_data_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD: 904 self.fast_rate_rounds += 1 905 906 if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD: 907 self.window_max = Resource.WINDOW_MAX_FAST 908 909 if self.fast_rate_rounds == 0 and self.req_data_rtt_rate < Resource.RATE_VERY_SLOW and self.very_slow_rate_rounds < Resource.VERY_SLOW_RATE_THRESHOLD: 910 self.very_slow_rate_rounds += 1 911 912 if self.very_slow_rate_rounds == Resource.VERY_SLOW_RATE_THRESHOLD: 913 self.window_max = Resource.WINDOW_MAX_VERY_SLOW 914 915 self.request_next() 916 else: 917 self.receiving_part = False 918 919 # Called on incoming resource to send a request for more data 920 def request_next(self): 921 while self.receiving_part: 922 sleep(0.001) 923 924 if not self.status == Resource.FAILED: 925 if not self.waiting_for_hmu: 926 self.outstanding_parts = 0 927 hashmap_exhausted = Resource.HASHMAP_IS_NOT_EXHAUSTED 928 requested_hashes = b"" 929 930 i = 0; pn = self.consecutive_completed_height+1 931 search_start = pn 932 search_size = self.window 933 934 for part in self.parts[search_start:search_start+search_size]: 935 if part == None: 936 part_hash = self.hashmap[pn] 937 if part_hash != None: 938 requested_hashes += part_hash 939 self.outstanding_parts += 1 940 i += 1 941 else: 942 hashmap_exhausted = Resource.HASHMAP_IS_EXHAUSTED 943 944 pn += 1 945 if i >= self.window or hashmap_exhausted == Resource.HASHMAP_IS_EXHAUSTED: 946 break 947 948 hmu_part = bytes([hashmap_exhausted]) 949 if hashmap_exhausted == Resource.HASHMAP_IS_EXHAUSTED: 950 last_map_hash = self.hashmap[self.hashmap_height-1] 951 hmu_part += last_map_hash 952 self.waiting_for_hmu = True 953 954 request_data = hmu_part + self.hash + requested_hashes 955 request_packet = RNS.Packet(self.link, request_data, context = RNS.Packet.RESOURCE_REQ) 956 957 try: 958 request_packet.send() 959 self.last_activity = time.time() 960 self.req_sent = self.last_activity 961 self.req_sent_bytes = len(request_packet.raw) 962 self.req_resp = None 963 964 except Exception as e: 965 RNS.log("Could not send resource request packet, cancelling resource", RNS.LOG_DEBUG) 966 RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) 967 self.cancel() 968 969 # Called on outgoing resource to make it send more data 970 def request(self, request_data): 971 if not self.status == Resource.FAILED: 972 rtt = time.time() - self.adv_sent 973 if self.rtt == None: 974 self.rtt = rtt 975 976 if self.status != Resource.TRANSFERRING: 977 self.status = Resource.TRANSFERRING 978 self.watchdog_job() 979 980 self.retries_left = self.max_retries 981 982 wants_more_hashmap = True if request_data[0] == Resource.HASHMAP_IS_EXHAUSTED else False 983 pad = 1+Resource.MAPHASH_LEN if wants_more_hashmap else 1 984 985 requested_hashes = request_data[pad+RNS.Identity.HASHLENGTH//8:] 986 987 # Define the search scope 988 search_start = self.receiver_min_consecutive_height 989 search_end = self.receiver_min_consecutive_height+ResourceAdvertisement.COLLISION_GUARD_SIZE 990 991 map_hashes = [] 992 for i in range(0,len(requested_hashes)//Resource.MAPHASH_LEN): 993 map_hash = requested_hashes[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN] 994 map_hashes.append(map_hash) 995 996 search_scope = self.parts[search_start:search_end] 997 requested_parts = list(filter(lambda part: part.map_hash in map_hashes, search_scope)) 998 999 for part in requested_parts: 1000 try: 1001 if not part.sent: 1002 part.send() 1003 self.sent_parts += 1 1004 else: 1005 part.resend() 1006 1007 self.last_activity = time.time() 1008 self.last_part_sent = self.last_activity 1009 1010 except Exception as e: 1011 RNS.log("Resource could not send parts, cancelling transfer!", RNS.LOG_DEBUG) 1012 RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) 1013 self.cancel() 1014 1015 if wants_more_hashmap: 1016 last_map_hash = request_data[1:Resource.MAPHASH_LEN+1] 1017 1018 part_index = self.receiver_min_consecutive_height 1019 search_start = part_index 1020 search_end = self.receiver_min_consecutive_height+ResourceAdvertisement.COLLISION_GUARD_SIZE 1021 for part in self.parts[search_start:search_end]: 1022 part_index += 1 1023 if part.map_hash == last_map_hash: 1024 break 1025 1026 self.receiver_min_consecutive_height = max(part_index-1-Resource.WINDOW_MAX, 0) 1027 1028 if part_index % ResourceAdvertisement.HASHMAP_MAX_LEN != 0: 1029 RNS.log("Resource sequencing error, cancelling transfer!", RNS.LOG_ERROR) 1030 self.cancel() 1031 return 1032 else: 1033 segment = part_index // ResourceAdvertisement.HASHMAP_MAX_LEN 1034 1035 1036 hashmap_start = segment*ResourceAdvertisement.HASHMAP_MAX_LEN 1037 hashmap_end = min((segment+1)*ResourceAdvertisement.HASHMAP_MAX_LEN, len(self.parts)) 1038 1039 hashmap = b"" 1040 for i in range(hashmap_start,hashmap_end): 1041 hashmap += self.hashmap[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN] 1042 1043 hmu = self.hash+umsgpack.packb([segment, hashmap]) 1044 hmu_packet = RNS.Packet(self.link, hmu, context = RNS.Packet.RESOURCE_HMU) 1045 1046 try: 1047 hmu_packet.send() 1048 self.last_activity = time.time() 1049 except Exception as e: 1050 RNS.log("Could not send resource HMU packet, cancelling resource", RNS.LOG_DEBUG) 1051 RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) 1052 self.cancel() 1053 1054 if self.sent_parts == len(self.parts): 1055 self.status = Resource.AWAITING_PROOF 1056 self.retries_left = 3 1057 1058 if self.__progress_callback != None: 1059 try: 1060 self.__progress_callback(self) 1061 except Exception as e: 1062 RNS.log("Error while executing progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1063 1064 def cancel(self): 1065 """ 1066 Cancels transferring the resource. 1067 """ 1068 if self.status < Resource.COMPLETE: 1069 self.status = Resource.FAILED 1070 if self.initiator: 1071 if self.link.status == RNS.Link.ACTIVE: 1072 try: 1073 cancel_packet = RNS.Packet(self.link, self.hash, context=RNS.Packet.RESOURCE_ICL) 1074 cancel_packet.send() 1075 except Exception as e: 1076 RNS.log("Could not send resource cancel packet, the contained exception was: "+str(e), RNS.LOG_ERROR) 1077 self.link.cancel_outgoing_resource(self) 1078 else: 1079 self.link.cancel_incoming_resource(self) 1080 1081 if self.callback != None: 1082 try: 1083 self.link.resource_concluded(self) 1084 self.callback(self) 1085 except Exception as e: 1086 RNS.log("Error while executing callbacks on resource cancel from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1087 1088 def _rejected(self): 1089 if self.status < Resource.COMPLETE: 1090 if self.initiator: 1091 self.status = Resource.REJECTED 1092 self.link.cancel_outgoing_resource(self) 1093 if self.callback != None: 1094 try: 1095 self.link.resource_concluded(self) 1096 def job(): self.callback(self) 1097 threading.Thread(target=job, daemon=True).start() 1098 except Exception as e: 1099 RNS.log("Error while executing callbacks on resource reject from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 1100 1101 def set_callback(self, callback): 1102 self.callback = callback 1103 1104 def progress_callback(self, callback): 1105 self.__progress_callback = callback 1106 1107 def get_progress(self): 1108 """ 1109 :returns: The current progress of the resource transfer as a *float* between 0.0 and 1.0. 1110 """ 1111 if self.status == RNS.Resource.COMPLETE and self.segment_index == self.total_segments: 1112 return 1.0 1113 1114 elif self.initiator: 1115 if not self.split: 1116 self.processed_parts = self.sent_parts 1117 self.progress_total_parts = float(self.total_parts) 1118 1119 else: 1120 is_last_segment = self.segment_index != self.total_segments 1121 total_segments = self.total_segments 1122 processed_segments = self.segment_index-1 1123 1124 current_segment_parts = self.total_parts 1125 max_parts_per_segment = math.ceil(Resource.MAX_EFFICIENT_SIZE/self.sdu) 1126 1127 previously_processed_parts = processed_segments*max_parts_per_segment 1128 1129 if current_segment_parts < max_parts_per_segment: 1130 current_segment_factor = max_parts_per_segment / current_segment_parts 1131 else: 1132 current_segment_factor = 1 1133 1134 self.processed_parts = previously_processed_parts + self.sent_parts*current_segment_factor 1135 self.progress_total_parts = self.total_segments*max_parts_per_segment 1136 1137 else: 1138 if not self.split: 1139 self.processed_parts = self.received_count 1140 self.progress_total_parts = float(self.total_parts) 1141 1142 else: 1143 is_last_segment = self.segment_index != self.total_segments 1144 total_segments = self.total_segments 1145 processed_segments = self.segment_index-1 1146 1147 current_segment_parts = self.total_parts 1148 max_parts_per_segment = math.ceil(Resource.MAX_EFFICIENT_SIZE/self.sdu) 1149 1150 previously_processed_parts = processed_segments*max_parts_per_segment 1151 1152 if current_segment_parts < max_parts_per_segment: 1153 current_segment_factor = max_parts_per_segment / current_segment_parts 1154 else: 1155 current_segment_factor = 1 1156 1157 self.processed_parts = previously_processed_parts + self.received_count*current_segment_factor 1158 self.progress_total_parts = self.total_segments*max_parts_per_segment 1159 1160 1161 progress = min(1.0, self.processed_parts / self.progress_total_parts) 1162 return progress 1163 1164 def get_segment_progress(self): 1165 if self.status == RNS.Resource.COMPLETE and self.segment_index == self.total_segments: 1166 return 1.0 1167 elif self.initiator: 1168 processed_parts = self.sent_parts 1169 else: 1170 processed_parts = self.received_count 1171 1172 progress = min(1.0, processed_parts / self.total_parts) 1173 return progress 1174 1175 def get_transfer_size(self): 1176 """ 1177 :returns: The number of bytes needed to transfer the resource. 1178 """ 1179 return self.size 1180 1181 def get_data_size(self): 1182 """ 1183 :returns: The total data size of the resource. 1184 """ 1185 return self.total_size 1186 1187 def get_parts(self): 1188 """ 1189 :returns: The number of parts the resource will be transferred in. 1190 """ 1191 return self.total_parts 1192 1193 def get_segments(self): 1194 """ 1195 :returns: The number of segments the resource is divided into. 1196 """ 1197 return self.total_segments 1198 1199 def get_hash(self): 1200 """ 1201 :returns: The hash of the resource. 1202 """ 1203 return self.hash 1204 1205 def is_compressed(self): 1206 """ 1207 :returns: Whether the resource is compressed. 1208 """ 1209 return self.compressed 1210 1211 def __str__(self): 1212 return "<"+RNS.hexrep(self.hash,delimit=False)+"/"+RNS.hexrep(self.link.link_id,delimit=False)+">" 1213 1214 1215 class ResourceAdvertisement: 1216 OVERHEAD = 134 1217 HASHMAP_MAX_LEN = math.floor((RNS.Link.MDU-OVERHEAD)/Resource.MAPHASH_LEN) 1218 COLLISION_GUARD_SIZE = 2*Resource.WINDOW_MAX+HASHMAP_MAX_LEN 1219 1220 assert HASHMAP_MAX_LEN > 0, "The configured MTU is too small to include any map hashes in resource advertisments" 1221 1222 @staticmethod 1223 def is_request(advertisement_packet): 1224 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 1225 if adv.q != None and adv.u: 1226 return True 1227 else: 1228 return False 1229 1230 1231 @staticmethod 1232 def is_response(advertisement_packet): 1233 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 1234 1235 if adv.q != None and adv.p: 1236 return True 1237 else: 1238 return False 1239 1240 1241 @staticmethod 1242 def read_request_id(advertisement_packet): 1243 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 1244 return adv.q 1245 1246 1247 @staticmethod 1248 def read_transfer_size(advertisement_packet): 1249 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 1250 return adv.t 1251 1252 1253 @staticmethod 1254 def read_size(advertisement_packet): 1255 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 1256 return adv.d 1257 1258 1259 def __init__(self, resource=None, request_id=None, is_response=False): 1260 self.link = None 1261 if resource != None: 1262 self.t = resource.size # Transfer size 1263 self.d = resource.total_size # Total uncompressed data size 1264 self.n = len(resource.parts) # Number of parts 1265 self.h = resource.hash # Resource hash 1266 self.r = resource.random_hash # Resource random hash 1267 self.o = resource.original_hash # First-segment hash 1268 self.m = resource.hashmap # Resource hashmap 1269 self.c = resource.compressed # Compression flag 1270 self.e = resource.encrypted # Encryption flag 1271 self.s = resource.split # Split flag 1272 self.x = resource.has_metadata # Metadata flag 1273 self.i = resource.segment_index # Segment index 1274 self.l = resource.total_segments # Total segments 1275 self.q = resource.request_id # ID of associated request 1276 self.u = False # Is request flag 1277 self.p = False # Is response flag 1278 1279 if self.q != None: 1280 if not resource.is_response: 1281 self.u = True 1282 self.p = False 1283 else: 1284 self.u = False 1285 self.p = True 1286 1287 # Flags 1288 self.f = 0x00 | self.x << 5 | self.p << 4 | self.u << 3 | self.s << 2 | self.c << 1 | self.e 1289 1290 def get_transfer_size(self): 1291 return self.t 1292 1293 def get_data_size(self): 1294 return self.d 1295 1296 def get_parts(self): 1297 return self.n 1298 1299 def get_segments(self): 1300 return self.l 1301 1302 def get_hash(self): 1303 return self.h 1304 1305 def is_compressed(self): 1306 return self.c 1307 1308 def has_metadata(self): 1309 return self.x 1310 1311 def get_link(self): 1312 return self.link 1313 1314 def pack(self, segment=0): 1315 hashmap_start = segment*ResourceAdvertisement.HASHMAP_MAX_LEN 1316 hashmap_end = min((segment+1)*(ResourceAdvertisement.HASHMAP_MAX_LEN), self.n) 1317 1318 hashmap = b"" 1319 for i in range(hashmap_start,hashmap_end): 1320 hashmap += self.m[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN] 1321 1322 dictionary = { 1323 "t": self.t, # Transfer size 1324 "d": self.d, # Data size 1325 "n": self.n, # Number of parts 1326 "h": self.h, # Resource hash 1327 "r": self.r, # Resource random hash 1328 "o": self.o, # Original hash 1329 "i": self.i, # Segment index 1330 "l": self.l, # Total segments 1331 "q": self.q, # Request ID 1332 "f": self.f, # Resource flags 1333 "m": hashmap 1334 } 1335 1336 return umsgpack.packb(dictionary) 1337 1338 1339 @staticmethod 1340 def unpack(data): 1341 dictionary = umsgpack.unpackb(data) 1342 1343 adv = ResourceAdvertisement() 1344 adv.t = dictionary["t"] 1345 adv.d = dictionary["d"] 1346 adv.n = dictionary["n"] 1347 adv.h = dictionary["h"] 1348 adv.r = dictionary["r"] 1349 adv.o = dictionary["o"] 1350 adv.m = dictionary["m"] 1351 adv.f = dictionary["f"] 1352 adv.i = dictionary["i"] 1353 adv.l = dictionary["l"] 1354 adv.q = dictionary["q"] 1355 adv.e = True if (adv.f & 0x01) == 0x01 else False 1356 adv.c = True if ((adv.f >> 1) & 0x01) == 0x01 else False 1357 adv.s = True if ((adv.f >> 2) & 0x01) == 0x01 else False 1358 adv.u = True if ((adv.f >> 3) & 0x01) == 0x01 else False 1359 adv.p = True if ((adv.f >> 4) & 0x01) == 0x01 else False 1360 adv.x = True if ((adv.f >> 5) & 0x01) == 0x01 else False 1361 1362 return adv