// node/sync/src/block_sync.rs
   1  // Copyright (c) 2025-2026 ACDC Network
   2  // This file is part of the alphaos library.
   3  //
   4  // Alpha Chain | Delta Chain Protocol
   5  // International Monetary Graphite.
   6  //
   7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
   8  // They built world-class ZK infrastructure. We installed the EASY button.
   9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
  10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
  11  //
  12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
  13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
  14  // No rights reserved. No permission required. No warranty. No refunds.
  15  //
  16  // https://creativecommons.org/publicdomain/zero/1.0/
  17  // SPDX-License-Identifier: CC0-1.0
  18  
  19  use crate::{
  20      helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
  21      locators::BlockLocators,
  22  };
  23  use alphaos_node_bft_ledger_service::LedgerService;
  24  use alphaos_node_network::PeerPoolHandling;
  25  use alphaos_node_router::messages::DataBlocks;
  26  use alphaos_node_sync_communication_service::CommunicationService;
  27  use alphaos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
  28  
  29  use alphavm::{
  30      console::network::{ConsensusVersion, Network},
  31      prelude::block::Block,
  32      utilities::ensure_equals,
  33  };
  34  
  35  use anyhow::{bail, ensure, Result};
  36  use indexmap::{IndexMap, IndexSet};
  37  use itertools::Itertools;
  38  #[cfg(feature = "locktick")]
  39  use locktick::parking_lot::RwLock;
  40  #[cfg(feature = "locktick")]
  41  use locktick::tokio::Mutex as TMutex;
  42  #[cfg(not(feature = "locktick"))]
  43  use parking_lot::RwLock;
  44  use rand::seq::{IteratorRandom, SliceRandom};
  45  use std::{
  46      collections::{hash_map, BTreeMap, HashMap, HashSet},
  47      net::{IpAddr, Ipv4Addr, SocketAddr},
  48      sync::Arc,
  49      time::{Duration, Instant},
  50  };
  51  #[cfg(not(feature = "locktick"))]
  52  use tokio::sync::Mutex as TMutex;
  53  use tokio::sync::Notify;
  54  
  55  mod helpers;
  56  use helpers::rangify_heights;
  57  
  58  mod sync_state;
  59  use sync_state::SyncState;
  60  
  61  mod metrics;
  62  use metrics::BlockSyncMetrics;
  63  
// The redundancy factor decreases the possibility of malicious peers sending us an invalid block locator
// by requiring multiple peers to advertise the same (prefix of) block locators.
// However, we do not use this in production yet.
#[cfg(not(test))]
pub const REDUNDANCY_FACTOR: usize = 1;
#[cfg(test)]
pub const REDUNDANCY_FACTOR: usize = 3;

/// The time nodes wait between issuing batches of block requests to avoid triggering spam detection.
///
/// The current rate limit for all messages is around 160k per second (see [`Gateway::max_cache_events`]).
/// This constant limits the number of block requests to a much lower 100 per second.
///
// TODO(kaimast): base rate limits on how many requests were sent to each peer instead.
pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);

// Derived redundancy limits; their exact usage sites are outside this chunk.
// NOTE(review): presumably EXTRA_REDUNDANCY_FACTOR widens the peer set when re-requesting — confirm at call sites.
const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;

/// How long before an outstanding block request is considered timed out.
const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);
/// How long before processing of a received block is considered timed out.
const BLOCK_PROCESSING_TIMEOUT: Duration = Duration::from_secs(120);

/// The maximum number of outstanding block requests.
/// Once a node hits this limit, it will not issue any new requests until existing requests time out or receive responses.
const MAX_BLOCK_REQUESTS: usize = 200; // 200 requests (200 * 5 = 1000 blocks)

/// The maximum number of blocks tolerated before the primary is considered behind its peers.
pub const MAX_BLOCKS_BEHIND: u32 = 1; // blocks

/// This is a dummy IP address that is used to represent the local node.
/// Note: This here does not need to be a real IP address, but it must be unique/distinct from all other connections.
pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
  96  
/// Handle to an outstanding request, containing the request itself and its timestamp.
/// This does not contain the response so that checking for responses does not require iterating over all requests.
#[derive(Clone)]
struct OutstandingRequest<N: Network> {
    /// The request tuple; its last element is the set of peers that have not responded yet.
    request: SyncRequest<N>,
    /// When the request was created.
    timestamp: Instant,
    /// The corresponding response (if any).
    /// This is guaranteed to be Some if sync_ips for the given request are empty.
    response: Option<Block<N>>,
}
 107  
/// Information about a single block request (used for the REST API).
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestInfo {
    /// Seconds elapsed since the request was created.
    elapsed: u64,
    /// Whether all queried peers have responded to the request.
    done: bool,
}
 116  
/// Summary of all completed and in-flight block requests (used for the REST API).
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestsSummary {
    /// Height ranges (as rendered by `rangify_heights`) of requests still awaiting responses.
    outstanding: String,
    /// Height ranges (as rendered by `rangify_heights`) of requests whose responses have arrived.
    completed: String,
}
 123  
 124  impl<N: Network> OutstandingRequest<N> {
 125      /// Get a reference to the IPs of peers that have not responded to the request (yet).
 126      fn sync_ips(&self) -> &IndexSet<SocketAddr> {
 127          let (_, _, sync_ips) = &self.request;
 128          sync_ips
 129      }
 130  
 131      /// Get a mutable reference to the IPs of peers that have not responded to the request (yet).
 132      fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
 133          let (_, _, sync_ips) = &mut self.request;
 134          sync_ips
 135      }
 136  }
 137  
/// A struct that keeps track of synchronizing blocks with other nodes.
///
/// It generates requests to send to other peers and processes responses to those requests.
/// The struct also keeps track of block locators, which indicate which peers it can fetch blocks from.
///
/// # Notes
/// - The actual network communication happens in `alphaos_node::Client` (for clients and provers) and in `alphaos_node_bft::Sync` (for validators).
///
/// - Validators only sync from other nodes using this struct if they fall behind, e.g.,
///   because they experience a network partition.
///   In the common case, validators will generate blocks from the DAG after an anchor certificate has been approved
///   by a supermajority of the committee.
pub struct BlockSync<N: Network> {
    /// The ledger service used to read block hashes/heights and to validate and apply blocks.
    ledger: Arc<dyn LedgerService<N>>,

    /// The map of peer IP to their block locators.
    /// The block locators are consistent with the ledger and every other peer's block locators.
    locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,

    /// The map of peer-to-peer to their common ancestor.
    /// This map is used to determine which peers to request blocks from.
    ///
    /// Lock ordering: when locking both, `common_ancestors` and `locators`, `common_ancestors` must be locked first.
    common_ancestors: RwLock<IndexMap<PeerPair, u32>>,

    /// The block requests in progress and their responses, keyed by block height.
    requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,

    /// The boolean indicator of whether the node is synced up to the latest block (within the given tolerance).
    ///
    /// Lock ordering: if you lock `sync_state` and `requests`, you must lock `sync_state` first.
    sync_state: RwLock<SyncState>,

    /// The lock used to ensure that [`Self::advance_with_sync_blocks()`] is called by one task at a time.
    advance_with_sync_blocks_lock: TMutex<()>,

    /// Gets notified when there was an update to the locators or a peer disconnected.
    peer_notify: Notify,

    /// Gets notified when we received a new block response.
    response_notify: Notify,

