block_sync.rs
// Copyright (c) 2025-2026 ACDC Network
// This file is part of the alphaos library.
//
// Alpha Chain | Delta Chain Protocol
// International Monetary Graphite.
//
// Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
// They built world-class ZK infrastructure. We installed the EASY button.
// Their cryptography: elegant. Our modifications: bureaucracy-compatible.
// Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
//
// Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
// All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
// No rights reserved. No permission required. No warranty. No refunds.
//
// https://creativecommons.org/publicdomain/zero/1.0/
// SPDX-License-Identifier: CC0-1.0

use crate::{
    helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
    locators::BlockLocators,
};
use alphaos_node_bft_ledger_service::LedgerService;
use alphaos_node_network::PeerPoolHandling;
use alphaos_node_router::messages::DataBlocks;
use alphaos_node_sync_communication_service::CommunicationService;
use alphaos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};

use alphavm::{
    console::network::{ConsensusVersion, Network},
    prelude::block::Block,
    utilities::ensure_equals,
};

use anyhow::{bail, ensure, Result};
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;
#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(feature = "locktick")]
use locktick::tokio::Mutex as TMutex;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
use rand::seq::{IteratorRandom, SliceRandom};
use std::{
    collections::{hash_map, BTreeMap, HashMap, HashSet},
    net::{IpAddr, Ipv4Addr, SocketAddr},
    sync::Arc,
    time::{Duration, Instant},
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::sync::Notify;

mod helpers;
use helpers::rangify_heights;

mod sync_state;
use sync_state::SyncState;

mod metrics;
use metrics::BlockSyncMetrics;

// The redundancy factor decreases the possibility of a malicious peer sending us an invalid block locator
// by requiring multiple peers to advertise the same (prefix of) block locators.
// However, we do not use this in production yet.
#[cfg(not(test))]
pub const REDUNDANCY_FACTOR: usize = 1;
#[cfg(test)]
pub const REDUNDANCY_FACTOR: usize = 3;

/// The time nodes wait between issuing batches of block requests to avoid triggering spam detection.
///
/// The current rate limit for all messages is around 160k per second (see [`Gateway::max_cache_events`]).
/// This constant limits the number of block requests to a much lower 100 per second.
///
// TODO(kaimast): base rate limits on how many requests were sent to each peer instead.
pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);

// NOTE(review): the two constants below are not referenced within this chunk;
// presumably used by the request-construction / peer-selection helpers — confirm before changing.
const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;

// NOTE(review): timeout handling is not visible in this chunk (see `remove_timed_out_block_requests`,
// referenced by `prepare_block_requests` docs) — these presumably bound request/processing wait times.
const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);
const BLOCK_PROCESSING_TIMEOUT: Duration = Duration::from_secs(120);

/// The maximum number of outstanding block requests.
/// Once a node hits this limit, it will not issue any new requests until existing requests time out or receive responses.
const MAX_BLOCK_REQUESTS: usize = 200; // 200 requests (200 * 5 = 1000 blocks)

/// The maximum number of blocks tolerated before the primary is considered behind its peers.
pub const MAX_BLOCKS_BEHIND: u32 = 1; // blocks

/// This is a dummy IP address that is used to represent the local node.
/// Note: This here does not need to be a real IP address, but it must be unique/distinct from all other connections.
pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);

/// Handle to an outstanding request, containing the request itself and its timestamp.
/// This does not contain the response so that checking for responses does not require iterating over all requests.
#[derive(Clone)]
struct OutstandingRequest<N: Network> {
    /// The request tuple: (expected block hash, expected previous hash, peers that have not responded yet).
    request: SyncRequest<N>,
    /// When the request was inserted into the sync pool.
    timestamp: Instant,
    /// The corresponding response (if any).
    /// This is guaranteed to be Some if sync_ips for the given request are empty.
    response: Option<Block<N>>,
}

/// Information about a block request (used for the REST API).
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestInfo {
    /// Seconds since the request was created
    elapsed: u64,
    /// Has the request been responded to?
    done: bool,
}

/// Summary of all in-flight requests, both completed and outstanding.
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestsSummary {
    /// Heights of requests still awaiting responses (formatted by `rangify_heights`).
    outstanding: String,
    /// Heights of requests that are complete but not yet processed (formatted by `rangify_heights`).
    completed: String,
}

impl<N: Network> OutstandingRequest<N> {
    /// Get a reference to the IPs of peers that have not responded to the request (yet).
    fn sync_ips(&self) -> &IndexSet<SocketAddr> {
        let (_, _, sync_ips) = &self.request;
        sync_ips
    }

    /// Get a mutable reference to the IPs of peers that have not responded to the request (yet).
    fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
        let (_, _, sync_ips) = &mut self.request;
        sync_ips
    }
}

/// A struct that keeps track of synchronizing blocks with other nodes.
///
/// It generates requests to send to other peers and processes responses to those requests.
/// The struct also keeps track of block locators, which indicate which peers it can fetch blocks from.
///
/// # Notes
/// - The actual network communication happens in `alphaos_node::Client` (for clients and provers) and in `alphaos_node_bft::Sync` (for validators).
///
/// - Validators only sync from other nodes using this struct if they fall behind, e.g.,
///   because they experience a network partition.
///   In the common case, validators will generate blocks from the DAG after an anchor certificate has been approved
///   by a supermajority of the committee.
pub struct BlockSync<N: Network> {
    /// The ledger.
    ledger: Arc<dyn LedgerService<N>>,

    /// The map of peer IP to their block locators.
    /// The block locators are consistent with the ledger and every other peer's block locators.
    locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,

    /// The map of peer-to-peer to their common ancestor.
    /// This map is used to determine which peers to request blocks from.
    ///
    /// Lock ordering: when locking both, `common_ancestors` and `locators`, `common_ancestors` must be locked first.
    common_ancestors: RwLock<IndexMap<PeerPair, u32>>,

    /// The block requests in progress and their responses.
    requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,

    /// The boolean indicator of whether the node is synced up to the latest block (within the given tolerance).
    ///
    /// Lock ordering: if you lock `sync_state` and `requests`, you must lock `sync_state` first.
    sync_state: RwLock<SyncState>,

    /// The lock used to ensure that [`Self::advance_with_sync_blocks()`] is called by one task at a time.
    advance_with_sync_blocks_lock: TMutex<()>,

    /// Gets notified when there was an update to the locators or a peer disconnected.
    peer_notify: Notify,

    /// Gets notified when we received a new block response.
    response_notify: Notify,

    /// Tracks sync speed
    metrics: BlockSyncMetrics,
}

impl<N: Network> BlockSync<N> {
    /// Initializes a new block sync module.
    pub fn new(ledger: Arc<dyn LedgerService<N>>) -> Self {
        // Make sync state aware of the blocks that already exist on disk at startup.
        let sync_state = SyncState::new_with_height(ledger.latest_block_height());

        Self {
            ledger,
            sync_state: RwLock::new(sync_state),
            peer_notify: Default::default(),
            response_notify: Default::default(),
            locators: Default::default(),
            requests: Default::default(),
            common_ancestors: Default::default(),
            advance_with_sync_blocks_lock: Default::default(),
            metrics: Default::default(),
        }
    }

    /// Blocks until something about a peer changes,
    /// or a block request has been fully processed (either successfully or unsuccessfully).
    ///
    /// Used by the outgoing task.
    pub async fn wait_for_peer_update(&self) {
        self.peer_notify.notified().await
    }

    /// Blocks until there is a new response to a block request.
    ///
    /// Used by the incoming task.
    pub async fn wait_for_block_responses(&self) {
        self.response_notify.notified().await
    }

    /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
    #[inline]
    pub fn is_block_synced(&self) -> bool {
        self.sync_state.read().is_block_synced()
    }

