# Resource.py
# Reticulum License
#
# Copyright (c) 2016-2025 Mark Qvist
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# - The Software shall not be used in any kind of system which includes amongst
#   its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
#   an artificial intelligence, machine learning or language model training
#   dataset, including but not limited to any use that contributes to the
#   training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
#   all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import RNS
import os
import bz2
import math
import time
import struct
import tempfile
import threading
from threading import Lock
from .vendor import umsgpack as umsgpack
from time import sleep

class Resource:
    """
    The Resource class allows transferring arbitrary amounts
    of data over a link. It will automatically handle sequencing,
    compression, coordination and checksumming.

    :param data: The data to be transferred. Can be *bytes* or an open *file handle*. See the :ref:`Filetransfer Example<example-filetransfer>` for details.
    :param link: The :ref:`RNS.Link<api-link>` instance on which to transfer the data.
    :param metadata: Optional. Metadata to send along with the resource. Must be serialisable by umsgpack.
    :param advertise: Optional. Whether to automatically advertise the resource. Can be *True* or *False*.
    :param auto_compress: Optional. Whether to auto-compress the resource. Can be *True* or *False*.
    :param callback: An optional *callable* with the signature *callback(resource)*. Will be called when the resource transfer concludes.
    :param progress_callback: An optional *callable* with the signature *callback(resource)*. Will be called whenever the resource transfer progress is updated.
    """

    # The initial window size at beginning of transfer
    WINDOW               = 4

    # Absolute minimum window size during transfer
    WINDOW_MIN           = 2

    # The maximum window size for transfers on slow links
    WINDOW_MAX_SLOW      = 10

    # The maximum window size for transfers on very slow links
    WINDOW_MAX_VERY_SLOW = 4

    # The maximum window size for transfers on fast links
    WINDOW_MAX_FAST      = 75

    # For calculating maps and guard segments, this
    # must be set to the global maximum window.
    WINDOW_MAX           = WINDOW_MAX_FAST

    # If the fast rate is sustained for this many request
    # rounds, the fast link window size will be allowed.
    FAST_RATE_THRESHOLD  = WINDOW_MAX_SLOW - WINDOW - 2

    # If the very slow rate is sustained for this many request
    # rounds, window will be capped to the very slow limit.
    VERY_SLOW_RATE_THRESHOLD = 2

    # If the RTT rate is higher than this value,
    # the max window size for fast links will be used.
    # The default is 50 Kbps (the value is stored in
    # bytes per second, hence the "/ 8").
    RATE_FAST = (50*1000) / 8

    # If the RTT rate is lower than this value,
    # the window size will be capped at the very
    # slow maximum window size (WINDOW_MAX_VERY_SLOW).
    # The default is 2 Kbps (the value is stored in
    # bytes per second, hence the "/ 8").
    RATE_VERY_SLOW = (2*1000) / 8

    # The minimum allowed flexibility of the window size.
    # The difference between window_max and window_min
    # will never be smaller than this value.
    WINDOW_FLEXIBILITY = 4

    # Number of bytes in a map hash
    MAPHASH_LEN      = 4
    SDU              = RNS.Packet.MDU
    RANDOM_HASH_SIZE = 4

    # This is an indication of what the
    # maximum size a resource should be, if
    # it is to be handled within reasonable
    # time constraint, even on small systems.
    #
    # This constant will be used when determining
    # how to sequence the sending of large resources.
    #
    # Capped at 16777215 (0xFFFFFF) per segment to
    # fit in 3 bytes in resource advertisements.
    MAX_EFFICIENT_SIZE      = 1 * 1024 * 1024 - 1
    RESPONSE_MAX_GRACE_TIME = 10

    # Max metadata size is 16777215 (0xFFFFFF) bytes
    METADATA_MAX_SIZE = 16 * 1024 * 1024 - 1

    # The maximum size to auto-compress with
    # bz2 before sending.