    /// Tracks sync speed.
    metrics: BlockSyncMetrics,
}
 184  
 185  impl<N: Network> BlockSync<N> {
 186      /// Initializes a new block sync module.
 187      pub fn new(ledger: Arc<dyn LedgerService<N>>) -> Self {
 188          // Make sync state aware of the blocks that already exist on disk at startup.
 189          let sync_state = SyncState::new_with_height(ledger.latest_block_height());
 190  
 191          Self {
 192              ledger,
 193              sync_state: RwLock::new(sync_state),
 194              peer_notify: Default::default(),
 195              response_notify: Default::default(),
 196              locators: Default::default(),
 197              requests: Default::default(),
 198              common_ancestors: Default::default(),
 199              advance_with_sync_blocks_lock: Default::default(),
 200              metrics: Default::default(),
 201          }
 202      }
 203  
 204      /// Blocks until something about a peer changes,
 205      /// or block request has been fully processed (either successfully or unsuccessfully).
 206      ///
 207      /// Used by the outgoing task.
 208      pub async fn wait_for_peer_update(&self) {
 209          self.peer_notify.notified().await
 210      }
 211  
 212      /// Blocks until there is a new response to a block request.
 213      ///
 214      /// Used by the incoming task.
 215      pub async fn wait_for_block_responses(&self) {
 216          self.response_notify.notified().await
 217      }
 218  
 219      /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
 220      #[inline]
 221      pub fn is_block_synced(&self) -> bool {
 222          self.sync_state.read().is_block_synced()
 223      }
 224  
 225      /// Returns `true` if there a blocks to fetch or responses to process.
 226      ///
 227      /// This will always return true if [`Self::is_block_synced`] returns false,
 228      /// but it can return true when [`Self::is_block_synced`] returns true
 229      /// (due to the latter having a tolerance of one block).
 230      #[inline]
 231      pub fn can_block_sync(&self) -> bool {
 232          self.sync_state.read().can_block_sync() || self.has_pending_responses()
 233      }
 234  
 235      /// Returns the number of blocks the node is behind the greatest peer height,
 236      /// or `None` if no peers are connected yet.
 237      #[inline]
 238      pub fn num_blocks_behind(&self) -> Option<u32> {
 239          self.sync_state.read().num_blocks_behind()
 240      }
 241  
 242      /// Returns the greatest block height of any connected peer.
 243      #[inline]
 244      pub fn greatest_peer_block_height(&self) -> Option<u32> {
 245          self.sync_state.read().get_greatest_peer_height()
 246      }
 247  
 248      /// Returns the current sync height of this node.
 249      /// The sync height is always greater or equal to the ledger height.
 250      #[inline]
 251      pub fn get_sync_height(&self) -> u32 {
 252          self.sync_state.read().get_sync_height()
 253      }
 254  
 255      /// Returns the number of blocks we requested from peers, but have not received yet.
 256      #[inline]
 257      pub fn num_outstanding_block_requests(&self) -> usize {
 258          self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
 259      }
 260  
 261      /// The total number of block request, including the ones that have been answered already but not processed yet.
 262      #[inline]
 263      pub fn num_total_block_requests(&self) -> usize {
 264          self.requests.read().len()
 265      }
 266  
 267      //// Returns the latest locator height for all known peers.
 268      pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
 269          self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
 270      }
 271  
 272      //// Returns information about all in-flight block requests.
 273      pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
 274          self.requests
 275              .read()
 276              .iter()
 277              .map(|(height, request)| {
 278                  (*height, BlockRequestInfo {
 279                      done: request.sync_ips().is_empty(),
 280                      elapsed: request.timestamp.elapsed().as_secs(),
 281                  })
 282              })
 283              .collect()
 284      }
 285  
 286      /// Returns a summary of all in-flight requests.
 287      pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
 288          let completed = self
 289              .requests
 290              .read()
 291              .iter()
 292              .filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None })
 293              .collect::<Vec<_>>();
 294  
 295          let outstanding = self
 296              .requests
 297              .read()
 298              .iter()
 299              .filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None })
 300              .collect::<Vec<_>>();
 301  
 302          BlockRequestsSummary { completed: rangify_heights(&completed), outstanding: rangify_heights(&outstanding) }
 303      }
 304  
 305      pub fn get_sync_speed(&self) -> f64 {
 306          self.metrics.get_sync_speed()
 307      }
 308  }
 309  
 310  // Helper functions needed for testing
 311  #[cfg(test)]
 312  impl<N: Network> BlockSync<N> {
 313      /// Returns the latest block height of the given peer IP.
 314      fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
 315          self.locators.read().get(peer_ip).map(|locators| locators.latest_locator_height())
 316      }
 317  
 318      /// Returns the common ancestor for the given peer pair, if it exists.
 319      fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
 320          self.common_ancestors.read().get(&PeerPair(peer_a, peer_b)).copied()
 321      }
 322  
 323      /// Returns the block request for the given height, if it exists.
 324      fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
 325          self.requests.read().get(&height).map(|e| e.request.clone())
 326      }
 327  
 328      /// Returns the timestamp of the last time the block was requested, if it exists.
 329      fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
 330          self.requests.read().get(&height).map(|e| e.timestamp)
 331      }
 332  }
 333  
 334  impl<N: Network> BlockSync<N> {
 335      /// Returns the block locators.
 336      #[inline]
 337      pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
 338          // Retrieve the latest block height.
 339          let latest_height = self.ledger.latest_block_height();
 340  
 341          // Initialize the recents map.
 342          // TODO: generalize this for RECENT_INTERVAL > 1, or remove this comment if we hardwire that to 1
 343          let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
 344          // Retrieve the recent block hashes.
 345          for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
 346              recents.insert(height, self.ledger.get_block_hash(height)?);
 347          }
 348  
 349          // Initialize the checkpoints map.
 350          let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
 351          // Retrieve the checkpoint block hashes.
 352          for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
 353              checkpoints.insert(height, self.ledger.get_block_hash(height)?);
 354          }
 355  
 356          // Construct the block locators.
 357          BlockLocators::new(recents, checkpoints)
 358      }
 359  
 360      /// Returns true if there are pending responses to block requests that need to be processed.
 361      pub fn has_pending_responses(&self) -> bool {
 362          self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
 363      }
 364  
 365      /// Send a batch of block requests.
 366      pub async fn send_block_requests<C: CommunicationService>(
 367          &self,
 368          communication: &C,
 369          sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
 370          requests: &[(u32, PrepareSyncRequest<N>)],
 371      ) -> bool {
 372          let (start_height, max_num_sync_ips) = match requests.first() {
 373              Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
 374              None => {
 375                  warn!("Block sync failed - no block requests");
 376                  return false;
 377              }
 378          };
 379  
 380          debug!("Sending {len} block requests to peer(s) at {peers:?}", len = requests.len(), peers = sync_peers.keys());
 381  
 382          // Use a randomly sampled subset of the sync IPs.
 383          let sync_ips: IndexSet<_> =
 384              sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect();
 385  
 386          // Calculate the end height.
 387          let end_height = start_height.saturating_add(requests.len() as u32);
 388  
 389          // Insert the chunk of block requests.
 390          for (height, (hash, previous_hash, _)) in requests.iter() {
 391              // Insert the block request into the sync pool using the sync IPs from the last block request in the chunk.
 392              if let Err(error) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
 393                  warn!("Block sync failed - {error}");
 394                  return false;
 395              }
 396          }
 397  
 398          /* Send the block request to the peers */
 399  
 400          // Construct the message.
 401          let message = C::prepare_block_request(start_height, end_height);
 402  
 403          // Send the message to the peers.
 404          let mut tasks = Vec::with_capacity(sync_ips.len());
 405          for sync_ip in sync_ips {
 406              let sender = communication.send(sync_ip, message.clone()).await;
 407              let task = tokio::spawn(async move {
 408                  // Ensure the request is sent successfully.
 409                  match sender {
 410                      Some(sender) => {
 411                          if let Err(err) = sender.await {
 412                              warn!("Failed to send block request to peer '{sync_ip}': {err}");
 413                              false
 414                          } else {
 415                              true
 416                          }
 417                      }
 418                      None => {
 419                          warn!("Failed to send block request to peer '{sync_ip}': no such peer");
 420                          false
 421                      }
 422                  }
 423              });
 424  
 425              tasks.push(task);
 426          }
 427  
 428          // Wait for all sends to finish at the same time.
 429          for result in futures::future::join_all(tasks).await {
 430              let success = match result {
 431                  Ok(success) => success,
 432                  Err(err) => {
 433                      error!("tokio join error: {err}");
 434                      false
 435                  }
 436              };
 437  
 438              // If sending fails for any peer, remove the block request from the sync pool.
 439              if !success {
 440                  // Remove the entire block request from the sync pool.
 441                  let mut requests = self.requests.write();
 442                  for height in start_height..end_height {
 443                      requests.remove(&height);
 444                  }
 445                  // Break out of the loop.
 446                  return false;
 447              }
 448          }
 449          true
 450      }
 451  
 452      /// Inserts a new block response from the given peer IP.
 453      ///
 454      /// Returns an error if the block was malformed, or we already received a different block for this height.
 455      /// This function also removes all block requests from the given peer IP on failure.
 456      ///
 457      /// Note, that this only queues the response. After this, you most likely want to call `Self::try_advancing_block_synchronization`.
 458      ///
 459      #[inline]
 460      pub fn insert_block_responses(
 461          &self,
 462          peer_ip: SocketAddr,
 463          blocks: Vec<Block<N>>,
 464          latest_consensus_version: Option<ConsensusVersion>,
 465      ) -> Result<()> {
 466          let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
 467              bail!("Empty block response");
 468          };
 469  
 470          let expected_consensus_version = N::CONSENSUS_VERSION(last_height)?;
 471  
 472          // Perform consensus version check, if possible.
 473          // This check is only enabled after nodes have reached V12.
 474          if expected_consensus_version >= ConsensusVersion::V12 {
 475              if let Some(latest_consensus_version) = latest_consensus_version {
 476                  ensure_equals!(
 477                      expected_consensus_version,
 478                      latest_consensus_version,
 479                      "the peer's consensus version for height {last_height} does not match ours"
 480                  );
 481              } else {
 482                  bail!("The peer did not send a consensus version");
 483              }
 484          }
 485  
 486          // Insert the candidate blocks into the sync pool.
 487          for block in blocks {
 488              if let Err(error) = self.insert_block_response(peer_ip, block) {
 489                  self.remove_block_requests_to_peer(&peer_ip);
 490                  bail!("{error}");
 491              }
 492          }
 493          Ok(())
 494      }
 495  
 496      /// Returns the next block for the given `next_height` if the request is complete,
 497      /// or `None` otherwise. This does not remove the block from the `responses` map.
 498      #[inline]
 499      pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
 500          // Determine if the request is complete:
 501          // either there is no request for `next_height`, or the request has no peer socket addresses left.
 502          if let Some(entry) = self.requests.read().get(&next_height) {
 503              let is_complete = entry.sync_ips().is_empty();
 504              if !is_complete {
 505                  return None;
 506              }
 507  
 508              // If the request is complete, return the block from the responses, if there is one.
 509              if entry.response.is_none() {
 510                  warn!("Request for height {next_height} is complete but no response exists");
 511              }
 512              entry.response.clone()
 513          } else {
 514              None
 515          }
 516      }
 517  
 518      /// Attempts to advance synchronization by processing completed block responses.
 519      ///
 520      /// Returns true, if new blocks were added to the ledger.
 521      ///
 522      /// # Usage
 523      /// This is only called in [`Client::try_block_sync`] and should not be called concurrently by multiple tasks.
 524      /// Validators do not call this function, and instead invoke
 525      /// [`alphaos_node_bft::Sync::try_advancing_block_synchronization`] which also updates the BFT state.
 526      #[inline]
 527      pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
 528          // Acquire the lock to ensure this function is called only once at a time.
 529          // If the lock is already acquired, return early.
 530          //
 531          // Note: This lock should not be needed anymore as there is only one place we call it from,
 532          // but we keep it for now out of caution.
 533          // TODO(kaimast): remove this eventually.
 534          let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
 535              trace!("Skipping attempt to advance block synchronziation as it is already in progress");
 536              return Ok(false);
 537          };
 538  
 539          // Start with the current height.
 540          let mut current_height = self.ledger.latest_block_height();
 541          let start_height = current_height;
 542          trace!(
 543              "Try advancing with block responses (at block {current_height}, current sync speed is {})",
 544              self.get_sync_speed()
 545          );
 546  
 547          loop {
 548              let next_height = current_height + 1;
 549  
 550              let Some(block) = self.peek_next_block(next_height) else {
 551                  break;
 552              };
 553  
 554              // Ensure the block height matches.
 555              if block.height() != next_height {
 556                  warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
 557                  break;
 558              }
 559  
 560              let ledger = self.ledger.clone();
 561              let advanced = tokio::task::spawn_blocking(move || {
 562                  // Try to check the next block and advance to it.
 563                  match ledger.check_next_block(&block) {
 564                      Ok(_) => match ledger.advance_to_next_block(&block) {
 565                          Ok(_) => true,
 566                          Err(err) => {
 567                              warn!(
 568                                  "Failed to advance to next block (height: {}, hash: '{}'): {err}",
 569                                  block.height(),
 570                                  block.hash()
 571                              );
 572                              false
 573                          }
 574                      },
 575                      Err(err) => {
 576                          warn!(
 577                              "The next block (height: {}, hash: '{}') is invalid - {err}",
 578                              block.height(),
 579                              block.hash()
 580                          );
 581                          false
 582                      }
 583                  }
 584              })
 585              .await?;
 586  
 587              // Only count successful requests.
 588              if advanced {
 589                  self.count_request_completed();
 590              }
 591  
 592              // Remove the block response.
 593              self.remove_block_response(next_height);
 594  
 595              // If advancing failed, exit the loop.
 596              if !advanced {
 597                  break;
 598              }
 599  
 600              // Update the latest height.
 601              current_height = next_height;
 602          }
 603  
 604          if current_height > start_height {
 605              self.set_sync_height(current_height);
 606              Ok(true)
 607          } else {
 608              Ok(false)
 609          }
 610      }
 611  }
 612  
 613  impl<N: Network> BlockSync<N> {
 614      /// Returns the sync peers with their latest heights, and their minimum common ancestor, if the node can sync.
 615      /// This function returns peers that are consistent with each other, and have a block height
 616      /// that is greater than the ledger height of this node.
 617      ///
 618      /// # Locking
 619      /// This will read-lock `common_ancestors` and `sync_state`, but not at the same time.
 620      pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
 621          // Retrieve the current sync height.
 622          let current_height = self.get_sync_height();
 623  
 624          if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
 625              // Map the locators into the latest height.
 626              let sync_peers =
 627                  sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
 628              // Return the sync peers and their minimum common ancestor.
 629              Some((sync_peers, min_common_ancestor))
 630          } else {
 631              None
 632          }
 633      }
 634  
 635      /// Updates the block locators and common ancestors for the given peer IP.
 636      ///
 637      /// This function does not need to check that the block locators are well-formed,
 638      /// because that is already done in [`BlockLocators::new()`], as noted in [`BlockLocators`].
 639      ///
 640      /// This function does **not** check
 641      /// that the block locators are consistent with the peer's previous block locators or other peers' block locators.
 642      pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
 643          // Update the locators entry for the given peer IP.
 644          // We perform this update atomically, and drop the lock as soon as we are done with the update.
 645          match self.locators.write().entry(peer_ip) {
 646              hash_map::Entry::Occupied(mut e) => {
 647                  // Return early if the block locators did not change.
 648                  if e.get() == locators {
 649                      return Ok(());
 650                  }
 651  
 652                  let old_height = e.get().latest_locator_height();
 653                  let new_height = locators.latest_locator_height();
 654  
 655                  if old_height > new_height {
 656                      debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
 657                  }
 658                  e.insert(locators.clone());
 659              }
 660              hash_map::Entry::Vacant(e) => {
 661                  e.insert(locators.clone());
 662              }
 663          }
 664  
 665          // Compute the common ancestor with this node.
 666          let new_local_ancestor = {
 667              let mut ancestor = 0;
 668              // Attention: Please do not optimize this loop, as it performs fork-detection. In addition,
 669              // by iterating upwards, it also early-terminates malicious block locators at the *first* point
 670              // of bifurcation in their ledger history, which is a critical safety guarantee provided here.
 671              for (height, hash) in locators.clone().into_iter() {
 672                  if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
 673                      match ledger_hash == hash {
 674                          true => ancestor = height,
 675                          false => {
 676                              debug!("Detected fork with peer \"{peer_ip}\" at height {height}");
 677                              break;
 678                          }
 679                      }
 680                  }
 681              }
 682              ancestor
 683          };
 684  
 685          // Compute the common ancestor with every other peer.
 686          // Do not hold write lock to `common_ancestors` here, because this can take a while with many peers.
 687          let ancestor_updates: Vec<_> = self
 688              .locators
 689              .read()
 690              .iter()
 691              .filter_map(|(other_ip, other_locators)| {
 692                  // Skip if the other peer is the given peer.
 693                  if other_ip == &peer_ip {
 694                      return None;
 695                  }
 696                  // Compute the common ancestor with the other peer.
 697                  let mut ancestor = 0;
 698                  for (height, hash) in other_locators.clone().into_iter() {
 699                      if let Some(expected_hash) = locators.get_hash(height) {
 700                          match expected_hash == hash {
 701                              true => ancestor = height,
 702                              false => {
 703                                  debug!(
 704                                      "Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
 705                                  );
 706                                  break;
 707                              }
 708                          }
 709                      }
 710                  }
 711  
 712                  Some((PeerPair(peer_ip, *other_ip), ancestor))
 713              })
 714              .collect();
 715  
 716          // Update the map of common ancestors.
 717          // Scope the lock, so it is dropped before locking `sync_state`.
 718          {
 719              let mut common_ancestors = self.common_ancestors.write();
 720              common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);
 721  
 722              for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
 723                  common_ancestors.insert(peer_pair, new_ancestor);
 724              }
 725          }
 726  
 727          // Update sync state, because the greatest peer height may have decreased.
 728          if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
 729              self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
 730          } else {
 731              error!("Got new block locators but greatest peer height is zero.");
 732          }
 733  
 734          // Notify the sync loop that something changed.
 735          self.peer_notify.notify_one();
 736  
 737          Ok(())
 738      }
 739  
 740      /// TODO (howardwu): Remove the `common_ancestor` entry. But check that this is safe
 741      ///  (that we don't rely upon it for safety when we re-connect with the same peer).
 742      /// Removes the peer from the sync pool, if they exist.
 743      pub fn remove_peer(&self, peer_ip: &SocketAddr) {
 744          trace!("Removing peer {peer_ip} from block sync");
 745  
 746          // Remove the locators entry for the given peer IP.
 747          self.locators.write().remove(peer_ip);
 748          // Remove all common ancestor entries for this peers.
 749          self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
 750          // Remove all block requests to the peer.
 751          self.remove_block_requests_to_peer(peer_ip);
 752  
 753          // Update sync state, because the greatest peer height may have decreased.
 754          if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
 755              self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
 756          } else {
 757              // There are no more peers left.
 758              self.sync_state.write().clear_greatest_peer_height();
 759          }
 760  
 761          // Notify the sync loop that something changed.
 762          self.peer_notify.notify_one();
 763      }
 764  }
 765  