    /// Returns `true` if there are blocks to fetch or responses to process.
    ///
    /// This will always return true if [`Self::is_block_synced`] returns false,
    /// but it can return true when [`Self::is_block_synced`] returns true
    /// (due to the latter having a tolerance of one block).
230 #[inline] 231 pub fn can_block_sync(&self) -> bool { 232 self.sync_state.read().can_block_sync() || self.has_pending_responses() 233 } 234 235 /// Returns the number of blocks the node is behind the greatest peer height, 236 /// or `None` if no peers are connected yet. 237 #[inline] 238 pub fn num_blocks_behind(&self) -> Option<u32> { 239 self.sync_state.read().num_blocks_behind() 240 } 241 242 /// Returns the greatest block height of any connected peer. 243 #[inline] 244 pub fn greatest_peer_block_height(&self) -> Option<u32> { 245 self.sync_state.read().get_greatest_peer_height() 246 } 247 248 /// Returns the current sync height of this node. 249 /// The sync height is always greater or equal to the ledger height. 250 #[inline] 251 pub fn get_sync_height(&self) -> u32 { 252 self.sync_state.read().get_sync_height() 253 } 254 255 /// Returns the number of blocks we requested from peers, but have not received yet. 256 #[inline] 257 pub fn num_outstanding_block_requests(&self) -> usize { 258 self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count() 259 } 260 261 /// The total number of block request, including the ones that have been answered already but not processed yet. 262 #[inline] 263 pub fn num_total_block_requests(&self) -> usize { 264 self.requests.read().len() 265 } 266 267 //// Returns the latest locator height for all known peers. 268 pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> { 269 self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect() 270 } 271 272 //// Returns information about all in-flight block requests. 
273 pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> { 274 self.requests 275 .read() 276 .iter() 277 .map(|(height, request)| { 278 (*height, BlockRequestInfo { 279 done: request.sync_ips().is_empty(), 280 elapsed: request.timestamp.elapsed().as_secs(), 281 }) 282 }) 283 .collect() 284 } 285 286 /// Returns a summary of all in-flight requests. 287 pub fn get_block_requests_summary(&self) -> BlockRequestsSummary { 288 let completed = self 289 .requests 290 .read() 291 .iter() 292 .filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None }) 293 .collect::<Vec<_>>(); 294 295 let outstanding = self 296 .requests 297 .read() 298 .iter() 299 .filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None }) 300 .collect::<Vec<_>>(); 301 302 BlockRequestsSummary { completed: rangify_heights(&completed), outstanding: rangify_heights(&outstanding) } 303 } 304 305 pub fn get_sync_speed(&self) -> f64 { 306 self.metrics.get_sync_speed() 307 } 308 } 309 310 // Helper functions needed for testing 311 #[cfg(test)] 312 impl<N: Network> BlockSync<N> { 313 /// Returns the latest block height of the given peer IP. 314 fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> { 315 self.locators.read().get(peer_ip).map(|locators| locators.latest_locator_height()) 316 } 317 318 /// Returns the common ancestor for the given peer pair, if it exists. 319 fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> { 320 self.common_ancestors.read().get(&PeerPair(peer_a, peer_b)).copied() 321 } 322 323 /// Returns the block request for the given height, if it exists. 324 fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> { 325 self.requests.read().get(&height).map(|e| e.request.clone()) 326 } 327 328 /// Returns the timestamp of the last time the block was requested, if it exists. 
329 fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> { 330 self.requests.read().get(&height).map(|e| e.timestamp) 331 } 332 } 333 334 impl<N: Network> BlockSync<N> { 335 /// Returns the block locators. 336 #[inline] 337 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> { 338 // Retrieve the latest block height. 339 let latest_height = self.ledger.latest_block_height(); 340 341 // Initialize the recents map. 342 // TODO: generalize this for RECENT_INTERVAL > 1, or remove this comment if we hardwire that to 1 343 let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS); 344 // Retrieve the recent block hashes. 345 for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height { 346 recents.insert(height, self.ledger.get_block_hash(height)?); 347 } 348 349 // Initialize the checkpoints map. 350 let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?); 351 // Retrieve the checkpoint block hashes. 352 for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) { 353 checkpoints.insert(height, self.ledger.get_block_hash(height)?); 354 } 355 356 // Construct the block locators. 357 BlockLocators::new(recents, checkpoints) 358 } 359 360 /// Returns true if there are pending responses to block requests that need to be processed. 361 pub fn has_pending_responses(&self) -> bool { 362 self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0 363 } 364 365 /// Send a batch of block requests. 
    pub async fn send_block_requests<C: CommunicationService>(
        &self,
        communication: &C,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        requests: &[(u32, PrepareSyncRequest<N>)],
    ) -> bool {
        // The first request determines the start height of the contiguous chunk
        // and how many sync IPs to sample for the whole batch.
        let (start_height, max_num_sync_ips) = match requests.first() {
            Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
            None => {
                warn!("Block sync failed - no block requests");
                return false;
            }
        };

        debug!("Sending {len} block requests to peer(s) at {peers:?}", len = requests.len(), peers = sync_peers.keys());

        // Use a randomly sampled subset of the sync IPs.
        let sync_ips: IndexSet<_> =
            sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect();

        // Calculate the end height (exclusive).
        let end_height = start_height.saturating_add(requests.len() as u32);

        // Insert the chunk of block requests.
        for (height, (hash, previous_hash, _)) in requests.iter() {
            // Insert the block request into the sync pool using the sync IPs from the last block request in the chunk.
            if let Err(error) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
                warn!("Block sync failed - {error}");
                return false;
            }
        }

        /* Send the block request to the peers */

        // Construct the message.
        let message = C::prepare_block_request(start_height, end_height);

        // Send the message to the peers.
        let mut tasks = Vec::with_capacity(sync_ips.len());
        for sync_ip in sync_ips {
            let sender = communication.send(sync_ip, message.clone()).await;
            let task = tokio::spawn(async move {
                // Ensure the request is sent successfully.
                match sender {
                    Some(sender) => {
                        if let Err(err) = sender.await {
                            warn!("Failed to send block request to peer '{sync_ip}': {err}");
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        warn!("Failed to send block request to peer '{sync_ip}': no such peer");
                        false
                    }
                }
            });

            tasks.push(task);
        }

        // Wait for all sends to finish at the same time.
        for result in futures::future::join_all(tasks).await {
            let success = match result {
                Ok(success) => success,
                Err(err) => {
                    error!("tokio join error: {err}");
                    false
                }
            };

            // If sending fails for any peer, remove the block request from the sync pool.
            if !success {
                // Remove the entire block request from the sync pool.
                let mut requests = self.requests.write();
                for height in start_height..end_height {
                    requests.remove(&height);
                }
                // Break out of the loop.
                return false;
            }
        }
        true
    }

    /// Inserts a new block response from the given peer IP.
    ///
    /// Returns an error if the block was malformed, or we already received a different block for this height.
    /// This function also removes all block requests from the given peer IP on failure.
    ///
    /// Note, that this only queues the response. After this, you most likely want to call `Self::try_advancing_block_synchronization`.
    ///
    #[inline]
    pub fn insert_block_responses(
        &self,
        peer_ip: SocketAddr,
        blocks: Vec<Block<N>>,
        latest_consensus_version: Option<ConsensusVersion>,
    ) -> Result<()> {
        // The consensus-version check below is keyed off the height of the last block in the batch.
        let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
            bail!("Empty block response");
        };

        let expected_consensus_version = N::CONSENSUS_VERSION(last_height)?;

        // Perform consensus version check, if possible.
        // This check is only enabled after nodes have reached V12.
        if expected_consensus_version >= ConsensusVersion::V12 {
            if let Some(latest_consensus_version) = latest_consensus_version {
                ensure_equals!(
                    expected_consensus_version,
                    latest_consensus_version,
                    "the peer's consensus version for height {last_height} does not match ours"
                );
            } else {
                bail!("The peer did not send a consensus version");
            }
        }

        // Insert the candidate blocks into the sync pool.
        for block in blocks {
            if let Err(error) = self.insert_block_response(peer_ip, block) {
                // On any invalid block, drop every request issued to this peer before bailing.
                self.remove_block_requests_to_peer(&peer_ip);
                bail!("{error}");
            }
        }
        Ok(())
    }

    /// Returns the next block for the given `next_height` if the request is complete,
    /// or `None` otherwise. This does not remove the block from the `responses` map.
    #[inline]
    pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
        // Determine if the request is complete:
        // either there is no request for `next_height`, or the request has no peer socket addresses left.
        if let Some(entry) = self.requests.read().get(&next_height) {
            let is_complete = entry.sync_ips().is_empty();
            if !is_complete {
                return None;
            }

            // If the request is complete, return the block from the responses, if there is one.
            if entry.response.is_none() {
                warn!("Request for height {next_height} is complete but no response exists");
            }
            entry.response.clone()
        } else {
            None
        }
    }

    /// Attempts to advance synchronization by processing completed block responses.
    ///
    /// Returns true, if new blocks were added to the ledger.
    ///
    /// # Usage
    /// This is only called in [`Client::try_block_sync`] and should not be called concurrently by multiple tasks.
    /// Validators do not call this function, and instead invoke
    /// [`alphaos_node_bft::Sync::try_advancing_block_synchronization`] which also updates the BFT state.
    #[inline]
    pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
        // Acquire the lock to ensure this function is called only once at a time.
        // If the lock is already acquired, return early.
        //
        // Note: This lock should not be needed anymore as there is only one place we call it from,
        // but we keep it for now out of caution.
        // TODO(kaimast): remove this eventually.
        let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
            trace!("Skipping attempt to advance block synchronziation as it is already in progress");
            return Ok(false);
        };

        // Start with the current height.
        let mut current_height = self.ledger.latest_block_height();
        let start_height = current_height;
        trace!(
            "Try advancing with block responses (at block {current_height}, current sync speed is {})",
            self.get_sync_speed()
        );

        loop {
            let next_height = current_height + 1;

            // Stop as soon as the next height has no complete response queued.
            let Some(block) = self.peek_next_block(next_height) else {
                break;
            };

            // Ensure the block height matches.
            if block.height() != next_height {
                warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
                break;
            }

            // Block validation and ledger insertion are CPU/IO-heavy,
            // so run them on the blocking thread pool.
            let ledger = self.ledger.clone();
            let advanced = tokio::task::spawn_blocking(move || {
                // Try to check the next block and advance to it.
                match ledger.check_next_block(&block) {
                    Ok(_) => match ledger.advance_to_next_block(&block) {
                        Ok(_) => true,
                        Err(err) => {
                            warn!(
                                "Failed to advance to next block (height: {}, hash: '{}'): {err}",
                                block.height(),
                                block.hash()
                            );
                            false
                        }
                    },
                    Err(err) => {
                        warn!(
                            "The next block (height: {}, hash: '{}') is invalid - {err}",
                            block.height(),
                            block.hash()
                        );
                        false
                    }
                }
            })
            .await?;

            // Only count successful requests.
            if advanced {
                self.count_request_completed();
            }

            // Remove the block response.
            self.remove_block_response(next_height);

            // If advancing failed, exit the loop.
            if !advanced {
                break;
            }

            // Update the latest height.
            current_height = next_height;
        }

        if current_height > start_height {
            self.set_sync_height(current_height);
            Ok(true)
        } else {
            Ok(false)
        }
    }
}