124 AUTO_COMPRESS_MAX_SIZE = 64 * 1024 * 1024 125 126 PART_TIMEOUT_FACTOR = 4 127 PART_TIMEOUT_FACTOR_AFTER_RTT = 2 128 PROOF_TIMEOUT_FACTOR = 3 129 MAX_RETRIES = 16 130 MAX_ADV_RETRIES = 4 131 SENDER_GRACE_TIME = 10.0 132 PROCESSING_GRACE = 1.0 133 RETRY_GRACE_TIME = 0.25 134 PER_RETRY_DELAY = 0.5 135 136 WATCHDOG_MAX_SLEEP = 1 137 138 HASHMAP_IS_NOT_EXHAUSTED = 0x00 139 HASHMAP_IS_EXHAUSTED = 0xFF 140 141 # Status constants 142 NONE = 0x00 143 QUEUED = 0x01 144 ADVERTISED = 0x02 145 TRANSFERRING = 0x03 146 AWAITING_PROOF = 0x04 147 ASSEMBLING = 0x05 148 COMPLETE = 0x06 149 FAILED = 0x07 150 CORRUPT = 0x08 151 REJECTED = 0x00 152 153 @staticmethod 154 def reject(advertisement_packet): 155 try: 156 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 157 resource_hash = adv.h 158 reject_packet = RNS.Packet(advertisement_packet.link, resource_hash, context=RNS.Packet.RESOURCE_RCL) 159 reject_packet.send() 160 161 except Exception as e: 162 RNS.log(f"An error ocurred while rejecting advertised resource: {e}", RNS.LOG_ERROR) 163 RNS.trace_exception(e) 164 165 @staticmethod 166 def accept(advertisement_packet, callback=None, progress_callback = None, request_id = None): 167 try: 168 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 169 170 resource = Resource(None, advertisement_packet.link, request_id = request_id) 171 resource.status = Resource.TRANSFERRING 172 173 resource.flags = adv.f 174 resource.size = adv.t 175 resource.total_size = adv.d 176 resource.uncompressed_size = adv.d 177 resource.hash = adv.h 178 resource.original_hash = adv.o 179 resource.random_hash = adv.r 180 resource.hashmap_raw = adv.m 181 resource.encrypted = True if resource.flags & 0x01 else False 182 resource.compressed = True if resource.flags >> 1 & 0x01 else False 183 resource.initiator = False 184 resource.callback = callback 185 resource.__progress_callback = progress_callback 186 resource.total_parts = int(math.ceil(resource.size/float(resource.sdu))) 187 
resource.received_count = 0 188 resource.outstanding_parts = 0 189 resource.parts = [None] * resource.total_parts 190 resource.window = Resource.WINDOW 191 resource.window_max = Resource.WINDOW_MAX_SLOW 192 resource.window_min = Resource.WINDOW_MIN 193 resource.window_flexibility = Resource.WINDOW_FLEXIBILITY 194 resource.last_activity = time.time() 195 resource.started_transferring = resource.last_activity 196 197 resource.storagepath = RNS.Reticulum.resourcepath+"/"+resource.original_hash.hex() 198 resource.meta_storagepath = resource.storagepath+".meta" 199 resource.segment_index = adv.i 200 resource.total_segments = adv.l 201 202 if adv.l > 1: resource.split = True 203 else: resource.split = False 204 205 if adv.x: resource.has_metadata = True 206 else: resource.has_metadata = False 207 208 resource.hashmap = [None] * resource.total_parts 209 resource.hashmap_height = 0 210 resource.waiting_for_hmu = False 211 resource.receiving_part = False 212 resource.consecutive_completed_height = -1 213 214 previous_window = resource.link.get_last_resource_window() 215 previous_eifr = resource.link.get_last_resource_eifr() 216 if previous_window: 217 resource.window = previous_window 218 if previous_eifr: 219 resource.previous_eifr = previous_eifr 220 221 if not resource.link.has_incoming_resource(resource): 222 resource.link.register_incoming_resource(resource) 223 224 RNS.log(f"Accepting resource advertisement for {RNS.prettyhexrep(resource.hash)}. Transfer size is {RNS.prettysize(resource.size)} in {resource.total_parts} parts.", RNS.LOG_DEBUG) 225 if resource.link.callbacks.resource_started != None: 226 try: 227 resource.link.callbacks.resource_started(resource) 228 except Exception as e: 229 RNS.log("Error while executing resource started callback from "+str(resource)+". 
The contained exception was: "+str(e), RNS.LOG_ERROR) 230 231 resource.hashmap_update(0, resource.hashmap_raw) 232 resource.watchdog_job() 233 return resource 234 235 else: 236 RNS.log("Ignoring resource advertisement for "+RNS.prettyhexrep(resource.hash)+", resource already transferring", RNS.LOG_DEBUG) 237 return None 238 239 except Exception as e: 240 RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG) 241 return None 242 243 # Create a resource for transmission to a remote destination 244 # The data passed can be either a bytes-array or a file opened 245 # in binary read mode. 246 def __init__(self, data, link, metadata=None, advertise=True, auto_compress=True, callback=None, progress_callback=None, 247 timeout = None, segment_index = 1, original_hash = None, request_id = None, is_response = False, sent_metadata_size=0): 248 249 data_size = None 250 resource_data = None 251 self.assembly_lock = False 252 self.preparing_next_segment = False 253 self.next_segment = None 254 self.metadata = None 255 self.has_metadata = False 256 self.metadata_size = sent_metadata_size 257 258 if metadata != None: 259 packed_metadata = umsgpack.packb(metadata) 260 metadata_size = len(packed_metadata) 261 if metadata_size > Resource.METADATA_MAX_SIZE: 262 raise SystemError("Resource metadata size exceeded") 263 else: 264 self.metadata = struct.pack(">I", metadata_size)[1:] + packed_metadata 265 self.metadata_size = len(self.metadata) 266 self.has_metadata = True 267 else: 268 self.metadata = b"" 269 if sent_metadata_size > 0: self.has_metadata = True 270 271 if data != None: 272 if not hasattr(data, "read") and self.metadata_size + len(data) > Resource.MAX_EFFICIENT_SIZE: 273 original_data = data 274 data_size = len(original_data) 275 data = tempfile.TemporaryFile() 276 data.write(original_data) 277 del original_data 278 279 if hasattr(data, "read"): 280 if data_size == None: data_size = os.stat(data.name).st_size 281 self.total_size = data_size + 
self.metadata_size 282 283 if self.total_size <= Resource.MAX_EFFICIENT_SIZE: 284 self.total_segments = 1 285 self.segment_index = 1 286 self.split = False 287 resource_data = data.read() 288 data.close() 289 290 else: 291 # self.total_segments = ((data_size-1)//Resource.MAX_EFFICIENT_SIZE)+1 292 # self.segment_index = segment_index 293 # self.split = True 294 # seek_index = segment_index-1 295 # seek_position = seek_index*Resource.MAX_EFFICIENT_SIZE 296 297 self.total_segments = ((self.total_size-1)//Resource.MAX_EFFICIENT_SIZE)+1 298 self.segment_index = segment_index 299 self.split = True 300 seek_index = segment_index-1 301 first_read_size = Resource.MAX_EFFICIENT_SIZE - self.metadata_size 302 303 if segment_index == 1: 304 seek_position = 0 305 segment_read_size = first_read_size 306 else: 307 seek_position = first_read_size + ((seek_index-1)*Resource.MAX_EFFICIENT_SIZE) 308 segment_read_size = Resource.MAX_EFFICIENT_SIZE 309 310 data.seek(seek_position) 311 resource_data = data.read(segment_read_size) 312 self.input_file = data 313 314 elif isinstance(data, bytes): 315 data_size = len(data) 316 self.total_size = data_size + self.metadata_size 317 318 resource_data = data 319 self.total_segments = 1 320 self.segment_index = 1 321 self.split = False 322 323 elif data == None: 324 pass 325 326 else: 327 raise TypeError("Invalid data instance type passed to resource initialisation") 328 329 if resource_data: 330 if self.has_metadata: data = self.metadata + resource_data 331 else: data = resource_data 332 333 self.status = Resource.NONE 334 self.link = link 335 if self.link.mtu: 336 self.sdu = self.link.mtu - RNS.Reticulum.HEADER_MAXSIZE - RNS.Reticulum.IFAC_MIN_SIZE 337 else: 338 self.sdu = link.mdu or Resource.SDU 339 self.max_retries = Resource.MAX_RETRIES 340 self.max_adv_retries = Resource.MAX_ADV_RETRIES 341 self.retries_left = self.max_retries 342 self.timeout_factor = self.link.traffic_timeout_factor 343 self.part_timeout_factor = 
Resource.PART_TIMEOUT_FACTOR 344 self.sender_grace_time = Resource.SENDER_GRACE_TIME 345 self.hmu_retry_ok = False 346 self.watchdog_lock = False 347 self.__watchdog_job_id = 0 348 self.__progress_callback = progress_callback 349 self.rtt = None 350 self.rtt_rxd_bytes = 0 351 self.req_sent = 0 352 self.req_resp_rtt_rate = 0 353 self.rtt_rxd_bytes_at_part_req = 0 354 self.req_data_rtt_rate = 0 355 self.eifr = None 356 self.previous_eifr = None 357 self.fast_rate_rounds = 0 358 self.very_slow_rate_rounds = 0 359 self.request_id = request_id 360 self.started_transferring = None 361 self.is_response = is_response 362 self.auto_compress_limit = Resource.AUTO_COMPRESS_MAX_SIZE 363 self.auto_compress_option = auto_compress 364 365 if type(auto_compress) == bool: 366 self.auto_compress = auto_compress 367 elif type(auto_compress) == int: 368 self.auto_compress = True 369 self.auto_compress_limit = auto_compress 370 else: 371 raise TypeError(f"Invalid type {type(auto_compress)} for auto_compress option") 372 373 self.req_hashlist = [] 374 self.receiver_min_consecutive_height = 0 375 376 if timeout != None: 377 self.timeout = timeout 378 else: 379 self.timeout = self.link.rtt * self.link.traffic_timeout_factor 380 381 if data != None: 382 self.initiator = True 383 self.callback = callback 384 self.uncompressed_data = data 385 386 compression_began = time.time() 387 if self.auto_compress and data_size <= self.auto_compress_limit: 388 RNS.log("Compressing resource data...", RNS.LOG_EXTREME) 389 self.compressed_data = bz2.compress(self.uncompressed_data) 390 RNS.log("Compression completed in "+str(round(time.time()-compression_began, 3))+" seconds", RNS.LOG_EXTREME) 391 else: 392 self.compressed_data = self.uncompressed_data 393 394 self.uncompressed_size = len(self.uncompressed_data) 395 self.compressed_size = len(self.compressed_data) 396 397 if (self.compressed_size < self.uncompressed_size and auto_compress): 398 saved_bytes = len(self.uncompressed_data) - 
len(self.compressed_data) 399 RNS.log("Compression saved "+str(saved_bytes)+" bytes, sending compressed", RNS.LOG_EXTREME) 400 401 self.data = b"" 402 self.data += RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE] 403 self.data += self.compressed_data 404 405 self.compressed = True 406 407 else: 408 self.data = b"" 409 self.data += RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE] 410 self.data += self.uncompressed_data 411 412 self.compressed = False 413 self.compressed_data = None 414 if self.auto_compress and data_size <= self.auto_compress_limit: 415 RNS.log("Compression did not decrease size, sending uncompressed", RNS.LOG_EXTREME) 416 417 self.compressed_data = None 418 self.uncompressed_data = None 419 420 # Resources handle encryption directly to 421 # make optimal use of packet MTU on an entire 422 # encrypted stream. The Resource instance will 423 # use it's underlying link directly to encrypt. 424 self.data = self.link.encrypt(self.data) 425 self.encrypted = True 426 427 self.size = len(self.data) 428 self.sent_parts = 0 429 hashmap_entries = int(math.ceil(self.size/float(self.sdu))) 430 self.total_parts = hashmap_entries 431 432 hashmap_ok = False 433 while not hashmap_ok: 434 hashmap_computation_began = time.time() 435 RNS.log("Starting resource hashmap computation with "+str(hashmap_entries)+" entries...", RNS.LOG_EXTREME) 436 437 self.random_hash = RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE] 438 self.hash = RNS.Identity.full_hash(data+self.random_hash) 439 self.truncated_hash = RNS.Identity.truncated_hash(data+self.random_hash) 440 self.expected_proof = RNS.Identity.full_hash(data+self.hash) 441 442 if original_hash == None: 443 self.original_hash = self.hash 444 else: 445 self.original_hash = original_hash 446 447 self.parts = [] 448 self.hashmap = b"" 449 collision_guard_list = [] 450 for i in range(0,hashmap_entries): 451 data = self.data[i*self.sdu:(i+1)*self.sdu] 452 map_hash = self.get_map_hash(data) 453 454 if 
map_hash in collision_guard_list: 455 RNS.log("Found hash collision in resource map, remapping...", RNS.LOG_DEBUG) 456 hashmap_ok = False 457 break 458 else: 459 hashmap_ok = True 460 collision_guard_list.append(map_hash) 461 if len(collision_guard_list) > ResourceAdvertisement.COLLISION_GUARD_SIZE: 462 collision_guard_list.pop(0) 463 464 part = RNS.Packet(link, data, context=RNS.Packet.RESOURCE) 465 part.pack() 466 part.map_hash = map_hash 467 468 self.hashmap += part.map_hash 469 self.parts.append(part) 470 471 RNS.log("Hashmap computation concluded in "+str(round(time.time()-hashmap_computation_began, 3))+" seconds", RNS.LOG_EXTREME) 472 473 self.data = None 474 if advertise: 475 self.advertise() 476 else: 477 self.receive_lock = Lock() 478 479 480 def hashmap_update_packet(self, plaintext): 481 if not self.status == Resource.FAILED: 482 self.last_activity = time.time() 483 self.retries_left = self.max_retries 484 485 update = umsgpack.unpackb(plaintext[RNS.Identity.HASHLENGTH//8:]) 486 self.hashmap_update(update[0], update[1]) 487 488 489 def hashmap_update(self, segment, hashmap): 490 if not self.status == Resource.FAILED: 491 self.status = Resource.TRANSFERRING 492 seg_len = ResourceAdvertisement.HASHMAP_MAX_LEN 493 hashes = len(hashmap)//Resource.MAPHASH_LEN 494 for i in range(0,hashes): 495 if self.hashmap[i+segment*seg_len] == None: 496 self.hashmap_height += 1 497 self.hashmap[i+segment*seg_len] = hashmap[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN] 498 499 self.waiting_for_hmu = False 500 self.request_next() 501 502 def get_map_hash(self, data): 503 return RNS.Identity.full_hash(data+self.random_hash)[:Resource.MAPHASH_LEN] 504 505 def advertise(self): 506 """ 507 Advertise the resource. If the other end of the link accepts 508 the resource advertisement it will begin transferring. 
509 """ 510 thread = threading.Thread(target=self.__advertise_job, daemon=True) 511 thread.start() 512 513 if self.segment_index < self.total_segments: 514 prepare_thread = threading.Thread(target=self.__prepare_next_segment, daemon=True) 515 prepare_thread.start() 516 517 def __advertise_job(self): 518 self.advertisement_packet = RNS.Packet(self.link, ResourceAdvertisement(self).pack(), context=RNS.Packet.RESOURCE_ADV) 519 while not self.link.ready_for_new_resource(): 520 self.status = Resource.QUEUED 521 sleep(0.25) 522 523 try: 524 self.advertisement_packet.send() 525 self.last_activity = time.time() 526 self.started_transferring = self.last_activity 527 self.adv_sent = self.last_activity 528 self.rtt = None 529 self.status = Resource.ADVERTISED 530 self.retries_left = self.max_adv_retries 531 self.link.register_outgoing_resource(self) 532 RNS.log("Sent resource advertisement for "+RNS.prettyhexrep(self.hash), RNS.LOG_EXTREME) 533 except Exception as e: 534 RNS.log("Could not advertise resource, the contained exception was: "+str(e), RNS.LOG_ERROR) 535 self.cancel() 536 return 537 538 self.watchdog_job() 539 540 def update_eifr(self): 541 if self.rtt == None: 542 rtt = self.link.rtt 543 else: 544 rtt = self.rtt 545 546 if self.req_data_rtt_rate != 0: 547 expected_inflight_rate = self.req_data_rtt_rate*8 548 else: 549 if self.previous_eifr != None: 550 expected_inflight_rate = self.previous_eifr 551 else: 552 expected_inflight_rate = self.link.establishment_cost*8 / rtt 553 554 self.eifr = expected_inflight_rate 555 if self.link: self.link.expected_rate = self.eifr 556 557 def watchdog_job(self): 558 thread = threading.Thread(target=self.__watchdog_job, daemon=True) 559 thread.start() 560 561 def __watchdog_job(self): 562 self.__watchdog_job_id += 1 563 this_job_id = self.__watchdog_job_id 564 565 while self.status < Resource.ASSEMBLING and this_job_id == self.__watchdog_job_id: 566 while self.watchdog_lock: 567 sleep(0.025) 568 569 sleep_time = None 570 if 
self.status == Resource.ADVERTISED: 571 sleep_time = (self.adv_sent+self.timeout+Resource.PROCESSING_GRACE)-time.time() 572 if sleep_time < 0: 573 if self.retries_left <= 0: 574 RNS.log("Resource transfer timeout after sending advertisement", RNS.LOG_DEBUG) 575 self.cancel() 576 sleep_time = 0.001 577 else: 578 try: 579 RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG) 580 self.retries_left -= 1 581 self.advertisement_packet = RNS.Packet(self.link, ResourceAdvertisement(self).pack(), context=RNS.Packet.RESOURCE_ADV) 582 self.advertisement_packet.send() 583 self.last_activity = time.time() 584 self.adv_sent = self.last_activity 585 sleep_time = 0.001 586 except Exception as e: 587 RNS.log("Could not resend advertisement packet, cancelling resource. The contained exception was: "+str(e), RNS.LOG_VERBOSE) 588 self.cancel() 589 590 591 elif self.status == Resource.TRANSFERRING: 592 if not self.initiator: 593 retries_used = self.max_retries - self.retries_left 594 extra_wait = retries_used * Resource.PER_RETRY_DELAY 595 596 self.update_eifr() 597 expected_tof_remaining = (self.outstanding_parts*self.sdu*8)/self.eifr 598 599 if self.req_resp_rtt_rate != 0: 600 sleep_time = self.last_activity + self.part_timeout_factor*expected_tof_remaining + Resource.RETRY_GRACE_TIME + extra_wait - time.time() 601 else: 602 sleep_time = self.last_activity + self.part_timeout_factor*((3*self.sdu)/self.eifr) + Resource.RETRY_GRACE_TIME + extra_wait - time.time() 603 604 # TODO: Remove debug at some point 605 # RNS.log(f"EIFR {RNS.prettyspeed(self.eifr)}, ETOF {RNS.prettyshorttime(expected_tof_remaining)} ", RNS.LOG_DEBUG, pt=True) 606 # RNS.log(f"Resource ST {RNS.prettyshorttime(sleep_time)}, RTT {RNS.prettyshorttime(self.rtt or self.link.rtt)}, {self.outstanding_parts} left", RNS.LOG_DEBUG, pt=True) 607 608 if sleep_time < 0: 609 if self.retries_left > 0: 610 ms = "" if self.outstanding_parts == 1 else "s" 611 RNS.log("Timed out waiting for 
"+str(self.outstanding_parts)+" part"+ms+", requesting retry", RNS.LOG_DEBUG) 612 if self.window > self.window_min: 613 self.window -= 1 614 if self.window_max > self.window_min: 615 self.window_max -= 1 616 if (self.window_max - self.window) > (self.window_flexibility-1): 617 self.window_max -= 1 618 619 sleep_time = 0.001 620 self.retries_left -= 1 621 self.waiting_for_hmu = False 622 self.request_next() 623 else: 624 self.cancel() 625 sleep_time = 0.001 626 else: 627 max_extra_wait = sum([(r+1) * Resource.PER_RETRY_DELAY for r in range(self.MAX_RETRIES)]) 628 max_wait = self.rtt * self.timeout_factor * self.max_retries + self.sender_grace_time + max_extra_wait 629 sleep_time = self.last_activity + max_wait - time.time() 630 if sleep_time < 0: 631 RNS.log("Resource timed out waiting for part requests", RNS.LOG_DEBUG) 632 self.cancel() 633 sleep_time = 0.001 634 635 elif self.status == Resource.AWAITING_PROOF: 636 # Decrease timeout factor since proof packets are 637 # significantly smaller than full req/resp roundtrip 638 self.timeout_factor = Resource.PROOF_TIMEOUT_FACTOR 639 640 sleep_time = self.last_part_sent + (self.rtt*self.timeout_factor+self.sender_grace_time) - time.time() 641 if sleep_time < 0: 642 if self.retries_left <= 0: 643 RNS.log("Resource timed out waiting for proof", RNS.LOG_DEBUG) 644 self.cancel() 645 sleep_time = 0.001 646 else: 647 RNS.log("All parts sent, but no resource proof received, querying network cache...", RNS.LOG_DEBUG) 648 self.retries_left -= 1 649 expected_data = self.hash + self.expected_proof 650 expected_proof_packet = RNS.Packet(self.link, expected_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF) 651 expected_proof_packet.pack() 652 RNS.Transport.cache_request(expected_proof_packet.packet_hash, self.link) 653 self.last_part_sent = time.time() 654 sleep_time = 0.001 655 656 elif self.status == Resource.REJECTED: 657 sleep_time = 0.001 658 659 if sleep_time == 0: 660 RNS.log("Warning! 
Link watchdog sleep time of 0!", RNS.LOG_DEBUG) 661 if sleep_time == None or sleep_time < 0: 662 RNS.log("Timing error, cancelling resource transfer.", RNS.LOG_ERROR) 663 self.cancel() 664 665 if sleep_time != None: 666 sleep(min(sleep_time, Resource.WATCHDOG_MAX_SLEEP)) 667 668 def assemble(self): 669 if not self.status == Resource.FAILED: 670 try: 671 self.status = Resource.ASSEMBLING 672 stream = b"".join(self.parts) 673 674 if self.encrypted: data = self.link.decrypt(stream) 675 else: data = stream 676 677 # Strip off random hash 678 data = data[Resource.RANDOM_HASH_SIZE:] 679 680 if self.compressed: self.data = bz2.decompress(data) 681 else: self.data = data 682 683 calculated_hash = RNS.Identity.full_hash(self.data+self.random_hash) 684 if calculated_hash == self.hash: 685 if self.has_metadata and self.segment_index == 1: 686 # TODO: Add early metadata_ready callback 687 metadata_size = self.data[0] << 16 | self.data[1] << 8 | self.data[2] 688 packed_metadata = self.data[3:3+metadata_size] 689 metadata_file = open(self.meta_storagepath, "wb") 690 metadata_file.write(packed_metadata) 691 metadata_file.close() 692 del packed_metadata 693 data = self.data[3+metadata_size:] 694 else: 695 data = self.data 696 697 self.file = open(self.storagepath, "ab") 698 self.file.write(data) 699 self.file.close() 700 self.status = Resource.COMPLETE 701 del data 702 self.prove() 703 704 else: self.status = Resource.CORRUPT 705 706 707 except Exception as e: 708 RNS.log("Error while assembling received resource.", RNS.LOG_ERROR) 709 RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) 710 self.status = Resource.CORRUPT 711 712 self.link.resource_concluded(self) 713 714 if self.segment_index == self.total_segments: 715 if self.callback != None: 716 if not os.path.isfile(self.meta_storagepath): 717 self.metadata = None 718 else: 719 metadata_file = open(self.meta_storagepath, "rb") 720 self.metadata = umsgpack.unpackb(metadata_file.read()) 721 metadata_file.close() 722 
try: os.unlink(self.meta_storagepath) 723 except Exception as e: 724 RNS.log(f"Error while cleaning up resource metadata file, the contained exception was: {e}", RNS.LOG_ERROR) 725 726 self.data = open(self.storagepath, "rb") 727 try: self.callback(self) 728 except Exception as e: 729 RNS.log("Error while executing resource assembled callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 730 731 try: 732 if hasattr(self.data, "close") and callable(self.data.close): self.data.close() 733 if os.path.isfile(self.storagepath): os.unlink(self.storagepath) 734 735 except Exception as e: 736 RNS.log(f"Error while cleaning up resource files, the contained exception was: {e}", RNS.LOG_ERROR) 737 else: 738 RNS.log("Resource segment "+str(self.segment_index)+" of "+str(self.total_segments)+" received, waiting for next segment to be announced", RNS.LOG_DEBUG) 739 740 741 def prove(self): 742 if not self.status == Resource.FAILED: 743 try: 744 proof = RNS.Identity.full_hash(self.data+self.hash) 745 proof_data = self.hash+proof 746 proof_packet = RNS.Packet(self.link, proof_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF) 747 proof_packet.send() 748 RNS.Transport.cache(proof_packet, force_cache=True) 749 except Exception as e: 750 RNS.log("Could not send proof packet, cancelling resource", RNS.LOG_DEBUG) 751 RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) 752 self.cancel() 753 754 def __prepare_next_segment(self): 755 # Prepare the next segment for advertisement 756 RNS.log(f"Preparing segment {self.segment_index+1} of {self.total_segments} for resource {self}", RNS.LOG_DEBUG) 757 self.preparing_next_segment = True 758 self.next_segment = Resource( 759 self.input_file, self.link, 760 callback = self.callback, 761 segment_index = self.segment_index+1, 762 original_hash=self.original_hash, 763 progress_callback = self.__progress_callback, 764 request_id = self.request_id, 765 is_response = self.is_response, 766 
            # Tail of the next-segment construction above: the next segment
            # is created without auto-advertising, inheriting this resource's
            # compression choice and metadata size.
            advertise = False,
            auto_compress = self.auto_compress_option,
            sent_metadata_size = self.metadata_size,
        )

    def validate_proof(self, proof_data):
        """
        Validates a received resource proof against the proof this sender
        expects, and either concludes the transfer or advertises the next
        segment of a split resource.

        :param proof_data: Raw proof payload; expected to be exactly two
                           identity-hash lengths: the resource hash
                           followed by the proof hash.
        """
        if not self.status == Resource.FAILED:
            # A well-formed proof packet is exactly 2 * identity hash length
            if len(proof_data) == RNS.Identity.HASHLENGTH//8*2:
                if proof_data[RNS.Identity.HASHLENGTH//8:] == self.expected_proof:
                    self.status = Resource.COMPLETE
                    self.link.resource_concluded(self)
                    if self.segment_index == self.total_segments:
                        # If all segments were processed, we'll
                        # signal that the resource sending concluded
                        if self.callback != None:
                            try: self.callback(self)
                            except Exception as e: RNS.log("Error while executing resource concluded callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
                            finally:
                                # Always release the input file handle, even
                                # if the user callback raised
                                try:
                                    if hasattr(self, "input_file"):
                                        if hasattr(self.input_file, "close") and callable(self.input_file.close): self.input_file.close()
                                except Exception as e: RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR)
                        else:
                            # No conclusion callback registered; still close
                            # the input file handle if one is attached
                            try:
                                if hasattr(self, "input_file"):
                                    if hasattr(self.input_file, "close") and callable(self.input_file.close): self.input_file.close()
                            except Exception as e: RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR)
                    else:
                        # Otherwise we'll recursively create the
                        # next segment of the resource
                        if not self.preparing_next_segment:
                            RNS.log(f"Next segment preparation for resource {self} was not started yet, manually preparing now. This will cause transfer slowdown.", RNS.LOG_WARNING)
                            self.__prepare_next_segment()

                        # Wait for the (possibly background-prepared) next
                        # segment to become available
                        while self.next_segment == None: time.sleep(0.05)

                        # Drop references to this segment's data so it can
                        # be garbage collected while the next one transfers
                        self.data = None
                        self.metadata = None
                        self.parts = None
                        self.input_file = None
                        self.link = None
                        self.req_hashlist = None
                        self.hashmap = None

                        self.next_segment.advertise()
                else:
                    # Proof hash did not match expectation; ignore packet
                    pass
            else:
                # Malformed proof length; ignore packet
                pass


    def receive_part(self, packet):
        """
        Handles one incoming resource part on the receiving side: stores
        the part, updates RTT and rate estimates, adjusts the sliding
        window, and either assembles the resource or requests more parts.

        :param packet: The packet carrying the part data.
        """
        with self.receive_lock:

            self.receiving_part = True
            self.last_activity = time.time()
            self.retries_left = self.max_retries

            # First part after an outstanding request: record the
            # request/response round-trip time
            if self.req_resp == None:
                self.req_resp = self.last_activity
                rtt = self.req_resp-self.req_sent

                self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT
                if self.rtt == None:
                    self.rtt = self.link.rtt
                    self.watchdog_job()
                elif rtt < self.rtt:
                    # Smooth the RTT estimate towards the new sample in 5% steps
                    self.rtt = max(self.rtt - self.rtt*0.05, rtt)
                elif rtt > self.rtt:
                    self.rtt = min(self.rtt + self.rtt*0.05, rtt)

                if rtt > 0:
                    # Estimate the link rate from bytes exchanged over this RTT
                    req_resp_cost = len(packet.raw)+self.req_sent_bytes
                    self.req_resp_rtt_rate = req_resp_cost / rtt

                    if self.req_resp_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD:
                        self.fast_rate_rounds += 1

                        if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD:
                            # Sustained fast rate: allow the large window
                            self.window_max = Resource.WINDOW_MAX_FAST

            if not self.status == Resource.FAILED:
                self.status = Resource.TRANSFERRING
                part_data = packet.data
                part_hash = self.get_map_hash(part_data)

                # Match the received part's map hash against expected hashes
                # within the current window, starting at the consecutive
                # completed height
                consecutive_index = self.consecutive_completed_height if self.consecutive_completed_height >= 0 else 0
                i = consecutive_index
                for map_hash in self.hashmap[consecutive_index:consecutive_index+self.window]:
                    if map_hash == part_hash:
                        if self.parts[i] == None:

                            # Insert data into parts list
                            self.parts[i] = part_data
                            self.rtt_rxd_bytes += len(part_data)
                            self.received_count += 1
                            self.outstanding_parts -= 1

                            # Update consecutive completed pointer
                            if i == self.consecutive_completed_height + 1:
                                self.consecutive_completed_height = i

                            # Advance past any parts that had arrived out of order
                            cp = self.consecutive_completed_height + 1
                            while cp < len(self.parts) and self.parts[cp] != None:
                                self.consecutive_completed_height = cp
                                cp += 1

                            if self.__progress_callback != None:
                                try:
                                    self.__progress_callback(self)
                                except Exception as e:
                                    RNS.log("Error while executing progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

                    i += 1

                self.receiving_part = False

                if self.received_count == self.total_parts and not self.assembly_lock:
                    # All parts received; assemble exactly once
                    self.assembly_lock = True
                    self.assemble()
                elif self.outstanding_parts == 0:
                    # Every requested part arrived; grow the window and
                    # refresh rate estimates before requesting more.
                    # TODO: Figure out if there is a mathematically
                    # optimal way to adjust windows
                    if self.window < self.window_max:
                        self.window += 1
                        if (self.window - self.window_min) > (self.window_flexibility-1):
                            self.window_min += 1

                    if self.req_sent != 0:
                        rtt = time.time()-self.req_sent
                        req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req

                        if rtt != 0:
                            self.req_data_rtt_rate = req_transferred/rtt
                            self.update_eifr()
                            self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes

                            if self.req_data_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD:
                                self.fast_rate_rounds += 1

                                if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD:
                                    self.window_max = Resource.WINDOW_MAX_FAST

                            # Only cap the window for very slow links when the
                            # fast threshold was never reached
                            if self.fast_rate_rounds == 0 and self.req_data_rtt_rate < Resource.RATE_VERY_SLOW and self.very_slow_rate_rounds < Resource.VERY_SLOW_RATE_THRESHOLD:
                                self.very_slow_rate_rounds += 1

                                if self.very_slow_rate_rounds == Resource.VERY_SLOW_RATE_THRESHOLD:
                                    self.window_max = Resource.WINDOW_MAX_VERY_SLOW

                    self.request_next()
            else:
                self.receiving_part = False

    # Called on incoming resource to send a request for more data
    def request_next(self):
        """
        Called on an incoming resource to request the next batch of parts
        from the sender, sized by the current window and anchored just
        above the consecutive completed height.
        """
        # Wait for any part currently being processed, so the request
        # reflects an up-to-date parts list
        while self.receiving_part:
            sleep(0.001)

        if not self.status == Resource.FAILED:
            if not self.waiting_for_hmu:
                self.outstanding_parts = 0
                hashmap_exhausted = Resource.HASHMAP_IS_NOT_EXHAUSTED
                requested_hashes = b""

                i = 0; pn = self.consecutive_completed_height+1
                search_start = pn
                search_size = self.window

                # Collect map hashes for up to a window's worth of missing parts
                for part in self.parts[search_start:search_start+search_size]:
                    if part == None:
                        part_hash = self.hashmap[pn]
                        if part_hash != None:
                            requested_hashes += part_hash
                            self.outstanding_parts += 1
                            i += 1
                        else:
                            # No map hash known for this part yet; a hashmap
                            # update must be requested from the sender
                            hashmap_exhausted = Resource.HASHMAP_IS_EXHAUSTED

                    pn += 1
                    if i >= self.window or hashmap_exhausted == Resource.HASHMAP_IS_EXHAUSTED:
                        break

                # Prefix the request with the exhaustion flag, and if the
                # hashmap ran out, with the last map hash we know about
                hmu_part = bytes([hashmap_exhausted])
                if hashmap_exhausted == Resource.HASHMAP_IS_EXHAUSTED:
                    last_map_hash = self.hashmap[self.hashmap_height-1]
                    hmu_part += last_map_hash
                    self.waiting_for_hmu = True

                request_data = hmu_part + self.hash + requested_hashes
                request_packet = RNS.Packet(self.link, request_data, context = RNS.Packet.RESOURCE_REQ)

                try:
                    request_packet.send()
                    self.last_activity = time.time()
                    self.req_sent = self.last_activity
                    self.req_sent_bytes = len(request_packet.raw)
                    self.req_resp = None

                except Exception as e:
                    RNS.log("Could not send resource request packet, cancelling resource", RNS.LOG_DEBUG)
                    RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
                    self.cancel()

    # Called on outgoing resource to make it send more data
    def request(self, request_data):
        """
        Called on an outgoing resource when the receiver requests parts.
        Sends the requested parts, and a hashmap update if the receiver
        signalled that its hashmap is exhausted.

        :param request_data: Raw request payload: one flag byte, an
                             optional last-known map hash, the resource
                             hash, then the requested part map hashes.
        """
        if not self.status == Resource.FAILED:
            rtt = time.time() - self.adv_sent
            if self.rtt == None:
                self.rtt = rtt

            if self.status != Resource.TRANSFERRING:
                self.status = Resource.TRANSFERRING
                self.watchdog_job()

            self.retries_left = self.max_retries

            wants_more_hashmap = True if request_data[0] == Resource.HASHMAP_IS_EXHAUSTED else False
            # The optional last-known map hash shifts the payload offset
            pad = 1+Resource.MAPHASH_LEN if wants_more_hashmap else 1

            requested_hashes = request_data[pad+RNS.Identity.HASHLENGTH//8:]

            # Define the search scope
            search_start = self.receiver_min_consecutive_height
            search_end = self.receiver_min_consecutive_height+ResourceAdvertisement.COLLISION_GUARD_SIZE

            map_hashes = []
            for i in range(0,len(requested_hashes)//Resource.MAPHASH_LEN):
                map_hash = requested_hashes[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN]
                map_hashes.append(map_hash)

            # Only parts within the collision guard scope are matched, since
            # short map hashes may collide across a large resource
            search_scope = self.parts[search_start:search_end]
            requested_parts = list(filter(lambda part: part.map_hash in map_hashes, search_scope))

            for part in requested_parts:
                try:
                    if not part.sent:
                        part.send()
                        self.sent_parts += 1
                    else:
                        part.resend()

                    self.last_activity = time.time()
                    self.last_part_sent = self.last_activity

                except Exception as e:
                    RNS.log("Resource could not send parts, cancelling transfer!", RNS.LOG_DEBUG)
                    RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
                    self.cancel()

            if wants_more_hashmap:
                last_map_hash = request_data[1:Resource.MAPHASH_LEN+1]

                # Locate the part the receiver's hashmap currently ends at
                part_index = self.receiver_min_consecutive_height
                search_start = part_index
                search_end = self.receiver_min_consecutive_height+ResourceAdvertisement.COLLISION_GUARD_SIZE
                for part in self.parts[search_start:search_end]:
                    part_index += 1
                    if part.map_hash == last_map_hash:
                        break

                self.receiver_min_consecutive_height = max(part_index-1-Resource.WINDOW_MAX, 0)

                # Hashmap updates are always sent in whole segments, so a
                # mid-segment index indicates a sequencing error
                if part_index % ResourceAdvertisement.HASHMAP_MAX_LEN != 0:
                    RNS.log("Resource sequencing error, cancelling transfer!", RNS.LOG_ERROR)
                    self.cancel()
                    return
                else:
                    segment = part_index // ResourceAdvertisement.HASHMAP_MAX_LEN

                # Concatenate the map hashes of the requested hashmap segment
                hashmap_start = segment*ResourceAdvertisement.HASHMAP_MAX_LEN
                hashmap_end = min((segment+1)*ResourceAdvertisement.HASHMAP_MAX_LEN, len(self.parts))

                hashmap = b""
                for i in range(hashmap_start,hashmap_end):
                    hashmap += self.hashmap[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN]

                hmu = self.hash+umsgpack.packb([segment, hashmap])
                hmu_packet = RNS.Packet(self.link, hmu, context = RNS.Packet.RESOURCE_HMU)

                try:
                    hmu_packet.send()
                    self.last_activity = time.time()
                except Exception as e:
                    RNS.log("Could not send resource HMU packet, cancelling resource", RNS.LOG_DEBUG)
                    RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
                    self.cancel()

            if self.sent_parts == len(self.parts):
                # Everything sent; wait for the receiver's proof
                self.status = Resource.AWAITING_PROOF
                self.retries_left = 3

            if self.__progress_callback != None:
                try:
                    self.__progress_callback(self)
                except Exception as e:
                    RNS.log("Error while executing progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

    def cancel(self):
        """
        Cancels transferring the resource.
        """
        if self.status < Resource.COMPLETE:
            self.status = Resource.FAILED
            if self.initiator:
                if self.link.status == RNS.Link.ACTIVE:
                    try:
                        # Notify the remote end that the transfer was cancelled
                        cancel_packet = RNS.Packet(self.link, self.hash, context=RNS.Packet.RESOURCE_ICL)
                        cancel_packet.send()
                    except Exception as e:
                        RNS.log("Could not send resource cancel packet, the contained exception was: "+str(e), RNS.LOG_ERROR)
                self.link.cancel_outgoing_resource(self)
            else:
                self.link.cancel_incoming_resource(self)

            if self.callback != None:
                try:
                    self.link.resource_concluded(self)
                    self.callback(self)
                except Exception as e:
                    RNS.log("Error while executing callbacks on resource cancel from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

    def _rejected(self):
        # Marks an outgoing resource as rejected by the remote peer and
        # notifies the registered conclusion callback
        if self.status < Resource.COMPLETE:
            if self.initiator:
                self.status = Resource.REJECTED
                self.link.cancel_outgoing_resource(self)
                if self.callback != None:
                    try:
                        self.link.resource_concluded(self)
                        self.callback(self)
                    except Exception as e:
                        RNS.log("Error while executing callbacks on resource reject from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

    def set_callback(self, callback):
        # Replaces the conclusion callback for this resource
        self.callback = callback

    def progress_callback(self, callback):
        # Registers a callable invoked on every progress update
        self.__progress_callback = callback

    def get_progress(self):
        """
        :returns: The current progress of the resource transfer as a *float* between 0.0 and 1.0.
        """
        if self.status == RNS.Resource.COMPLETE and self.segment_index == self.total_segments:
            return 1.0

        elif self.initiator:
            if not self.split:
                self.processed_parts = self.sent_parts
                self.progress_total_parts = float(self.total_parts)

            else:
                # Split resource: estimate from completed full segments plus
                # progress into the current segment
                is_last_segment = self.segment_index != self.total_segments
                total_segments = self.total_segments
                processed_segments = self.segment_index-1

                current_segment_parts = self.total_parts
                max_parts_per_segment = math.ceil(Resource.MAX_EFFICIENT_SIZE/self.sdu)

                previously_processed_parts = processed_segments*max_parts_per_segment

                # Scale a shorter final segment so it weighs as much as a
                # full segment in the estimate
                if current_segment_parts < max_parts_per_segment:
                    current_segment_factor = max_parts_per_segment / current_segment_parts
                else:
                    current_segment_factor = 1

                self.processed_parts = previously_processed_parts + self.sent_parts*current_segment_factor
                self.progress_total_parts = self.total_segments*max_parts_per_segment

        else:
            if not self.split:
                self.processed_parts = self.received_count
                self.progress_total_parts = float(self.total_parts)

            else:
                # Receiving side of a split resource: same estimate, based
                # on received parts instead of sent parts
                is_last_segment = self.segment_index != self.total_segments
                total_segments = self.total_segments
                processed_segments = self.segment_index-1

                current_segment_parts = self.total_parts
                max_parts_per_segment = math.ceil(Resource.MAX_EFFICIENT_SIZE/self.sdu)

                previously_processed_parts = processed_segments*max_parts_per_segment

                if current_segment_parts < max_parts_per_segment:
                    current_segment_factor = max_parts_per_segment / current_segment_parts
                else:
                    current_segment_factor = 1

                self.processed_parts = previously_processed_parts + self.received_count*current_segment_factor
                self.progress_total_parts = self.total_segments*max_parts_per_segment


        progress = min(1.0, self.processed_parts / self.progress_total_parts)
        return progress

    def get_segment_progress(self):
        # Progress of the current segment only, as a float in [0.0, 1.0]
        if self.status == RNS.Resource.COMPLETE and self.segment_index == self.total_segments:
            return 1.0
        elif self.initiator:
            processed_parts = self.sent_parts
        else:
            processed_parts = self.received_count

        progress = min(1.0, processed_parts / self.total_parts)
        return progress

    def get_transfer_size(self):
        """
        :returns: The number of bytes needed to transfer the resource.
        """
        return self.size

    def get_data_size(self):
        """
        :returns: The total data size of the resource.
        """
        return self.total_size

    def get_parts(self):
        """
        :returns: The number of parts the resource will be transferred in.
        """
        return self.total_parts

    def get_segments(self):
        """
        :returns: The number of segments the resource is divided into.
        """
        return self.total_segments

    def get_hash(self):
        """
        :returns: The hash of the resource.
        """
        return self.hash

    def is_compressed(self):
        """
        :returns: Whether the resource is compressed.
1207 """ 1208 return self.compressed 1209 1210 def __str__(self): 1211 return "<"+RNS.hexrep(self.hash,delimit=False)+"/"+RNS.hexrep(self.link.link_id,delimit=False)+">" 1212 1213 1214 class ResourceAdvertisement: 1215 OVERHEAD = 134 1216 HASHMAP_MAX_LEN = math.floor((RNS.Link.MDU-OVERHEAD)/Resource.MAPHASH_LEN) 1217 COLLISION_GUARD_SIZE = 2*Resource.WINDOW_MAX+HASHMAP_MAX_LEN 1218 1219 assert HASHMAP_MAX_LEN > 0, "The configured MTU is too small to include any map hashes in resource advertisments" 1220 1221 @staticmethod 1222 def is_request(advertisement_packet): 1223 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 1224 if adv.q != None and adv.u: 1225 return True 1226 else: 1227 return False 1228 1229 1230 @staticmethod 1231 def is_response(advertisement_packet): 1232 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 1233 1234 if adv.q != None and adv.p: 1235 return True 1236 else: 1237 return False 1238 1239 1240 @staticmethod 1241 def read_request_id(advertisement_packet): 1242 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 1243 return adv.q 1244 1245 1246 @staticmethod 1247 def read_transfer_size(advertisement_packet): 1248 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 1249 return adv.t 1250 1251 1252 @staticmethod 1253 def read_size(advertisement_packet): 1254 adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) 1255 return adv.d 1256 1257 1258 def __init__(self, resource=None, request_id=None, is_response=False): 1259 self.link = None 1260 if resource != None: 1261 self.t = resource.size # Transfer size 1262 self.d = resource.total_size # Total uncompressed data size 1263 self.n = len(resource.parts) # Number of parts 1264 self.h = resource.hash # Resource hash 1265 self.r = resource.random_hash # Resource random hash 1266 self.o = resource.original_hash # First-segment hash 1267 self.m = resource.hashmap # Resource hashmap 1268 self.c = resource.compressed # 
Compression flag 1269 self.e = resource.encrypted # Encryption flag 1270 self.s = resource.split # Split flag 1271 self.x = resource.has_metadata # Metadata flag 1272 self.i = resource.segment_index # Segment index 1273 self.l = resource.total_segments # Total segments 1274 self.q = resource.request_id # ID of associated request 1275 self.u = False # Is request flag 1276 self.p = False # Is response flag 1277 1278 if self.q != None: 1279 if not resource.is_response: 1280 self.u = True 1281 self.p = False 1282 else: 1283 self.u = False 1284 self.p = True 1285 1286 # Flags 1287 self.f = 0x00 | self.x << 5 | self.p << 4 | self.u << 3 | self.s << 2 | self.c << 1 | self.e 1288 1289 def get_transfer_size(self): 1290 return self.t 1291 1292 def get_data_size(self): 1293 return self.d 1294 1295 def get_parts(self): 1296 return self.n 1297 1298 def get_segments(self): 1299 return self.l 1300 1301 def get_hash(self): 1302 return self.h 1303 1304 def is_compressed(self): 1305 return self.c 1306 1307 def has_metadata(self): 1308 return self.x 1309 1310 def get_link(self): 1311 return self.link 1312 1313 def pack(self, segment=0): 1314 hashmap_start = segment*ResourceAdvertisement.HASHMAP_MAX_LEN 1315 hashmap_end = min((segment+1)*(ResourceAdvertisement.HASHMAP_MAX_LEN), self.n) 1316 1317 hashmap = b"" 1318 for i in range(hashmap_start,hashmap_end): 1319 hashmap += self.m[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN] 1320 1321 dictionary = { 1322 "t": self.t, # Transfer size 1323 "d": self.d, # Data size 1324 "n": self.n, # Number of parts 1325 "h": self.h, # Resource hash 1326 "r": self.r, # Resource random hash 1327 "o": self.o, # Original hash 1328 "i": self.i, # Segment index 1329 "l": self.l, # Total segments 1330 "q": self.q, # Request ID 1331 "f": self.f, # Resource flags 1332 "m": hashmap 1333 } 1334 1335 return umsgpack.packb(dictionary) 1336 1337 1338 @staticmethod 1339 def unpack(data): 1340 dictionary = umsgpack.unpackb(data) 1341 1342 adv = 
ResourceAdvertisement() 1343 adv.t = dictionary["t"] 1344 adv.d = dictionary["d"] 1345 adv.n = dictionary["n"] 1346 adv.h = dictionary["h"] 1347 adv.r = dictionary["r"] 1348 adv.o = dictionary["o"] 1349 adv.m = dictionary["m"] 1350 adv.f = dictionary["f"] 1351 adv.i = dictionary["i"] 1352 adv.l = dictionary["l"] 1353 adv.q = dictionary["q"] 1354 adv.e = True if (adv.f & 0x01) == 0x01 else False 1355 adv.c = True if ((adv.f >> 1) & 0x01) == 0x01 else False 1356 adv.s = True if ((adv.f >> 2) & 0x01) == 0x01 else False 1357 adv.u = True if ((adv.f >> 3) & 0x01) == 0x01 else False 1358 adv.p = True if ((adv.f >> 4) & 0x01) == 0x01 else False 1359 adv.x = True if ((adv.f >> 5) & 0x01) == 0x01 else False 1360 1361 return adv