// Helper type returned by `prepare_block_requests`: the prepared block requests, plus the sync peers they will be sent to.
 767  pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
 768  
 769  impl<N: Network> BlockSync<N> {
 770      /// Returns a list of block requests and the sync peers, if the node needs to sync.
 771      ///
 772      /// You usually want to call `remove_timed_out_block_requests` before invoking this function.
 773      ///
 774      /// # Concurrency
 775      /// This should be called by at most one task at a time.
 776      ///
 777      /// # Usage
 778      ///  - For validators, the primary spawns exactly one task that periodically calls
 779      ///    `bft::Sync::try_issuing_block_requests`. There is no possibility of concurrent calls to it.
 780      ///  - For clients, `Client::initialize_sync` spawn exactly one task that periodically calls
 781      ///    `Client::try_issuing_block_requests` which calls this function.
 782      ///  - Provers do not call this function.
 783      pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
 784          // Used to print more information when we max out on requests.
 785          let print_requests = || {
 786              if tracing::enabled!(tracing::Level::TRACE) {
 787                  let summary = self.get_block_requests_summary();
 788  
 789                  trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
 790                  trace!("The following requests are still outstanding: {:?}", summary.outstanding);
 791              }
 792          };
 793  
 794          // Do not hold lock here as, currently, `find_sync_peers_inner` can take a while.
 795          let current_height = self.get_sync_height();
 796  
 797          // Ensure to not exceed the maximum number of outstanding block requests.
 798          let max_outstanding_block_requests =
 799              (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);
 800  
 801          // Ensure there is a finite bound on the number of block respnoses we receive, that have not been processed yet.
 802          let max_total_requests = 4 * max_outstanding_block_requests;
 803  
 804          let max_new_blocks_to_request =
 805              max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);
 806  
 807          // Prepare the block requests and sync peers, or returns an empty result if there is nothing to request.
 808          if self.num_total_block_requests() >= max_total_requests as usize {
 809              trace!(
 810                  "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
 811              );
 812  
 813              print_requests();
 814              Default::default()
 815          } else if max_new_blocks_to_request == 0 {
 816              trace!(
 817                  "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
 818              );
 819  
 820              print_requests();
 821              Default::default()
 822          } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
 823              // Retrieve the greatest block height of any connected peer.
 824              // We do not need to update the sync state here, as that already happens when the block locators are received.
 825              let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
 826  
 827              // Construct the list of block requests.
 828              let requests = self.construct_requests(
 829                  &sync_peers,
 830                  current_height,
 831                  min_common_ancestor,
 832                  max_new_blocks_to_request,
 833                  greatest_peer_height,
 834              );
 835  
 836              (requests, sync_peers)
 837          } else if self.requests.read().is_empty() {
 838              // This can happen during a race condition where the node just finished syncing.
 839              // It does not make sense to log or change the sync status here.
 840              // Checking the sync status here also does not make sense, as the node might as well have switched back
 841              //  from `synced` to `syncing` between calling `find_sync_peers_inner` and this line.
 842  
 843              Default::default()
 844          } else {
 845              // This happens if we already requested all advertised blocks.
 846              trace!("No new blocks can be requested, but there are still outstanding requests.");
 847  
 848              print_requests();
 849              Default::default()
 850          }
 851      }
 852  
    /// Records that a block request was successfully completed, for request/sync-speed metrics.
    ///
    /// Should only be called by validators when they successfully process a block request
    /// (for other nodes this will be automatically called internally).
    ///
    /// TODO(kaimast): remove this public function once the sync logic is fully unified in `BlockSync`.
    pub fn count_request_completed(&self) {
        // Forward to the metrics helper, which tracks completed requests.
        self.metrics.count_request_completed();
    }
 860  
    /// Sets the sync height to the given value.
    /// This is a no-op if `new_height` is less than or equal to the current sync height.
    pub fn set_sync_height(&self, new_height: u32) {
        // Scope the state lock, so the state and metrics locks are never held at the same time.
        let fully_synced = {
            let mut state = self.sync_state.write();
            state.set_sync_height(new_height);
            // Not being able to block-sync any further is treated as being fully synced.
            !state.can_block_sync()
        };

        if fully_synced {
            self.metrics.mark_fully_synced();
        }
    }
 875  
 876      /// Inserts a block request for the given height.
 877      fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
 878          // Ensure the block request does not already exist.
 879          self.check_block_request(height)?;
 880          // Ensure the sync IPs are not empty.
 881          ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
 882          // Insert the block request.
 883          self.requests.write().insert(height, OutstandingRequest {
 884              request: (hash, previous_hash, sync_ips),
 885              timestamp: Instant::now(),
 886              response: None,
 887          });
 888          Ok(())
 889      }
 890  
    /// Inserts the given block response, after checking that the request exists and the response is well-formed.
    /// On success, this function removes the peer IP from the request sync peers and inserts the response.
    ///
    /// Returns an error if: the block was never requested (or the ledger already advanced past it),
    /// the block does not match the requested hashes, the block was not requested from `peer_ip`,
    /// or a conflicting response was already recorded for this height.
    fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<()> {
        // Retrieve the block height.
        let height = block.height();
        // Hold the write lock for the whole check-and-insert, so the update is atomic.
        let mut requests = self.requests.write();

        // If the ledger already contains this height, the request is stale.
        if self.ledger.contains_block_height(height) {
            bail!("The sync request was removed because we already advanced");
        }

        let Some(entry) = requests.get_mut(&height) else { bail!("The sync pool did not request block {height}") };

        // Retrieve the request entry for the candidate block.
        let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;

        // Ensure the candidate block hash matches the expected hash (when one was pinned).
        if let Some(expected_hash) = expected_hash {
            if block.hash() != *expected_hash {
                bail!("The block hash for candidate block {height} from '{peer_ip}' is incorrect")
            }
        }
        // Ensure the previous block hash matches the expected one (when one was pinned).
        if let Some(expected_previous_hash) = expected_previous_hash {
            if block.previous_hash() != *expected_previous_hash {
                bail!("The previous block hash in candidate block {height} from '{peer_ip}' is incorrect")
            }
        }
        // Ensure the sync pool actually requested this block from the given peer.
        if !sync_ips.contains(&peer_ip) {
            bail!("The sync pool did not request block {height} from '{peer_ip}'")
        }

        // Remove the peer IP from the request entry, so it is no longer counted as pending.
        entry.sync_ips_mut().swap_remove(&peer_ip);

        if let Some(existing_block) = &entry.response {
            // If a candidate block was already present, ensure the new one is identical.
            if block != *existing_block {
                bail!("Candidate block {height} from '{peer_ip}' is malformed");
            }
        } else {
            entry.response = Some(block.clone());
        }

        trace!("Received a new and valid block response for height {height}");

        // Notify the sync loop that something changed.
        self.response_notify.notify_one();

        Ok(())
    }
 943  
 944      /// Checks that a block request for the given height does not already exist.
 945      fn check_block_request(&self, height: u32) -> Result<()> {
 946          // Ensure the block height is not already in the ledger.
 947          if self.ledger.contains_block_height(height) {
 948              bail!("Failed to add block request, as block {height} exists in the ledger");
 949          }
 950          // Ensure the block height is not already requested.
 951          if self.requests.read().contains_key(&height) {
 952              bail!("Failed to add block request, as block {height} exists in the requests map");
 953          }
 954  
 955          Ok(())
 956      }
 957  
 958      /// Removes the block request and response for the given height
 959      /// This may only be called after `peek_next_block`, which checked if the request for the given height was complete.
 960      ///
 961      /// Precondition: This may only be called after `peek_next_block` has returned `Some`,
 962      /// which has checked if the request for the given height is complete
 963      /// and there is a block with the given `height` in the `responses` map.
 964      pub fn remove_block_response(&self, height: u32) {
 965          // Remove the request entry for the given height.
 966          if let Some(e) = self.requests.write().remove(&height) {
 967              trace!(
 968                  "Block request for height {height} was completed in {}ms (sync speed is {})",
 969                  e.timestamp.elapsed().as_millis(),
 970                  self.get_sync_speed()
 971              );
 972  
 973              // Notify the sending task that less requests are in-flight.
 974              self.peer_notify.notify_one();
 975          }
 976      }
 977  
 978      /// Removes all block requests for the given peer IP.
 979      ///
 980      /// This is used when disconnecting from a peer or when a peer sends invalid block responses.
 981      fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
 982          trace!("Block sync is removing all block requests to peer {peer_ip}...");
 983  
 984          // Remove the peer IP from the requests map. If any request entry is now empty,
 985          // and its corresponding response entry is also empty, then remove that request entry altogether.
 986          self.requests.write().retain(|height, e| {
 987              let had_peer = e.sync_ips_mut().swap_remove(peer_ip);
 988  
 989              // Only remove requests that were sent to this peer, that have no other peer that can respond instead,
 990              // and that were not completed yet.
 991              let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
 992              if !retain {
 993                  trace!("Removed block request timestamp for {peer_ip} at height {height}");
 994              }
 995              retain
 996          });
 997  
 998          // No need to remove responses here, because requests with responses will be retained.
 999      }