impl<N: Network> BlockSync<N> {
    /// Returns the sync peers with their latest heights, and their minimum common ancestor, if the node can sync.
    /// This function returns peers that are consistent with each other, and have a block height
    /// that is greater than the ledger height of this node.
    ///
    /// # Locking
    /// This will read-lock `common_ancestors` and `sync_state`, but not at the same time.
    pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
        // Retrieve the current sync height.
        let current_height = self.get_sync_height();

        if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
            // Map the locators into the latest height.
            let sync_peers =
                sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
            // Return the sync peers and their minimum common ancestor.
            Some((sync_peers, min_common_ancestor))
        } else {
            None
        }
    }

    /// Updates the block locators and common ancestors for the given peer IP.
    ///
    /// This function does not need to check that the block locators are well-formed,
    /// because that is already done in [`BlockLocators::new()`], as noted in [`BlockLocators`].
    ///
    /// This function does **not** check
    /// that the block locators are consistent with the peer's previous block locators or other peers' block locators.
    pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
        // Update the locators entry for the given peer IP.
        // We perform this update atomically, and drop the lock as soon as we are done with the update.
        match self.locators.write().entry(peer_ip) {
            hash_map::Entry::Occupied(mut e) => {
                // Return early if the block locators did not change.
                if e.get() == locators {
                    return Ok(());
                }

                let old_height = e.get().latest_locator_height();
                let new_height = locators.latest_locator_height();

                if old_height > new_height {
                    debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
                }
                e.insert(locators.clone());
            }
            hash_map::Entry::Vacant(e) => {
                e.insert(locators.clone());
            }
        }

        // Compute the common ancestor with this node.
        let new_local_ancestor = {
            let mut ancestor = 0;
            // Attention: Please do not optimize this loop, as it performs fork-detection. In addition,
            // by iterating upwards, it also early-terminates malicious block locators at the *first* point
            // of bifurcation in their ledger history, which is a critical safety guarantee provided here.
            for (height, hash) in locators.clone().into_iter() {
                if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
                    match ledger_hash == hash {
                        true => ancestor = height,
                        false => {
                            debug!("Detected fork with peer \"{peer_ip}\" at height {height}");
                            break;
                        }
                    }
                }
            }
            ancestor
        };

        // Compute the common ancestor with every other peer.
        // Do not hold write lock to `common_ancestors` here, because this can take a while with many peers.
        let ancestor_updates: Vec<_> = self
            .locators
            .read()
            .iter()
            .filter_map(|(other_ip, other_locators)| {
                // Skip if the other peer is the given peer.
                if other_ip == &peer_ip {
                    return None;
                }
                // Compute the common ancestor with the other peer.
                let mut ancestor = 0;
                for (height, hash) in other_locators.clone().into_iter() {
                    if let Some(expected_hash) = locators.get_hash(height) {
                        match expected_hash == hash {
                            true => ancestor = height,
                            false => {
                                debug!(
                                    "Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
                                );
                                break;
                            }
                        }
                    }
                }

                Some((PeerPair(peer_ip, *other_ip), ancestor))
            })
            .collect();

        // Update the map of common ancestors.
        // Scope the lock, so it is dropped before locking `sync_state`.
        {
            let mut common_ancestors = self.common_ancestors.write();
            common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);

            for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
                common_ancestors.insert(peer_pair, new_ancestor);
            }
        }

        // Update sync state, because the greatest peer height may have decreased.
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            error!("Got new block locators but greatest peer height is zero.");
        }

        // Notify the sync loop that something changed.
        self.peer_notify.notify_one();

        Ok(())
    }

    /// TODO (howardwu): Remove the `common_ancestor` entry. But check that this is safe
    /// (that we don't rely upon it for safety when we re-connect with the same peer).
    /// Removes the peer from the sync pool, if they exist.
    pub fn remove_peer(&self, peer_ip: &SocketAddr) {
        trace!("Removing peer {peer_ip} from block sync");

        // Remove the locators entry for the given peer IP.
        self.locators.write().remove(peer_ip);
        // Remove all common ancestor entries for this peer.
        self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
        // Remove all block requests to the peer.
        self.remove_block_requests_to_peer(peer_ip);

        // Update sync state, because the greatest peer height may have decreased.
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            // There are no more peers left.
            self.sync_state.write().clear_greatest_peer_height();
        }

        // Notify the sync loop that something changed.
        self.peer_notify.notify_one();
    }
}

// Helper type for prepare_block_requests
pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);

impl<N: Network> BlockSync<N> {
    /// Returns a list of block requests and the sync peers, if the node needs to sync.
    ///
    /// You usually want to call `remove_timed_out_block_requests` before invoking this function.
    ///
    /// # Concurrency
    /// This should be called by at most one task at a time.
    ///
    /// # Usage
    /// - For validators, the primary spawns exactly one task that periodically calls
    ///   `bft::Sync::try_issuing_block_requests`. There is no possibility of concurrent calls to it.
    /// - For clients, `Client::initialize_sync` spawns exactly one task that periodically calls
    ///   `Client::try_issuing_block_requests` which calls this function.
    /// - Provers do not call this function.
    pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
        // Used to print more information when we max out on requests.
        let print_requests = || {
            if tracing::enabled!(tracing::Level::TRACE) {
                let summary = self.get_block_requests_summary();

                trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
                trace!("The following requests are still outstanding: {:?}", summary.outstanding);
            }
        };

        // Do not hold lock here as, currently, `find_sync_peers_inner` can take a while.
        let current_height = self.get_sync_height();

        // Ensure to not exceed the maximum number of outstanding block requests.
        let max_outstanding_block_requests =
            (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);

        // Ensure there is a finite bound on the number of block responses we receive, that have not been processed yet.
        let max_total_requests = 4 * max_outstanding_block_requests;

        let max_new_blocks_to_request =
            max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);

        // Prepare the block requests and sync peers, or returns an empty result if there is nothing to request.
        if self.num_total_block_requests() >= max_total_requests as usize {
            trace!(
                "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
            );

            print_requests();
            Default::default()
        } else if max_new_blocks_to_request == 0 {
            trace!(
                "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
            );

            print_requests();
            Default::default()
        } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
            // Retrieve the greatest block height of any connected peer.
            // We do not need to update the sync state here, as that already happens when the block locators are received.