1000  
    /// Removes block requests that have timed out, i.e., requests we sent that did not receive a response in time.
    ///
    /// This removes the corresponding block responses and returns the set of peers/addresses that timed out.
    /// It will ask the peer pool handling service to ban any timed-out peers.
    ///
    /// # Return Value
    /// On success it will return `None` if there is nothing to re-request, or a set of new block requests that replaced the timed-out requests.
    /// This set of new requests can also replace requests that timed out earlier, and which we were not able to re-request yet.
    ///
    /// This function will return an error if it cannot re-request blocks due to a lack of peers.
    /// In this case, the current iteration of block synchronization should not continue and the node should re-try later instead.
    pub fn handle_block_request_timeouts<P: PeerPoolHandling<N>>(
        &self,
        _peer_pool_handler: &P,
    ) -> Result<Option<BlockRequestBatch<N>>> {
        // Acquire the write lock on the requests map.
        let mut requests = self.requests.write();

        // Retrieve the current time, used to compute elapsed durations below.
        let now = Instant::now();

        // Retrieve the current block height of the ledger.
        let current_height = self.ledger.latest_block_height();

        // Track the heights of timed-out block requests (only used to print a log message).
        let mut timed_out_requests = vec![];

        // Track which peers should be banned due to unresponsiveness.
        let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();

        // Remove timed out (or obsolete) block requests.
        requests.retain(|height, e| {
            // A request is obsolete once the ledger has reached its height.
            let is_obsolete = *height <= current_height;
            // A request is complete once every queried peer has responded (no sync IPs remain).
            let is_complete = e.sync_ips().is_empty();
            // Determine which timeouts have been exceeded.
            let peer_timeout_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
            let processing_timeout_elapsed = now.duration_since(e.timestamp) > BLOCK_PROCESSING_TIMEOUT;

            // Determine if the request has timed out.
            // Timeout if:
            // 1. Waiting for peers AND peer timeout exceeded, OR
            // 2. All responses received AND processing timeout exceeded.
            let is_timeout = (!is_complete && peer_timeout_elapsed) || (is_complete && processing_timeout_elapsed);

            // Retain only requests that neither timed out nor became obsolete.
            let retain = !is_timeout && !is_obsolete;

            if is_timeout {
                if is_complete {
                    warn!(
                        "Block {} processing timeout: unprocessed for {}s (all peers responded)",
                        height,
                        e.timestamp.elapsed().as_secs()
                    );
                } else {
                    warn!(
                        "Block {} peer timeout: waiting on {:?} after {}s",
                        height,
                        e.sync_ips(),
                        e.timestamp.elapsed().as_secs()
                    );
                }

                // Record the height of the timed-out block request.
                timed_out_requests.push(*height);
            } else if is_obsolete {
                trace!("Block request at height {height} became obsolete (current_height={current_height})");
            }

            // If the request timed out, mark every peer that still owed a response for removal (and potential banning).
            if is_timeout {
                for peer_ip in e.sync_ips().iter() {
                    peers_to_ban.insert(*peer_ip);
                }
            }

            retain
        });

        if !timed_out_requests.is_empty() {
            debug!("{num} block requests timed out", num = timed_out_requests.len());
        }

        // The lowest still-outstanding request height (requests are ordered by height).
        let next_request_height = requests.iter().next().map(|(h, _)| *h);

        // Avoid locking `locators` and `requests` at the same time.
        drop(requests);

        // Now remove (and eventually ban) any unresponsive peers.
        for peer_ip in peers_to_ban {
            self.remove_peer(&peer_ip);
            // TODO: Uncomment this when we have a more rigorous analysis and testing of peer banning.
            // peer_pool_handler.ip_ban_peer(peer_ip, Some("timed out on block requests"));
        }

        // Determine if we need to re-issue any timed-out requests.
        // If there are no requests remaining or no gap at the beginning,
        // we do not need to re-issue requests and will just issue them regularly.
        //
        // This needs to be checked even if timed_out_requests is empty, because we might not be able to re-issue
        // requests immediately if there are no other peers at a given time.
        // Further, this only closes the first gap. So multiple calls to this might be needed.
        let sync_height = self.get_sync_height();
        let start_height = sync_height + 1;

        let end_height = if let Some(next_height) = next_request_height {
            if next_height > start_height {
                // The end height is exclusive, so use the height of the first existing block request as the end.
                next_height
            } else {
                // Nothing to do: there is no gap before the first outstanding request.
                // Do not log here as this check happens frequently.
                return Ok(None);
            }
        } else {
            // Nothing to do: there are no outstanding requests left.
            // Do not log here as this check happens frequently.
            return Ok(None);
        };

        // Set the maximum number of blocks, so that they do not exceed the end height.
        let max_new_blocks_to_request = end_height - start_height;

        let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start_height) else {
            // This generally shouldn't happen, because there cannot be outstanding requests when no peers are connected.
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        // Retrieve the greatest block height of any connected peer.
        let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
            // This should never happen because `sync_peers` is guaranteed to be non-empty.
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        // (Try to) construct the requests.
        let requests = self.construct_requests(
            &sync_peers,
            sync_height,
            min_common_ancestor,
            max_new_blocks_to_request,
            greatest_peer_height,
        );

        // If the ledger advanced concurrently, there may be no requests to issue after all.
        // The given height may also be greater than `start_height` due to concurrent block advancement.
        if let Some((height, _)) = requests.as_slice().first() {
            debug!("Re-requesting blocks starting at height {height}");
            Ok(Some((requests, sync_peers)))
        } else {
            // Do not log here as this constitutes a benign race condition.
            Ok(None)
        }
    }