825 let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0); 826 827 // Construct the list of block requests. 828 let requests = self.construct_requests( 829 &sync_peers, 830 current_height, 831 min_common_ancestor, 832 max_new_blocks_to_request, 833 greatest_peer_height, 834 ); 835 836 (requests, sync_peers) 837 } else if self.requests.read().is_empty() { 838 // This can happen during a race condition where the node just finished syncing. 839 // It does not make sense to log or change the sync status here. 840 // Checking the sync status here also does not make sense, as the node might as well have switched back 841 // from `synced` to `syncing` between calling `find_sync_peers_inner` and this line. 842 843 Default::default() 844 } else { 845 // This happens if we already requested all advertised blocks. 846 trace!("No new blocks can be requested, but there are still outstanding requests."); 847 848 print_requests(); 849 Default::default() 850 } 851 } 852 853 /// Should only be called by validators when they successfully process a block request. 854 /// (for other nodes this will be automatically called internally) 855 /// 856 /// TODO(kaimast): remove this public function once the sync logic is fully unified `BlockSync`. 857 pub fn count_request_completed(&self) { 858 self.metrics.count_request_completed(); 859 } 860 861 /// Set the sync height to a the given value. 862 /// This is a no-op if `new_height` is equal or less to the current sync height. 863 pub fn set_sync_height(&self, new_height: u32) { 864 // Scope state lock to avoid locking state and metrics at the same time. 865 let fully_synced = { 866 let mut state = self.sync_state.write(); 867 state.set_sync_height(new_height); 868 !state.can_block_sync() 869 }; 870 871 if fully_synced { 872 self.metrics.mark_fully_synced(); 873 } 874 } 875 876 /// Inserts a block request for the given height. 
    fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
        // Ensure the block request does not already exist.
        self.check_block_request(height)?;
        // Ensure the sync IPs are not empty.
        ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
        // Insert the block request.
        self.requests.write().insert(height, OutstandingRequest {
            request: (hash, previous_hash, sync_ips),
            timestamp: Instant::now(),
            response: None,
        });
        Ok(())
    }

    /// Inserts the given block response, after checking that the request exists and the response is well-formed.
    /// On success, this function removes the peer IP from the request sync peers and inserts the response.
    fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<()> {
        // Retrieve the block height.
        let height = block.height();
        let mut requests = self.requests.write();

        // The ledger may have advanced while this response was in flight.
        if self.ledger.contains_block_height(height) {
            bail!("The sync request was removed because we already advanced");
        }

        let Some(entry) = requests.get_mut(&height) else { bail!("The sync pool did not request block {height}") };

        // Retrieve the request entry for the candidate block.
        let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;

        // Ensure the candidate block hash matches the expected hash.
        if let Some(expected_hash) = expected_hash {
            if block.hash() != *expected_hash {
                bail!("The block hash for candidate block {height} from '{peer_ip}' is incorrect")
            }
        }
        // Ensure the previous block hash matches if it exists.
        if let Some(expected_previous_hash) = expected_previous_hash {
            if block.previous_hash() != *expected_previous_hash {
                bail!("The previous block hash in candidate block {height} from '{peer_ip}' is incorrect")
            }
        }
        // Ensure the sync pool requested this block from the given peer.
        if !sync_ips.contains(&peer_ip) {
            bail!("The sync pool did not request block {height} from '{peer_ip}'")
        }

        // Remove the peer IP from the request entry.
        entry.sync_ips_mut().swap_remove(&peer_ip);

        if let Some(existing_block) = &entry.response {
            // If the candidate block was already present, ensure it is the same block.
            if block != *existing_block {
                bail!("Candidate block {height} from '{peer_ip}' is malformed");
            }
        } else {
            entry.response = Some(block.clone());
        }

        trace!("Received a new and valid block response for height {height}");

        // Notify the sync loop that something changed.
        self.response_notify.notify_one();

        Ok(())
    }

    /// Checks that a block request for the given height does not already exist.
    fn check_block_request(&self, height: u32) -> Result<()> {
        // Ensure the block height is not already in the ledger.
        if self.ledger.contains_block_height(height) {
            bail!("Failed to add block request, as block {height} exists in the ledger");
        }
        // Ensure the block height is not already requested.
        if self.requests.read().contains_key(&height) {
            bail!("Failed to add block request, as block {height} exists in the requests map");
        }

        Ok(())
    }

    /// Removes the block request and response for the given height.
    /// This may only be called after `peek_next_block`, which checked if the request for the given height was complete.
    ///
    /// Precondition: This may only be called after `peek_next_block` has returned `Some`,
    /// which has checked if the request for the given height is complete
    /// and there is a block with the given `height` in the `responses` map.
    pub fn remove_block_response(&self, height: u32) {
        // Remove the request entry for the given height.
        if let Some(e) = self.requests.write().remove(&height) {
            trace!(
                "Block request for height {height} was completed in {}ms (sync speed is {})",
                e.timestamp.elapsed().as_millis(),
                self.get_sync_speed()
            );

            // Notify the sending task that fewer requests are in-flight.
            self.peer_notify.notify_one();
        }
    }

    /// Removes all block requests for the given peer IP.
    ///
    /// This is used when disconnecting from a peer or when a peer sends invalid block responses.
    fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
        trace!("Block sync is removing all block requests to peer {peer_ip}...");

        // Remove the peer IP from the requests map. If any request entry is now empty,
        // and its corresponding response entry is also empty, then remove that request entry altogether.
        self.requests.write().retain(|height, e| {
            let had_peer = e.sync_ips_mut().swap_remove(peer_ip);

            // Only remove requests that were sent to this peer, that have no other peer that can respond instead,
            // and that were not completed yet.
            let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
            if !retain {
                trace!("Removed block request timestamp for {peer_ip} at height {height}");
            }
            retain
        });

        // No need to remove responses here, because requests with responses will be retained.
    }

    /// Removes block requests that have timed out, i.e., requests we sent that did not receive a response in time.
    ///
    /// This removes the corresponding block responses and returns the set of peers/addresses that timed out.
    /// It will ask the peer pool handling service to ban any timed-out peers.
    /// (NOTE(review): the actual ban call is currently commented out in the body pending further testing.)
    ///
    /// # Return Value
    /// On success it will return `None` if there is nothing to re-request, or a set of new block requests that replaced the timed-out requests.
    /// This set of new requests can also replace requests that timed out earlier, and which we were not able to re-request yet.
    ///
    /// This function will return an error if it cannot re-request blocks due to a lack of peers.
    /// In this case, the current iteration of block synchronization should not continue and the node should re-try later instead.
    pub fn handle_block_request_timeouts<P: PeerPoolHandling<N>>(
        &self,
        _peer_pool_handler: &P,
    ) -> Result<Option<BlockRequestBatch<N>>> {
        // Acquire the write lock on the requests map.
        let mut requests = self.requests.write();

        // Retrieve the current time.
        let now = Instant::now();

        // Retrieve the current block height.
        let current_height = self.ledger.latest_block_height();

        // Track the number of timed out block requests (only used to print a log message).
        let mut timed_out_requests = vec![];

        // Track which peers should be banned due to unresponsiveness.
        let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();

        // Remove timed out block requests.
        requests.retain(|height, e| {
            // Requests at or below the current ledger height no longer need to be tracked.
            let is_obsolete = *height <= current_height;
            // Determine if the request is incomplete.
            let is_complete = e.sync_ips().is_empty();
            // Determine if timeouts have been exceeded.
            let peer_timeout_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
            let processing_timeout_elapsed = now.duration_since(e.timestamp) > BLOCK_PROCESSING_TIMEOUT;

            // Determine if the request has timed out.
            // Timeout if:
            // 1. Waiting for peers AND peer timeout exceeded, OR
            // 2. All responses received AND processing timeout exceeded
            let is_timeout = (!is_complete && peer_timeout_elapsed) || (is_complete && processing_timeout_elapsed);

            // Retain if this is not a timeout and is not obsolete.
            let retain = !is_timeout && !is_obsolete;

            if is_timeout {
                if is_complete {
                    warn!(
                        "Block {} processing timeout: unprocessed for {}s (all peers responded)",
                        height,
                        e.timestamp.elapsed().as_secs()
                    );
                } else {
                    warn!(
                        "Block {} peer timeout: waiting on {:?} after {}s",
                        height,
                        e.sync_ips(),
                        e.timestamp.elapsed().as_secs()
                    );
                }

                // Increment the number of timed out block requests.
                timed_out_requests.push(*height);
            } else if is_obsolete {
                trace!("Block request at height {height} became obsolete (current_height={current_height})");
            }

            // If the request timed out, also remove and ban the peers that failed to respond.
            if is_timeout {
                for peer_ip in e.sync_ips().iter() {
                    peers_to_ban.insert(*peer_ip);
                }
            }

            retain
        });

        if !timed_out_requests.is_empty() {
            debug!("{num} block requests timed out", num = timed_out_requests.len());
        }

        // The height of the first remaining request, if any.
        // NOTE(review): assumes the requests map iterates in ascending height order — confirm against its type.
        let next_request_height = requests.iter().next().map(|(h, _)| *h);

        // Avoid locking `locators` and `requests` at the same time.
        drop(requests);

        // Now remove and ban any unresponsive peers.
        for peer_ip in peers_to_ban {
            self.remove_peer(&peer_ip);
            // TODO: Uncomment this when we have a more rigorous analysis and testing of peer banning.
            // peer_pool_handler.ip_ban_peer(peer_ip, Some("timed out on block requests"));
        }

        // Determine if we need to re-issue any timed-out requests.
        // If there are no requests remaining or no gap at the beginning,
        // we do not need to re-issue requests and will just issue them regularly.
        //
        // This needs to be checked even if timed_out_requests is empty, because we might not be able to re-issue
        // requests immediately if there are no other peers at a given time.
        // Further, this only closes the first gap. So multiple calls to this might be needed.
        let sync_height = self.get_sync_height();
        let start_height = sync_height + 1;

        let end_height = if let Some(next_height) = next_request_height {
            if next_height > start_height {
                // The end height is exclusive, so use the height of the first existing block request as the end.
                next_height
            } else {
                // Nothing to do.
                // Do not log here as this check happens frequently.
                return Ok(None);
            }
        } else {
            // Nothing to do.
            // Do not log here as this check happens frequently.
            return Ok(None);
        };

        // Set the maximum number of blocks, so that they do not exceed the end height.
        let max_new_blocks_to_request = end_height - start_height;

        let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start_height) else {
            // This generally shouldn't happen, because there cannot be outstanding requests when no peers are connected.
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        // Retrieve the greatest block height of any connected peer.
        let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
            // This should never happen because `sync_peers` is guaranteed to be non-empty.
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        // (Try to) construct the requests.
        let requests = self.construct_requests(
            &sync_peers,
            sync_height,
            min_common_ancestor,
            max_new_blocks_to_request,
            greatest_peer_height,
        );

        // If the ledger advanced concurrently, there may be no requests to issue after all.
        // The given height may also be greater than `start_height` due to concurrent block advancement.
        if let Some((height, _)) = requests.as_slice().first() {
            debug!("Re-requesting blocks starting at height {height}");
            Ok(Some((requests, sync_peers)))
        } else {
            // Do not log here as this constitutes a benign race condition.
            Ok(None)
        }
    }

    /// Finds the peers to sync from and the shared common ancestor, starting at the given height.
    ///
    /// Unlike [`Self::find_sync_peers`] this does not only return the latest locators height, but the full BlockLocators for each peer.
    /// Returns `None` if there are no peers to sync from.
    ///
    /// # Locking
    /// This function will read-lock `common_ancestors`.
    fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
        // Retrieve the latest ledger height.
        let latest_ledger_height = self.ledger.latest_block_height();

        // Pick a set of peers above the latest ledger height, and include their locators.
        // This will sort the peers by locator height in descending order.
        let candidate_locators: IndexMap<_, _> = self
            .locators
            .read()
            .iter()
            .filter(|(_, locators)| locators.latest_locator_height() > current_height)
            .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
            .take(NUM_SYNC_CANDIDATE_PEERS)
            .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
            .collect();

        // Case 0: If there are no candidate peers, return `None`.
        if candidate_locators.is_empty() {
            trace!("Found no sync peers with height greater {current_height}");
            return None;
        }

        // TODO (howardwu): Change this to the highest cumulative weight for Phase 3.
        // Case 1: If all of the candidate peers share a common ancestor below the latest ledger height,
        // then pick the peer with the highest height, and find peers (up to extra redundancy) with
        // a common ancestor above the block request range.
        // Set the end height to their common ancestor.

        // Determine the threshold number of peers to sync from.
        let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);

        // Breaks the loop when the first threshold number of peers are found, biasing for the peer with the highest height
        // and a cohort of peers who share a common ancestor above this node's latest ledger height.
        for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
            // The height of the common ancestor shared by all selected peers.
            let mut min_common_ancestor = peer_locators.latest_locator_height();

            // The peers we will synchronize from.
            // As the previous iteration did not succeed, restart with the next candidate peers.
            let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];

            // Try adding other peers consistent with this one to the sync peer set.
            for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
                // Check if these two peers have a common ancestor above the latest ledger height.
                if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
                    // If so, then check that their block locators are consistent.
                    if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
                        // If their common ancestor is less than the minimum common ancestor, then update it.
                        min_common_ancestor = min_common_ancestor.min(*common_ancestor);

                        // Add the other peer to the list of sync peers.
                        sync_peers.push((*other_ip, other_locators.clone()));
                    }
                }
            }

            // If we have enough sync peers above the latest ledger height, finish and return them.
            if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
                // Shuffle the sync peers prior to returning. This ensures the rest of the stack
                // does not rely on the order of the sync peers, and that the sync peers are not biased.
                sync_peers.shuffle(&mut rand::thread_rng());

                // Collect into an IndexMap and return.
                return Some((sync_peers.into_iter().collect(), min_common_ancestor));
            }
        }

        // If there are not enough peers with a minimum common ancestor above the latest ledger height, return None.
        None
    }

    /// Given the sync peers and their minimum common ancestor, return a list of block requests.
    fn construct_requests(
        &self,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        sync_height: u32,
        min_common_ancestor: u32,
        max_blocks_to_request: u32,
        greatest_peer_height: u32,
    ) -> Vec<(u32, PrepareSyncRequest<N>)> {
        // Compute the start height for the block requests.
        let start_height = {
            let requests = self.requests.read();
            let ledger_height = self.ledger.latest_block_height();

            // Do not issue requests for blocks already contained in the ledger.
            let mut start_height = ledger_height.max(sync_height + 1);

            // Do not issue requests that already exist.
            while requests.contains_key(&start_height) {
                start_height += 1;
            }

            start_height
        };

        // If the minimum common ancestor is below the start height, then return early.
        if min_common_ancestor < start_height {
            if start_height < greatest_peer_height {
                trace!(
                    "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
                );
            }
            return Default::default();
        }

        // Compute the end height for the block request (exclusive).
        let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);

        // Construct the block hashes to request.
        let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
        // Track the largest number of sync IPs required for any block request in the sequence of requests.
        let mut max_num_sync_ips = 1;

        for height in start_height..end_height {
            // Ensure the current height is not in the ledger or already requested.
            if let Err(err) = self.check_block_request(height) {
                trace!("{err}");

                // If the sequence of block requests is interrupted, then return early.
                // Otherwise, continue until the first start height that is new.
                match request_hashes.is_empty() {
                    true => continue,
                    false => break,
                }
            }

            // Construct the block request.
            let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);

            // Handle the dishonest case.
            if !is_honest {
                // TODO (howardwu): Consider performing an integrity check on peers (to disconnect).
                warn!("Detected dishonest peer(s) when preparing block request");
                // If there are not enough peers in the dishonest case, then return early.
                if sync_peers.len() < num_sync_ips {
                    break;
                }
            }

            // Update the maximum number of sync IPs.
            max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);

            // Append the request.
            request_hashes.insert(height, (hash, previous_hash));
        }

        // Construct the requests with the same sync ips.
        request_hashes
            .into_iter()
            .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
            .collect()
    }
}