1155  
    /// Finds the peers to sync from and the shared common ancestor, starting at the given height.
    ///
    /// Unlike [`Self::find_sync_peers`] this does not only return the latest locators height, but the full BlockLocators for each peer.
    /// Returns `None` if there are no peers to sync from, or no sufficiently-large cohort of
    /// consistent peers with a common ancestor above the latest ledger height.
    ///
    /// # Locking
    /// This function will read-lock `common_ancestors` (and briefly read-lock `locators`).
    fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
        // Retrieve the latest ledger height.
        let latest_ledger_height = self.ledger.latest_block_height();

        // Pick a set of peers whose advertised height is above the given `current_height`, and include their locators.
        // This will sort the peers by locator height in descending order.
        let candidate_locators: IndexMap<_, _> = self
            .locators
            .read()
            .iter()
            .filter(|(_, locators)| locators.latest_locator_height() > current_height)
            .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
            .take(NUM_SYNC_CANDIDATE_PEERS)
            .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
            .collect();

        // Case 0: If there are no candidate peers, return `None`.
        if candidate_locators.is_empty() {
            trace!("Found no sync peers with height greater {current_height}");
            return None;
        }

        // TODO (howardwu): Change this to the highest cumulative weight for Phase 3.
        // Case 1: If all of the candidate peers share a common ancestor below the latest ledger height,
        // then pick the peer with the highest height, and find peers (up to extra redundancy) with
        // a common ancestor above the block request range. Set the end height to their common ancestor.

        // Determine the threshold number of peers to sync from.
        let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);

        // Breaks the loop when the first threshold number of peers are found, biasing for the peer with the highest height
        // and a cohort of peers who share a common ancestor above this node's latest ledger height.
        for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
            // The height of the common ancestor shared by all selected peers so far;
            // starts at this peer's own height and only shrinks as peers are added.
            let mut min_common_ancestor = peer_locators.latest_locator_height();

            // The peers we will synchronize from, seeded with the current candidate.
            // (If the previous iteration did not succeed, this restarts with the next candidate peer.)
            let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];

            // Try adding other peers consistent with this one to the sync peer set.
            // Only peers after `idx` are considered, as earlier candidates already failed to form a cohort.
            for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
                // Check if these two peers have a common ancestor above the latest ledger height.
                if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
                    // If so, then check that their block locators are consistent.
                    if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
                        // If their common ancestor is less than the minimum common ancestor, then update it.
                        min_common_ancestor = min_common_ancestor.min(*common_ancestor);

                        // Add the other peer to the list of sync peers.
                        sync_peers.push((*other_ip, other_locators.clone()));
                    }
                }
            }

            // If we have enough sync peers above the latest ledger height, finish and return them.
            if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
                // Shuffle the sync peers prior to returning. This ensures the rest of the stack
                // does not rely on the order of the sync peers, and that the sync peers are not biased.
                sync_peers.shuffle(&mut rand::thread_rng());

                // Collect into an IndexMap and return.
                return Some((sync_peers.into_iter().collect(), min_common_ancestor));
            }
        }

        // If there are not enough peers with a minimum common ancestor above the latest ledger height, return None.
        None
    }
1232  
    /// Given the sync peers and their minimum common ancestor, returns a list of block requests.
    ///
    /// The returned requests cover a contiguous range of heights starting just above the sync
    /// height (skipping heights already in the ledger or already requested), bounded above by
    /// the minimum common ancestor and by `max_blocks_to_request`. Each request carries the
    /// same (maximum) number of sync IPs required across the sequence.
    fn construct_requests(
        &self,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        sync_height: u32,
        min_common_ancestor: u32,
        max_blocks_to_request: u32,
        greatest_peer_height: u32,
    ) -> Vec<(u32, PrepareSyncRequest<N>)> {
        // Compute the start height for the block requests.
        let start_height = {
            let requests = self.requests.read();
            let ledger_height = self.ledger.latest_block_height();

            // Do not issue requests for blocks already contained in the ledger.
            let mut start_height = ledger_height.max(sync_height + 1);

            // Do not issue requests that already exist; skip past the contiguous run of existing requests.
            while requests.contains_key(&start_height) {
                start_height += 1;
            }

            start_height
        };

        // If the minimum common ancestor is below the start height, then return early.
        if min_common_ancestor < start_height {
            // Only log when peers advertise blocks beyond the start height; otherwise this is the normal "caught up" case.
            if start_height < greatest_peer_height {
                trace!(
                    "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
                );
            }
            return Default::default();
        }

        // Compute the (exclusive) end height for the block request.
        let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);

        // Construct the block hashes to request.
        let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
        // Track the largest number of sync IPs required for any block request in the sequence of requests.
        let mut max_num_sync_ips = 1;

        for height in start_height..end_height {
            // Ensure the current height is not in the ledger or already requested.
            if let Err(err) = self.check_block_request(height) {
                trace!("{err}");

                // If the sequence of block requests is interrupted, then return early.
                // Otherwise (no requests collected yet), continue until the first start height that is new.
                match request_hashes.is_empty() {
                    true => continue,
                    false => break,
                }
            }

            // Construct the block request from the peers' locators.
            let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);

            // Handle the dishonest case, where peers disagree on the hash for this height.
            if !is_honest {
                // TODO (howardwu): Consider performing an integrity check on peers (to disconnect).
                warn!("Detected dishonest peer(s) when preparing block request");
                // If there are not enough peers in the dishonest case, then return early.
                if sync_peers.len() < num_sync_ips {
                    break;
                }
            }

            // Update the maximum number of sync IPs.
            max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);

            // Append the request.
            request_hashes.insert(height, (hash, previous_hash));
        }

        // Construct the requests, all carrying the same (maximum) number of sync IPs.
        request_hashes
            .into_iter()
            .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
            .collect()
    }