/// If any peer is detected to be dishonest in this function, it will not set the hash or previous hash,
/// in order to allow the caller to determine what to do.
fn construct_request<N: Network>(
    height: u32,
    sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
    // The hash reported for `height`, if any peer provided one.
    let mut hash = None;
    // How many peers reported the same hash for `height`.
    let mut hash_redundancy: usize = 0;
    // The hash reported for `height - 1`, if any peer provided one.
    let mut previous_hash = None;
    // Cleared as soon as two peers disagree on either hash.
    let mut is_honest = true;

    for peer_locators in sync_peers.values() {
        if let Some(candidate_hash) = peer_locators.get_hash(height) {
            match hash {
                // Increment the redundancy count if the hash matches.
                Some(hash) if hash == candidate_hash => hash_redundancy += 1,
                // Some peer is dishonest.
                Some(_) => {
                    hash = None;
                    hash_redundancy = 0;
                    previous_hash = None;
                    is_honest = false;
                    break;
                }
                // Set the hash if it is not set.
                None => {
                    hash = Some(candidate_hash);
                    hash_redundancy = 1;
                }
            }
        }
        if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
            match previous_hash {
                // Increment the redundancy count if the previous hash matches.
                Some(previous_hash) if previous_hash == candidate_previous_hash => (),
                // Some peer is dishonest.
                Some(_) => {
                    hash = None;
                    hash_redundancy = 0;
                    previous_hash = None;
                    is_honest = false;
                    break;
                }
                // Set the previous hash if it is not set.
                None => previous_hash = Some(candidate_previous_hash),
            }
        }
    }

    // Note that we intentionally do not just pick the peers that have the hash we have chosen,
    // to give stronger confidence that we are syncing during times when the network is consistent/stable.
    let num_sync_ips = {
        // Extra redundant peers - as the block hash was dishonest.
        if !is_honest {
            // Choose up to the extra redundancy factor in sync peers.
            EXTRA_REDUNDANCY_FACTOR
        }
        // No redundant peers - as we have redundancy on the block hash.
        else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
            // Choose one sync peer.
            1
        }
        // Redundant peers - as we do not have redundancy on the block hash.
        else {
            // Choose up to the redundancy factor in sync peers.
            REDUNDANCY_FACTOR
        }
    };

    (hash, previous_hash, num_sync_ips, is_honest)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::locators::{
        test_helpers::{sample_block_locators, sample_block_locators_with_fork},
        CHECKPOINT_INTERVAL,
        NUM_RECENT_BLOCKS,
    };

    use alphaos_node_bft_ledger_service::MockLedgerService;
    use alphaos_node_network::{NodeType, Peer, Resolver};
    use alphaos_node_tcp::{Tcp, P2P};
    use alphavm::{
        ledger::committee::Committee,
        prelude::{Field, TestRng},
    };

    use indexmap::{indexset, IndexSet};
    #[cfg(feature = "locktick")]
    use locktick::parking_lot::RwLock;
    #[cfg(not(feature = "locktick"))]
    use parking_lot::RwLock;
    use rand::Rng;
    use std::net::{IpAddr, Ipv4Addr};

    type CurrentNetwork = alphavm::prelude::MainnetV0;

    /// A peer pool handler stub that records which peers were asked to be banned.
    #[derive(Default)]
    struct DummyPeerPoolHandler {
        // Peers that `ip_ban_peer` was called with.
        peers_to_ban: RwLock<Vec<SocketAddr>>,
    }

    impl P2P for DummyPeerPoolHandler {
        fn tcp(&self) -> &Tcp {
            unreachable!();
        }
    }

    impl<N: Network> PeerPoolHandling<N> for DummyPeerPoolHandler {
        const MAXIMUM_POOL_SIZE: usize = 10;
        const OWNER: &str = "[DummyPeerPoolHandler]";
        const PEER_SLASHING_COUNT: usize = 0;

        fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
            unreachable!();
        }

        fn resolver(&self) -> &RwLock<Resolver<N>> {
            unreachable!();
        }

        fn is_dev(&self) -> bool {
            true
        }

        fn trusted_peers_only(&self) -> bool {
            false
        }

        fn node_type(&self) -> NodeType {
            NodeType::Client
        }

        fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) {
            self.peers_to_ban.write().push(listener_addr);
        }
    }

    /// Returns the peer IP for the sync pool.
    fn sample_peer_ip(id: u16) -> SocketAddr {
        assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
    }

    /// Returns a sample committee.
    fn sample_committee() -> Committee<CurrentNetwork> {
        let rng = &mut TestRng::default();
        alphavm::ledger::committee::test_helpers::sample_committee(rng)
    }

    /// Returns the ledger service, initialized to the given height.
    fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
        MockLedgerService::new_at_height(sample_committee(), height)
    }

    /// Returns the sync pool, with the ledger initialized to the given height.
    fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)))
    }

    /// Returns a vector of randomly sampled block heights in [0, max_height].
    ///
    /// The maximum value will always be included in the result.
    fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
        assert!(num_values > 0, "Cannot generate an empty vector");
        assert!((max_height as usize) >= num_values);

        let mut rng = TestRng::default();

        // Sample distinct heights below `max_height`, then append `max_height` itself.
        let mut heights: Vec<u32> = (0..(max_height - 1)).choose_multiple(&mut rng, num_values);

        heights.push(max_height);

        heights
    }

    /// Returns a duplicate (deep copy) of the sync pool with a different ledger height.
    fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork> {
            peer_notify: Notify::new(),
            response_notify: Default::default(),
            ledger: Arc::new(sample_ledger_service(height)),
            locators: RwLock::new(sync.locators.read().clone()),
            common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
            requests: RwLock::new(sync.requests.read().clone()),
            sync_state: RwLock::new(sync.sync_state.read().clone()),
            advance_with_sync_blocks_lock: Default::default(),
            metrics: Default::default(),
        }
    }

    /// Checks that the sync pool (starting at genesis) returns the correct requests.
    fn check_prepare_block_requests(
        sync: BlockSync<CurrentNetwork>,
        min_common_ancestor: u32,
        peers: IndexSet<SocketAddr>,
    ) {
        let rng = &mut TestRng::default();

        // Check test assumptions are met.
        assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");

        // Determine the number of peers within range of this sync pool.
        let num_peers_within_recent_range_of_ledger = {
            // If no peers are within range, then set to 0.
            if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
                0
            }
            // Otherwise, manually check the number of peers within range.
            else {
                peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
            }
        };

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();

        // If there are no peers, then there should be no requests.
        if peers.is_empty() {
            assert!(requests.is_empty());
            return;
        }

        // Otherwise, there should be requests.
        let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
        assert_eq!(requests.len(), expected_num_requests);

        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));

            if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
                assert_eq!(sync_ips.len(), 1);
            } else {
                assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
                assert_eq!(sync_ips, peers);
            }
        }
    }

    /// Tests that height and hash values are set correctly using many different maximum block heights.
    #[test]
    fn test_latest_block_height() {
        for height in generate_block_heights(100_001, 5000) {
            let sync = sample_sync_at_height(height);
            // Check that the latest block height is the maximum height.
            assert_eq!(sync.ledger.latest_block_height(), height);

            // Check the hash to height mapping.
            assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
            assert_eq!(
                sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
                height
            );
        }
    }

    #[test]
    fn test_get_block_hash() {
        for height in generate_block_heights(100_001, 5000) {
            let sync = sample_sync_at_height(height);

            // Check the height to hash mapping.
            assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
            assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
        }
    }

    #[test]
    fn test_prepare_block_requests() {
        for num_peers in 0..111 {
            println!("Testing with {num_peers} peers");

            let sync = sample_sync_at_height(0);

            let mut peers = indexset![];

            for peer_id in 1..=num_peers {
                // Add a peer.
                sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
                // Add the peer to the set of peers.
                peers.insert(sample_peer_ip(peer_id));
            }

            // If all peers are ahead, then requests should be prepared.
            check_prepare_block_requests(sync, 10, peers);
        }
    }

    #[test]
    fn test_prepare_block_requests_with_leading_fork_at_11() {
        let sync = sample_sync_at_height(0);

        // Intuitively, peer 1's fork is above peer 2 and peer 3's height.
        // So from peer 2 and peer 3's perspective, they don't even realize that peer 1 is on a fork.
        // Thus, you can sync up to block 10 from any of the 3 peers.

        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 11,
        // then the sync pool should request blocks 1..=10 from the NUM_REDUNDANCY peers.
        // This is safe because the leading fork is at 11, and the sync pool is at 0,
        // so all candidate peers are at least 10 blocks ahead of the sync pool.

        // Add a peer (fork).
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();

        // Add a peer.
        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        // Add a peer.
        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // Check the requests.
        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            assert_eq!(num_sync_ips, 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
        }
    }

    #[test]
    fn test_prepare_block_requests_with_leading_fork_at_10() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Intuitively, peer 1's fork is at peer 2 and peer 3's height.
        // So from peer 2 and peer 3's perspective, they recognize that peer 1 has forked.
        // Thus, you don't have NUM_REDUNDANCY peers to sync to block 10.
        //
        // Now, while you could in theory sync up to block 9 from any of the 3 peers,
        // we choose not to do this as either side is likely to disconnect from us,
        // and we would rather wait for enough redundant peers before syncing.

        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 10,
        // then the sync pool should not request blocks as 1 peer conflicts with the other NUM_REDUNDANCY-1 peers.
        // We choose to sync with a cohort of peers that are *consistent* with each other,
        // and prioritize from descending heights (so the highest peer gets priority).

        // Add a peer (fork).
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();

        // Add a peer.
        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        // Add a peer.
        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // When there are NUM_REDUNDANCY+1 peers ahead, and 1 is on a fork, then there should be block requests.

        // Add a peer.
        let peer_4 = sample_peer_ip(4);
        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // Check the requests.
        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
            assert_ne!(sync_ips[0], peer_1); // It should never be the forked peer.
        }
    }

    #[test]
    fn test_prepare_block_requests_with_trailing_fork_at_9() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Peer 1 and 2 diverge from peer 3 at block 10. We only sync when there are NUM_REDUNDANCY peers
        // who are *consistent* with each other. So if you add a 4th peer that is consistent with peer 1 and 2,
        // then you should be able to sync up to block 10, thereby biasing away from peer 3.

        // Add a peer (fork).
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators(10)).unwrap();

        // Add a peer.
        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        // Add a peer.
        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();

        // Prepare the block requests.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // When there are NUM_REDUNDANCY+1 peers ahead, and peer 3 is on a fork, then there should be block requests.

        // Add a peer.
        let peer_4 = sample_peer_ip(4);
        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // Check the requests.
        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
            assert_ne!(sync_ips[0], peer_3); // It should never be the forked peer.
        }
    }

    #[test]
    fn test_insert_block_requests() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Add a peer.
        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Check that the block requests are still inserted.