1315  }
1316  
1317  /// If any peer is detected to be dishonest in this function, it will not set the hash or previous hash,
1318  /// in order to allow the caller to determine what to do.
1319  fn construct_request<N: Network>(
1320      height: u32,
1321      sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1322  ) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
1323      let mut hash = None;
1324      let mut hash_redundancy: usize = 0;
1325      let mut previous_hash = None;
1326      let mut is_honest = true;
1327  
1328      for peer_locators in sync_peers.values() {
1329          if let Some(candidate_hash) = peer_locators.get_hash(height) {
1330              match hash {
1331                  // Increment the redundancy count if the hash matches.
1332                  Some(hash) if hash == candidate_hash => hash_redundancy += 1,
1333                  // Some peer is dishonest.
1334                  Some(_) => {
1335                      hash = None;
1336                      hash_redundancy = 0;
1337                      previous_hash = None;
1338                      is_honest = false;
1339                      break;
1340                  }
1341                  // Set the hash if it is not set.
1342                  None => {
1343                      hash = Some(candidate_hash);
1344                      hash_redundancy = 1;
1345                  }
1346              }
1347          }
1348          if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
1349              match previous_hash {
1350                  // Increment the redundancy count if the previous hash matches.
1351                  Some(previous_hash) if previous_hash == candidate_previous_hash => (),
1352                  // Some peer is dishonest.
1353                  Some(_) => {
1354                      hash = None;
1355                      hash_redundancy = 0;
1356                      previous_hash = None;
1357                      is_honest = false;
1358                      break;
1359                  }
1360                  // Set the previous hash if it is not set.
1361                  None => previous_hash = Some(candidate_previous_hash),
1362              }
1363          }
1364      }
1365  
1366      // Note that we intentionally do not just pick the peers that have the hash we have chosen,
1367      // to give stronger confidence that we are syncing during times when the network is consistent/stable.
1368      let num_sync_ips = {
1369          // Extra redundant peers - as the block hash was dishonest.
1370          if !is_honest {
1371              // Choose up to the extra redundancy factor in sync peers.
1372              EXTRA_REDUNDANCY_FACTOR
1373          }
1374          // No redundant peers - as we have redundancy on the block hash.
1375          else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
1376              // Choose one sync peer.
1377              1
1378          }
1379          // Redundant peers - as we do not have redundancy on the block hash.
1380          else {
1381              // Choose up to the redundancy factor in sync peers.
1382              REDUNDANCY_FACTOR
1383          }
1384      };
1385  
1386      (hash, previous_hash, num_sync_ips, is_honest)
1387  }
1388  
1389  #[cfg(test)]
1390  mod tests {
1391      use super::*;
1392      use crate::locators::{
1393          test_helpers::{sample_block_locators, sample_block_locators_with_fork},
1394          CHECKPOINT_INTERVAL,
1395          NUM_RECENT_BLOCKS,
1396      };
1397  
1398      use alphaos_node_bft_ledger_service::MockLedgerService;
1399      use alphaos_node_network::{NodeType, Peer, Resolver};
1400      use alphaos_node_tcp::{Tcp, P2P};
1401      use alphavm::{
1402          ledger::committee::Committee,
1403          prelude::{Field, TestRng},
1404      };
1405  
1406      use indexmap::{indexset, IndexSet};
1407      #[cfg(feature = "locktick")]
1408      use locktick::parking_lot::RwLock;
1409      #[cfg(not(feature = "locktick"))]
1410      use parking_lot::RwLock;
1411      use rand::Rng;
1412      use std::net::{IpAddr, Ipv4Addr};
1413  
1414      type CurrentNetwork = alphavm::prelude::MainnetV0;
1415  
    /// A test-only `PeerPoolHandling` implementor that records ban requests instead of acting on them.
    #[derive(Default)]
    struct DummyPeerPoolHandler {
        // Every listener address passed to `ip_ban_peer` is appended here for later inspection.
        peers_to_ban: RwLock<Vec<SocketAddr>>,
    }
1420  
    impl P2P for DummyPeerPoolHandler {
        // Not expected to be called in these tests; no TCP stack is set up.
        fn tcp(&self) -> &Tcp {
            unreachable!();
        }
    }
1426  
    impl<N: Network> PeerPoolHandling<N> for DummyPeerPoolHandler {
        const MAXIMUM_POOL_SIZE: usize = 10;
        const OWNER: &str = "[DummyPeerPoolHandler]";
        // NOTE(review): 0 presumably makes banning take effect immediately — confirm against
        // how `PEER_SLASHING_COUNT` is used by `PeerPoolHandling`.
        const PEER_SLASHING_COUNT: usize = 0;

        // Not expected to be called in these tests.
        fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
            unreachable!();
        }

        // Not expected to be called in these tests.
        fn resolver(&self) -> &RwLock<Resolver<N>> {
            unreachable!();
        }

        // Report development mode for the test environment.
        fn is_dev(&self) -> bool {
            true
        }

        fn trusted_peers_only(&self) -> bool {
            false
        }

        fn node_type(&self) -> NodeType {
            NodeType::Client
        }

        // Record the ban request instead of acting on it; tests inspect `peers_to_ban` afterwards.
        fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) {
            self.peers_to_ban.write().push(listener_addr);
        }
    }
1456  
1457      /// Returns the peer IP for the sync pool.
1458      fn sample_peer_ip(id: u16) -> SocketAddr {
1459          assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
1460          SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
1461      }
1462  
    /// Returns a sample committee.
    ///
    /// Uses `TestRng::default()` — presumably deterministically seeded, so repeated
    /// calls yield the same committee (confirm in `TestRng`).
    fn sample_committee() -> Committee<CurrentNetwork> {
        let rng = &mut TestRng::default();
        alphavm::ledger::committee::test_helpers::sample_committee(rng)
    }
1468  
    /// Returns the ledger service, initialized to the given height.
    ///
    /// The mock is built over the sample committee and positioned at `height`.
    fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
        MockLedgerService::new_at_height(sample_committee(), height)
    }
1473  
    /// Returns the sync pool, with the ledger initialized to the given height.
    fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
        // The ledger service is shared via `Arc`, as required by `BlockSync::new`.
        BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)))
    }
1478  
1479      /// Returns a vector of randomly sampled block heights in [0, max_height].
1480      ///
1481      /// The maximum value will always be included in the result.
1482      fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
1483          assert!(num_values > 0, "Cannot generate an empty vector");
1484          assert!((max_height as usize) >= num_values);
1485  
1486          let mut rng = TestRng::default();
1487  
1488          let mut heights: Vec<u32> = (0..(max_height - 1)).choose_multiple(&mut rng, num_values);
1489  
1490          heights.push(max_height);
1491  
1492          heights
1493      }
1494  
    /// Returns a duplicate (deep copy) of the sync pool with a different ledger height.
    ///
    /// Used to simulate block advancement: the tracked requests/locators are carried
    /// over while the ledger is swapped for one at `height`.
    fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork> {
            // Fresh notification/lock primitives (not copied from the source pool).
            peer_notify: Notify::new(),
            response_notify: Default::default(),
            // Replace the ledger with one positioned at the new height.
            ledger: Arc::new(sample_ledger_service(height)),
            // Deep-copy the shared state from the source sync pool.
            locators: RwLock::new(sync.locators.read().clone()),
            common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
            requests: RwLock::new(sync.requests.read().clone()),
            sync_state: RwLock::new(sync.sync_state.read().clone()),
            advance_with_sync_blocks_lock: Default::default(),
            metrics: Default::default(),
        }
    }
1509  
    /// Checks that the sync pool (starting at genesis) returns the correct requests.
    ///
    /// - `sync`: the sync pool under test; its ledger must be at genesis (asserted below).
    /// - `min_common_ancestor`: the height up to which all supplied peers agree.
    /// - `peers`: every peer IP that was registered with the sync pool.
    fn check_prepare_block_requests(
        sync: BlockSync<CurrentNetwork>,
        min_common_ancestor: u32,
        peers: IndexSet<SocketAddr>,
    ) {
        let rng = &mut TestRng::default();

        // Check test assumptions are met.
        assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");

        // Determine the number of peers within range of this sync pool.
        let num_peers_within_recent_range_of_ledger = {
            // If no peers are within range, then set to 0.
            if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
                0
            }
            // Otherwise, manually check the number of peers within range.
            else {
                peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
            }
        };

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();

        // If there are no peers, then there should be no requests.
        if peers.is_empty() {
            assert!(requests.is_empty());
            return;
        }

        // Otherwise, there should be requests: one per block, capped at MAX_BLOCK_REQUESTS.
        let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
        assert_eq!(requests.len(), expected_num_requests);

        // The requests must cover consecutive heights starting at 1, with matching hashes.
        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));

            // With enough in-range peers for redundancy a single sync IP suffices;
            // otherwise every in-range peer must be used.
            if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
                assert_eq!(sync_ips.len(), 1);
            } else {
                assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
                assert_eq!(sync_ips, peers);
            }
        }
    }
1562  
1563      /// Tests that height and hash values are set correctly using many different maximum block heights.
1564      #[test]
1565      fn test_latest_block_height() {
1566          for height in generate_block_heights(100_001, 5000) {
1567              let sync = sample_sync_at_height(height);
1568              // Check that the latest block height is the maximum height.
1569              assert_eq!(sync.ledger.latest_block_height(), height);
1570  
1571              // Check the hash to height mapping
1572              assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
1573              assert_eq!(
1574                  sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
1575                  height
1576              );
1577          }
1578      }
1579  
1580      #[test]
1581      fn test_get_block_hash() {
1582          for height in generate_block_heights(100_001, 5000) {
1583              let sync = sample_sync_at_height(height);
1584  
1585              // Check the height to hash mapping
1586              assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
1587              assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
1588          }
1589      }
1590  
1591      #[test]
1592      fn test_prepare_block_requests() {
1593          for num_peers in 0..111 {
1594              println!("Testing with {num_peers} peers");
1595  
1596              let sync = sample_sync_at_height(0);
1597  
1598              let mut peers = indexset![];
1599  
1600              for peer_id in 1..=num_peers {
1601                  // Add a peer.
1602                  sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
1603                  // Add the peer to the set of peers.
1604                  peers.insert(sample_peer_ip(peer_id));
1605              }
1606  
1607              // If all peers are ahead, then requests should be prepared.
1608              check_prepare_block_requests(sync, 10, peers);
1609          }
1610      }
1611  
1612      #[test]
1613      fn test_prepare_block_requests_with_leading_fork_at_11() {
1614          let sync = sample_sync_at_height(0);
1615  
1616          // Intuitively, peer 1's fork is above peer 2 and peer 3's height.
1617          // So from peer 2 and peer 3's perspective, they don't even realize that peer 1 is on a fork.
1618          // Thus, you can sync up to block 10 from any of the 3 peers.
1619  
1620          // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 11,
1621          // then the sync pool should request blocks 1..=10 from the NUM_REDUNDANCY peers.
1622          // This is safe because the leading fork is at 11, and the sync pool is at 0,
1623          // so all candidate peers are at least 10 blocks ahead of the sync pool.
1624  
1625          // Add a peer (fork).
1626          let peer_1 = sample_peer_ip(1);
1627          sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();
1628  
1629          // Add a peer.
1630          let peer_2 = sample_peer_ip(2);
1631          sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1632  
1633          // Add a peer.
1634          let peer_3 = sample_peer_ip(3);
1635          sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1636  
1637          // Prepare the block requests.
1638          let (requests, _) = sync.prepare_block_requests();
1639          assert_eq!(requests.len(), 10);
1640  
1641          // Check the requests.
1642          for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1643              assert_eq!(height, 1 + idx as u32);
1644              assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1645              assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1646              assert_eq!(num_sync_ips, 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1647          }
1648      }
1649  
    #[test]
    fn test_prepare_block_requests_with_leading_fork_at_10() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Intuitively, peer 1's fork is at peer 2 and peer 3's height.
        // So from peer 2 and peer 3's perspective, they recognize that peer 1 has forked.
        // Thus, you don't have NUM_REDUNDANCY peers to sync to block 10.
        //
        // Now, while you could in theory sync up to block 9 from any of the 3 peers,
        // we choose not to do this as either side is likely to disconnect from us,
        // and we would rather wait for enough redundant peers before syncing.

        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 10,
        // then the sync pool should not request blocks as 1 peer conflicts with the other NUM_REDUNDANCY-1 peers.
        // We choose to sync with a cohort of peers that are *consistent* with each other,
        // and prioritize from descending heights (so the highest peer gets priority).

        // Add a peer (fork): chain height 20, forked at height 10.
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();

        // Add a peer (canonical chain, height 10).
        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        // Add a peer (canonical chain, height 10).
        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();

        // Prepare the block requests: none yet, the consistent cohort is too small.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // When there are NUM_REDUNDANCY+1 peers ahead, and 1 is on a fork, then there should be block requests.

        // Add a peer (canonical chain, height 10) to complete the consistent cohort.
        let peer_4 = sample_peer_ip(4);
        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // Check the requests cover consecutive heights 1..=10 with the expected hashes.
        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
            assert_ne!(sync_ips[0], peer_1); // It should never be the forked peer.
        }
    }
1706  
    #[test]
    fn test_prepare_block_requests_with_trailing_fork_at_9() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Peer 1 and 2 diverge from peer 3 at block 10. We only sync when there are NUM_REDUNDANCY peers
        // who are *consistent* with each other. So if you add a 4th peer that is consistent with peer 1 and 2,
        // then you should be able to sync up to block 10, thereby biasing away from peer 3.

        // Add a peer (canonical chain, height 10).
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators(10)).unwrap();

        // Add a peer (canonical chain, height 10).
        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        // Add a peer (fork): chain height 20, forked at height 10.
        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();

        // Prepare the block requests: none yet, the consistent cohort is too small.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // When there are NUM_REDUNDANCY+1 peers ahead, and peer 3 is on a fork, then there should be block requests.

        // Add a peer (canonical chain, height 10) to complete the consistent cohort.
        let peer_4 = sample_peer_ip(4);
        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // Check the requests cover consecutive heights 1..=10 with the expected hashes.
        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
            assert_ne!(sync_ips[0], peer_3); // It should never be the forked peer.
        }
    }
1754  
    #[test]
    fn test_insert_block_requests() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Add a single peer at height 10.
        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // Pass 1: insert every request and confirm it is stored with a timestamp.
        // (With a single peer, re-sampling the sync IPs in each pass yields the same set.)
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Pass 2: re-reading the stored requests leaves them unchanged.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Check that the block requests are still inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Pass 3: duplicate insertion must fail and must not disturb the stored requests.
        for (height, (hash, previous_hash, num_sync_ips)) in requests {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Ensure that the block requests cannot be inserted twice.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();
            // Check that the block requests are still inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }
    }
1798  
1799      #[test]
1800      fn test_insert_block_requests_fails() {
1801          let sync = sample_sync_at_height(9);
1802  
1803          // Add a peer.
1804          sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
1805  
1806          // Inserting a block height that is already in the ledger should fail.
1807          sync.insert_block_request(9, (None, None, indexset![sample_peer_ip(1)])).unwrap_err();
1808          // Inserting a block height that is not in the ledger should succeed.
1809          sync.insert_block_request(10, (None, None, indexset![sample_peer_ip(1)])).unwrap();
1810      }
1811  
    #[test]
    fn test_update_peer_locators() {
        let sync = sample_sync_at_height(0);

        // Test 2 peers across every pairing of heights in 0..500.
        let peer1_ip = sample_peer_ip(1);
        for peer1_height in 0..500u32 {
            sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
            assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));

            let peer2_ip = sample_peer_ip(2);
            for peer2_height in 0..500u32 {
                println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}");

                sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
                assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height));

                // Compute the distance between the peers.
                let distance = peer1_height.abs_diff(peer2_height);

                // Check the common ancestor (expected to be symmetric in both argument orders).
                if distance < NUM_RECENT_BLOCKS as u32 {
                    // Within the recent-blocks window, the ancestor is the lower of the two heights.
                    let expected_ancestor = core::cmp::min(peer1_height, peer2_height);
                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
                } else {
                    // Beyond the recent window, the ancestor falls back to the highest shared
                    // checkpoint: the checkpoint at or below the lower peer's height.
                    let min_checkpoints =
                        core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
                    let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL;
                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
                }
            }
        }
    }
1847  
1848      #[test]
1849      fn test_remove_peer() {
1850          let sync = sample_sync_at_height(0);
1851  
1852          let peer_ip = sample_peer_ip(1);
1853          sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1854          assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1855  
1856          sync.remove_peer(&peer_ip);
1857          assert_eq!(sync.get_peer_height(&peer_ip), None);
1858  
1859          sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1860          assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1861  
1862          sync.remove_peer(&peer_ip);
1863          assert_eq!(sync.get_peer_height(&peer_ip), None);
1864      }
1865  
1866      #[test]
1867      fn test_locators_insert_remove_insert() {
1868          let sync = sample_sync_at_height(0);
1869  
1870          let peer_ip = sample_peer_ip(1);
1871          sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1872          assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1873  
1874          sync.remove_peer(&peer_ip);
1875          assert_eq!(sync.get_peer_height(&peer_ip), None);
1876  
1877          sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1878          assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1879      }
1880  
    #[test]
    fn test_requests_insert_remove_insert() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Add a single peer at height 10.
        let peer_ip = sample_peer_ip(1);
        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // Insert every request and confirm it is stored with a timestamp.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Remove the peer.
        sync.remove_peer(&peer_ip);

        // Removing the only peer must also clear its outstanding requests.
        for (height, _) in requests {
            // Check that the block requests were removed.
            assert_eq!(sync.get_block_request(height), None);
            assert!(sync.get_block_request_timestamp(height).is_none());
        }

        // As there is no peer, it should not be possible to prepare block requests.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // Add the peer again.
        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // NOTE: `sync_peers` below is from the first preparation; with a single peer the
        // chosen sync IPs are the same either way.
        for (height, (hash, previous_hash, num_sync_ips)) in requests {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }
    }
1936  
    #[test]
    fn test_obsolete_block_requests() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Pick a random peer height in [0, 50).
        let locator_height = rng.gen_range(0..50);

        // Add a peer at that height.
        let locators = sample_block_locators(locator_height);
        sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();

        // Construct block requests: one per block between our height (0) and the peer's height.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), locator_height as usize);

        // Add the block requests to the sync module.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Duplicate a new sync module with a different height to simulate block advancement.
        // This range needs to be inclusive, so that the range is never empty,
        // even with a locator height of 0.
        let ledger_height = rng.gen_range(0..=locator_height);
        let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);

        // Check that the number of requests is the same (the deep copy carried them over).
        assert_eq!(new_sync.requests.read().len(), requests.len());

        // Remove timed out block requests, observing bans via the dummy handler.
        let c = DummyPeerPoolHandler::default();
        new_sync.handle_block_request_timeouts(&c).unwrap();

        // Check that the number of requests is reduced based on the ledger height:
        // requests at or below the advanced ledger height are obsolete and must be dropped.
        assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize);
    }