1783 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips))); 1784 assert!(sync.get_block_request_timestamp(height).is_some()); 1785 } 1786 1787 for (height, (hash, previous_hash, num_sync_ips)) in requests { 1788 // Construct the sync IPs. 1789 let sync_ips: IndexSet<_> = 1790 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect(); 1791 // Ensure that the block requests cannot be inserted twice. 1792 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err(); 1793 // Check that the block requests are still inserted. 1794 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips))); 1795 assert!(sync.get_block_request_timestamp(height).is_some()); 1796 } 1797 } 1798 1799 #[test] 1800 fn test_insert_block_requests_fails() { 1801 let sync = sample_sync_at_height(9); 1802 1803 // Add a peer. 1804 sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap(); 1805 1806 // Inserting a block height that is already in the ledger should fail. 1807 sync.insert_block_request(9, (None, None, indexset![sample_peer_ip(1)])).unwrap_err(); 1808 // Inserting a block height that is not in the ledger should succeed. 1809 sync.insert_block_request(10, (None, None, indexset![sample_peer_ip(1)])).unwrap(); 1810 } 1811 1812 #[test] 1813 fn test_update_peer_locators() { 1814 let sync = sample_sync_at_height(0); 1815 1816 // Test 2 peers. 
1817 let peer1_ip = sample_peer_ip(1); 1818 for peer1_height in 0..500u32 { 1819 sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap(); 1820 assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height)); 1821 1822 let peer2_ip = sample_peer_ip(2); 1823 for peer2_height in 0..500u32 { 1824 println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}"); 1825 1826 sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap(); 1827 assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height)); 1828 1829 // Compute the distance between the peers. 1830 let distance = peer1_height.abs_diff(peer2_height); 1831 1832 // Check the common ancestor. 1833 if distance < NUM_RECENT_BLOCKS as u32 { 1834 let expected_ancestor = core::cmp::min(peer1_height, peer2_height); 1835 assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor)); 1836 assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor)); 1837 } else { 1838 let min_checkpoints = 1839 core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL); 1840 let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL; 1841 assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor)); 1842 assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor)); 1843 } 1844 } 1845 } 1846 } 1847 1848 #[test] 1849 fn test_remove_peer() { 1850 let sync = sample_sync_at_height(0); 1851 1852 let peer_ip = sample_peer_ip(1); 1853 sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap(); 1854 assert_eq!(sync.get_peer_height(&peer_ip), Some(100)); 1855 1856 sync.remove_peer(&peer_ip); 1857 assert_eq!(sync.get_peer_height(&peer_ip), None); 1858 1859 sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap(); 1860 assert_eq!(sync.get_peer_height(&peer_ip), Some(200)); 1861 1862 sync.remove_peer(&peer_ip); 1863 
assert_eq!(sync.get_peer_height(&peer_ip), None); 1864 } 1865 1866 #[test] 1867 fn test_locators_insert_remove_insert() { 1868 let sync = sample_sync_at_height(0); 1869 1870 let peer_ip = sample_peer_ip(1); 1871 sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap(); 1872 assert_eq!(sync.get_peer_height(&peer_ip), Some(100)); 1873 1874 sync.remove_peer(&peer_ip); 1875 assert_eq!(sync.get_peer_height(&peer_ip), None); 1876 1877 sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap(); 1878 assert_eq!(sync.get_peer_height(&peer_ip), Some(200)); 1879 } 1880 1881 #[test] 1882 fn test_requests_insert_remove_insert() { 1883 let rng = &mut TestRng::default(); 1884 let sync = sample_sync_at_height(0); 1885 1886 // Add a peer. 1887 let peer_ip = sample_peer_ip(1); 1888 sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap(); 1889 1890 // Prepare the block requests. 1891 let (requests, sync_peers) = sync.prepare_block_requests(); 1892 assert_eq!(requests.len(), 10); 1893 1894 for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() { 1895 // Construct the sync IPs. 1896 let sync_ips: IndexSet<_> = 1897 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect(); 1898 // Insert the block request. 1899 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap(); 1900 // Check that the block requests were inserted. 1901 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips))); 1902 assert!(sync.get_block_request_timestamp(height).is_some()); 1903 } 1904 1905 // Remove the peer. 1906 sync.remove_peer(&peer_ip); 1907 1908 for (height, _) in requests { 1909 // Check that the block requests were removed. 1910 assert_eq!(sync.get_block_request(height), None); 1911 assert!(sync.get_block_request_timestamp(height).is_none()); 1912 } 1913 1914 // As there is no peer, it should not be possible to prepare block requests. 
1915 let (requests, _) = sync.prepare_block_requests(); 1916 assert_eq!(requests.len(), 0); 1917 1918 // Add the peer again. 1919 sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap(); 1920 1921 // Prepare the block requests. 1922 let (requests, _) = sync.prepare_block_requests(); 1923 assert_eq!(requests.len(), 10); 1924 1925 for (height, (hash, previous_hash, num_sync_ips)) in requests { 1926 // Construct the sync IPs. 1927 let sync_ips: IndexSet<_> = 1928 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect(); 1929 // Insert the block request. 1930 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap(); 1931 // Check that the block requests were inserted. 1932 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips))); 1933 assert!(sync.get_block_request_timestamp(height).is_some()); 1934 } 1935 } 1936 1937 #[test] 1938 fn test_obsolete_block_requests() { 1939 let rng = &mut TestRng::default(); 1940 let sync = sample_sync_at_height(0); 1941 1942 let locator_height = rng.gen_range(0..50); 1943 1944 // Add a peer. 1945 let locators = sample_block_locators(locator_height); 1946 sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap(); 1947 1948 // Construct block requests 1949 let (requests, sync_peers) = sync.prepare_block_requests(); 1950 assert_eq!(requests.len(), locator_height as usize); 1951 1952 // Add the block requests to the sync module. 1953 for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() { 1954 // Construct the sync IPs. 1955 let sync_ips: IndexSet<_> = 1956 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect(); 1957 // Insert the block request. 1958 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap(); 1959 // Check that the block requests were inserted. 
1960 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips))); 1961 assert!(sync.get_block_request_timestamp(height).is_some()); 1962 } 1963 1964 // Duplicate a new sync module with a different height to simulate block advancement. 1965 // This range needs to be inclusive, so that the range is never empty, 1966 // even with a locator height of 0. 1967 let ledger_height = rng.gen_range(0..=locator_height); 1968 let new_sync = duplicate_sync_at_new_height(&sync, ledger_height); 1969 1970 // Check that the number of requests is the same. 1971 assert_eq!(new_sync.requests.read().len(), requests.len()); 1972 1973 // Remove timed out block requests. 1974 let c = DummyPeerPoolHandler::default(); 1975 new_sync.handle_block_request_timeouts(&c).unwrap(); 1976 1977 // Check that the number of requests is reduced based on the ledger height. 1978 assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize); 1979 } 1980 1981 #[test] 1982 fn test_timed_out_block_request() { 1983 let sync = sample_sync_at_height(0); 1984 let peer_ip = sample_peer_ip(1); 1985 let locators = sample_block_locators(10); 1986 let block_hash = locators.get_hash(1); 1987 1988 sync.update_peer_locators(peer_ip, &locators).unwrap(); 1989 1990 let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1); 1991 1992 // Add a timed-out request 1993 sync.requests.write().insert(1, OutstandingRequest { 1994 request: (block_hash, None, [peer_ip].into()), 1995 timestamp, 1996 response: None, 1997 }); 1998 1999 assert_eq!(sync.requests.read().len(), 1); 2000 assert_eq!(sync.locators.read().len(), 1); 2001 2002 // Remove timed out block requests. 
2003 let c = DummyPeerPoolHandler::default(); 2004 sync.handle_block_request_timeouts(&c).unwrap(); 2005 2006 // let ban_list = c.peers_to_ban.write(); 2007 // assert_eq!(ban_list.len(), 1); 2008 // assert_eq!(ban_list.iter().next(), Some(&peer_ip)); 2009 2010 assert!(sync.requests.read().is_empty()); 2011 assert!(sync.locators.read().is_empty()); 2012 } 2013 2014 #[test] 2015 fn test_reissue_timed_out_block_request() { 2016 let sync = sample_sync_at_height(0); 2017 let peer_ip1 = sample_peer_ip(1); 2018 let peer_ip2 = sample_peer_ip(2); 2019 let peer_ip3 = sample_peer_ip(3); 2020 2021 let locators = sample_block_locators(10); 2022 let block_hash1 = locators.get_hash(1); 2023 let block_hash2 = locators.get_hash(2); 2024 2025 sync.update_peer_locators(peer_ip1, &locators).unwrap(); 2026 sync.update_peer_locators(peer_ip2, &locators).unwrap(); 2027 sync.update_peer_locators(peer_ip3, &locators).unwrap(); 2028 2029 assert_eq!(sync.locators.read().len(), 3); 2030 2031 let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1); 2032 2033 // Add a timed-out request 2034 sync.requests.write().insert(1, OutstandingRequest { 2035 request: (block_hash1, None, [peer_ip1].into()), 2036 timestamp, 2037 response: None, 2038 }); 2039 2040 // Add a timed-out request 2041 sync.requests.write().insert(2, OutstandingRequest { 2042 request: (block_hash2, None, [peer_ip2].into()), 2043 timestamp: Instant::now(), 2044 response: None, 2045 }); 2046 2047 assert_eq!(sync.requests.read().len(), 2); 2048 2049 // Remove timed out block requests. 
2050 let c = DummyPeerPoolHandler::default(); 2051 2052 let re_requests = sync.handle_block_request_timeouts(&c).unwrap(); 2053 2054 // let ban_list = c.peers_to_ban.write(); 2055 // assert_eq!(ban_list.len(), 1); 2056 // assert_eq!(ban_list.iter().next(), Some(&peer_ip1)); 2057 2058 assert_eq!(sync.requests.read().len(), 1); 2059 assert_eq!(sync.locators.read().len(), 2); 2060 2061 let (new_requests, new_sync_ips) = re_requests.unwrap(); 2062 assert_eq!(new_requests.len(), 1); 2063 2064 let (height, (hash, _, _)) = new_requests.first().unwrap(); 2065 assert_eq!(*height, 1); 2066 assert_eq!(*hash, block_hash1); 2067 assert_eq!(new_sync_ips.len(), 2); 2068 2069 // Make sure the removed peer is not in the sync_peer set. 2070 let mut iter = new_sync_ips.iter(); 2071 assert_ne!(iter.next().unwrap().0, &peer_ip1); 2072 assert_ne!(iter.next().unwrap().0, &peer_ip1); 2073 } 2074 2075 #[test] 2076 fn test_processing_timeout_logic() { 2077 // Test that a completed request (all peers responded) times out after BLOCK_PROCESSING_TIMEOUT 2078 let now = Instant::now(); 2079 let old_timestamp = now - Duration::from_secs(150); // 150s ago (exceeds 120s processing timeout) 2080 2081 // Simulate a completed request (empty sync_ips means all peers responded) 2082 let is_complete = true; // sync_ips().is_empty() 2083 let peer_timeout_elapsed = old_timestamp.elapsed() > BLOCK_REQUEST_TIMEOUT; // false (150s < 600s) 2084 let processing_timeout_elapsed = old_timestamp.elapsed() > BLOCK_PROCESSING_TIMEOUT; // true (150s > 120s) 2085 2086 let is_timeout = (!is_complete && peer_timeout_elapsed) || (is_complete && processing_timeout_elapsed); 2087 2088 assert!(!peer_timeout_elapsed, "Should not exceed peer timeout yet"); 2089 assert!(processing_timeout_elapsed, "Should exceed processing timeout"); 2090 assert!(is_timeout, "Should timeout due to processing delay"); 2091 } 2092 2093 #[test] 2094 fn test_peer_timeout_logic() { 2095 // Test that an incomplete request (waiting for peers) times 
out after BLOCK_REQUEST_TIMEOUT 2096 let now = Instant::now(); 2097 let old_timestamp = now - Duration::from_secs(700); // 700s ago (exceeds 600s peer timeout) 2098 2099 // Simulate an incomplete request (non-empty sync_ips means waiting for peers) 2100 let is_complete = false; // !sync_ips().is_empty() 2101 let peer_timeout_elapsed = old_timestamp.elapsed() > BLOCK_REQUEST_TIMEOUT; // true (700s > 600s) 2102 let processing_timeout_elapsed = old_timestamp.elapsed() > BLOCK_PROCESSING_TIMEOUT; // true (700s > 120s) 2103 2104 let is_timeout = (!is_complete && peer_timeout_elapsed) || (is_complete && processing_timeout_elapsed); 2105 2106 assert!(peer_timeout_elapsed, "Should exceed peer timeout"); 2107 assert!(processing_timeout_elapsed, "Should also exceed processing timeout"); 2108 assert!(is_timeout, "Should timeout due to peer delay"); 2109 } 2110 2111 #[test] 2112 fn test_no_timeout_when_fresh() { 2113 // Test that a recent request does not timeout 2114 let now = Instant::now(); 2115 let recent_timestamp = now - Duration::from_secs(10); // 10s ago (fresh) 2116 2117 // Test both completed and incomplete states 2118 for is_complete in [true, false] { 2119 let peer_timeout_elapsed = recent_timestamp.elapsed() > BLOCK_REQUEST_TIMEOUT; // false 2120 let processing_timeout_elapsed = recent_timestamp.elapsed() > BLOCK_PROCESSING_TIMEOUT; // false 2121 2122 let is_timeout = (!is_complete && peer_timeout_elapsed) || (is_complete && processing_timeout_elapsed); 2123 2124 assert!(!peer_timeout_elapsed, "Should not exceed peer timeout"); 2125 assert!(!processing_timeout_elapsed, "Should not exceed processing timeout"); 2126 assert!(!is_timeout, "Fresh request should not timeout (is_complete={})", is_complete); 2127 } 2128 } 2129 2130 #[test] 2131 fn test_timeout_boundary_conditions() { 2132 // Test boundary conditions for both timeouts 2133 let now = Instant::now(); 2134 2135 // Just before processing timeout (119s) 2136 let just_before_processing = now - 
Duration::from_secs(119); 2137 let is_complete = true; 2138 let processing_timeout_elapsed = just_before_processing.elapsed() > BLOCK_PROCESSING_TIMEOUT; 2139 let is_timeout = is_complete && processing_timeout_elapsed; 2140 assert!(!is_timeout, "Should not timeout at 119s for processing"); 2141 2142 // Just after processing timeout (121s) 2143 let just_after_processing = now - Duration::from_secs(121); 2144 let processing_timeout_elapsed = just_after_processing.elapsed() > BLOCK_PROCESSING_TIMEOUT; 2145 let is_timeout = is_complete && processing_timeout_elapsed; 2146 assert!(is_timeout, "Should timeout at 121s for processing"); 2147 2148 // Just before peer timeout (599s) 2149 let just_before_peer = now - Duration::from_secs(599); 2150 let is_complete = false; 2151 let peer_timeout_elapsed = just_before_peer.elapsed() > BLOCK_REQUEST_TIMEOUT; 2152 let is_timeout = !is_complete && peer_timeout_elapsed; 2153 assert!(!is_timeout, "Should not timeout at 599s for peer response"); 2154 2155 // Just after peer timeout (601s) 2156 let just_after_peer = now - Duration::from_secs(601); 2157 let peer_timeout_elapsed = just_after_peer.elapsed() > BLOCK_REQUEST_TIMEOUT; 2158 let is_timeout = !is_complete && peer_timeout_elapsed; 2159 assert!(is_timeout, "Should timeout at 601s for peer response"); 2160 } 2161 }