1980  
1981      #[test]
1982      fn test_timed_out_block_request() {
1983          let sync = sample_sync_at_height(0);
1984          let peer_ip = sample_peer_ip(1);
1985          let locators = sample_block_locators(10);
1986          let block_hash = locators.get_hash(1);
1987  
1988          sync.update_peer_locators(peer_ip, &locators).unwrap();
1989  
1990          let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
1991  
1992          // Add a timed-out request
1993          sync.requests.write().insert(1, OutstandingRequest {
1994              request: (block_hash, None, [peer_ip].into()),
1995              timestamp,
1996              response: None,
1997          });
1998  
1999          assert_eq!(sync.requests.read().len(), 1);
2000          assert_eq!(sync.locators.read().len(), 1);
2001  
2002          // Remove timed out block requests.
2003          let c = DummyPeerPoolHandler::default();
2004          sync.handle_block_request_timeouts(&c).unwrap();
2005  
2006          // let ban_list = c.peers_to_ban.write();
2007          // assert_eq!(ban_list.len(), 1);
2008          // assert_eq!(ban_list.iter().next(), Some(&peer_ip));
2009  
2010          assert!(sync.requests.read().is_empty());
2011          assert!(sync.locators.read().is_empty());
2012      }
2013  
    #[test]
    fn test_reissue_timed_out_block_request() {
        // Scenario: three peers advertise identical locators, and two requests
        // are outstanding — but only the first has exceeded BLOCK_REQUEST_TIMEOUT.
        // The sweep must drop the stale request (and the unresponsive peer) and
        // re-issue it to the remaining peers, leaving the fresh request intact.
        let sync = sample_sync_at_height(0);
        let peer_ip1 = sample_peer_ip(1);
        let peer_ip2 = sample_peer_ip(2);
        let peer_ip3 = sample_peer_ip(3);

        let locators = sample_block_locators(10);
        let block_hash1 = locators.get_hash(1);
        let block_hash2 = locators.get_hash(2);

        sync.update_peer_locators(peer_ip1, &locators).unwrap();
        sync.update_peer_locators(peer_ip2, &locators).unwrap();
        sync.update_peer_locators(peer_ip3, &locators).unwrap();

        assert_eq!(sync.locators.read().len(), 3);

        // A timestamp just past the request-timeout window.
        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);

        // Add a timed-out request, assigned to peer 1.
        sync.requests.write().insert(1, OutstandingRequest {
            request: (block_hash1, None, [peer_ip1].into()),
            timestamp,
            response: None,
        });

        // Add a fresh request (timestamp is `now`, so it must NOT time out).
        sync.requests.write().insert(2, OutstandingRequest {
            request: (block_hash2, None, [peer_ip2].into()),
            timestamp: Instant::now(),
            response: None,
        });

        assert_eq!(sync.requests.read().len(), 2);

        // Remove timed out block requests.
        let c = DummyPeerPoolHandler::default();

        let re_requests = sync.handle_block_request_timeouts(&c).unwrap();

        // NOTE(review): ban-list verification is disabled — confirm whether the
        // timed-out peer is expected to be banned here.
        // let ban_list = c.peers_to_ban.write();
        // assert_eq!(ban_list.len(), 1);
        // assert_eq!(ban_list.iter().next(), Some(&peer_ip1));

        // Only the fresh request remains; the unresponsive peer's locators were dropped.
        assert_eq!(sync.requests.read().len(), 1);
        assert_eq!(sync.locators.read().len(), 2);

        // Exactly one request (height 1, same hash) must be re-issued.
        let (new_requests, new_sync_ips) = re_requests.unwrap();
        assert_eq!(new_requests.len(), 1);

        let (height, (hash, _, _)) = new_requests.first().unwrap();
        assert_eq!(*height, 1);
        assert_eq!(*hash, block_hash1);
        assert_eq!(new_sync_ips.len(), 2);

        // Make sure the removed peer is not in the sync_peer set.
        let mut iter = new_sync_ips.iter();
        assert_ne!(iter.next().unwrap().0, &peer_ip1);
        assert_ne!(iter.next().unwrap().0, &peer_ip1);
    }
2074  
2075      #[test]
2076      fn test_processing_timeout_logic() {
2077          // Test that a completed request (all peers responded) times out after BLOCK_PROCESSING_TIMEOUT
2078          let now = Instant::now();
2079          let old_timestamp = now - Duration::from_secs(150); // 150s ago (exceeds 120s processing timeout)
2080  
2081          // Simulate a completed request (empty sync_ips means all peers responded)
2082          let is_complete = true; // sync_ips().is_empty()
2083          let peer_timeout_elapsed = old_timestamp.elapsed() > BLOCK_REQUEST_TIMEOUT; // false (150s < 600s)
2084          let processing_timeout_elapsed = old_timestamp.elapsed() > BLOCK_PROCESSING_TIMEOUT; // true (150s > 120s)
2085  
2086          let is_timeout = (!is_complete && peer_timeout_elapsed) || (is_complete && processing_timeout_elapsed);
2087  
2088          assert!(!peer_timeout_elapsed, "Should not exceed peer timeout yet");
2089          assert!(processing_timeout_elapsed, "Should exceed processing timeout");
2090          assert!(is_timeout, "Should timeout due to processing delay");
2091      }
2092  
2093      #[test]
2094      fn test_peer_timeout_logic() {
2095          // Test that an incomplete request (waiting for peers) times out after BLOCK_REQUEST_TIMEOUT
2096          let now = Instant::now();
2097          let old_timestamp = now - Duration::from_secs(700); // 700s ago (exceeds 600s peer timeout)
2098  
2099          // Simulate an incomplete request (non-empty sync_ips means waiting for peers)
2100          let is_complete = false; // !sync_ips().is_empty()
2101          let peer_timeout_elapsed = old_timestamp.elapsed() > BLOCK_REQUEST_TIMEOUT; // true (700s > 600s)
2102          let processing_timeout_elapsed = old_timestamp.elapsed() > BLOCK_PROCESSING_TIMEOUT; // true (700s > 120s)
2103  
2104          let is_timeout = (!is_complete && peer_timeout_elapsed) || (is_complete && processing_timeout_elapsed);
2105  
2106          assert!(peer_timeout_elapsed, "Should exceed peer timeout");
2107          assert!(processing_timeout_elapsed, "Should also exceed processing timeout");
2108          assert!(is_timeout, "Should timeout due to peer delay");
2109      }
2110  
2111      #[test]
2112      fn test_no_timeout_when_fresh() {
2113          // Test that a recent request does not timeout
2114          let now = Instant::now();
2115          let recent_timestamp = now - Duration::from_secs(10); // 10s ago (fresh)
2116  
2117          // Test both completed and incomplete states
2118          for is_complete in [true, false] {
2119              let peer_timeout_elapsed = recent_timestamp.elapsed() > BLOCK_REQUEST_TIMEOUT; // false
2120              let processing_timeout_elapsed = recent_timestamp.elapsed() > BLOCK_PROCESSING_TIMEOUT; // false
2121  
2122              let is_timeout = (!is_complete && peer_timeout_elapsed) || (is_complete && processing_timeout_elapsed);
2123  
2124              assert!(!peer_timeout_elapsed, "Should not exceed peer timeout");
2125              assert!(!processing_timeout_elapsed, "Should not exceed processing timeout");
2126              assert!(!is_timeout, "Fresh request should not timeout (is_complete={})", is_complete);
2127          }
2128      }
2129  
2130      #[test]
2131      fn test_timeout_boundary_conditions() {
2132          // Test boundary conditions for both timeouts
2133          let now = Instant::now();
2134  
2135          // Just before processing timeout (119s)
2136          let just_before_processing = now - Duration::from_secs(119);
2137          let is_complete = true;
2138          let processing_timeout_elapsed = just_before_processing.elapsed() > BLOCK_PROCESSING_TIMEOUT;
2139          let is_timeout = is_complete && processing_timeout_elapsed;
2140          assert!(!is_timeout, "Should not timeout at 119s for processing");
2141  
2142          // Just after processing timeout (121s)
2143          let just_after_processing = now - Duration::from_secs(121);
2144          let processing_timeout_elapsed = just_after_processing.elapsed() > BLOCK_PROCESSING_TIMEOUT;
2145          let is_timeout = is_complete && processing_timeout_elapsed;
2146          assert!(is_timeout, "Should timeout at 121s for processing");
2147  
2148          // Just before peer timeout (599s)
2149          let just_before_peer = now - Duration::from_secs(599);
2150          let is_complete = false;
2151          let peer_timeout_elapsed = just_before_peer.elapsed() > BLOCK_REQUEST_TIMEOUT;
2152          let is_timeout = !is_complete && peer_timeout_elapsed;
2153          assert!(!is_timeout, "Should not timeout at 599s for peer response");
2154  
2155          // Just after peer timeout (601s)
2156          let just_after_peer = now - Duration::from_secs(601);
2157          let peer_timeout_elapsed = just_after_peer.elapsed() > BLOCK_REQUEST_TIMEOUT;
2158          let is_timeout = !is_complete && peer_timeout_elapsed;
2159          assert!(is_timeout, "Should timeout at 601s for peer response");
2160      }
2161  }