// node/bft/src/sync/mod.rs
   1  // Copyright (c) 2025 ADnet Contributors
   2  // This file is part of the AlphaOS library.
   3  
   4  // Licensed under the Apache License, Version 2.0 (the "License");
   5  // you may not use this file except in compliance with the License.
   6  // You may obtain a copy of the License at:
   7  
   8  // http://www.apache.org/licenses/LICENSE-2.0
   9  
  10  // Unless required by applicable law or agreed to in writing, software
  11  // distributed under the License is distributed on an "AS IS" BASIS,
  12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13  // See the License for the specific language governing permissions and
  14  // limitations under the License.
  15  
  16  use crate::{
  17      Gateway,
  18      MAX_FETCH_TIMEOUT_IN_MS,
  19      Transport,
  20      events::DataBlocks,
  21      helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
  22      spawn_blocking,
  23  };
  24  use alphaos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
  25  use alphaos_node_bft_ledger_service::LedgerService;
  26  use alphaos_node_network::PeerPoolHandling;
  27  use alphaos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
  28  
  29  use alphavm::{
  30      console::{
  31          network::{ConsensusVersion, Network},
  32          types::Field,
  33      },
  34      ledger::{PendingBlock, authority::Authority, block::Block, narwhal::BatchCertificate},
  35      utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
  36  };
  37  
  38  use anyhow::{Context, Result, anyhow, bail, ensure};
  39  use indexmap::IndexMap;
  40  #[cfg(feature = "locktick")]
  41  use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
  42  #[cfg(not(feature = "locktick"))]
  43  use parking_lot::Mutex;
  44  #[cfg(not(feature = "serial"))]
  45  use rayon::prelude::*;
  46  use std::{
  47      collections::{HashMap, VecDeque},
  48      future::Future,
  49      net::SocketAddr,
  50      sync::Arc,
  51      time::Duration,
  52  };
  53  #[cfg(not(feature = "locktick"))]
  54  use tokio::sync::Mutex as TMutex;
  55  use tokio::{
  56      sync::{OnceCell, oneshot},
  57      task::JoinHandle,
  58  };
  59  
  60  /// Block synchronization logic for validators.
  61  ///
/// Synchronization works differently for nodes that act as validators in AleoBFT:
  63  /// In the common case, validators generate blocks after receiving an anchor block that has been accepted
  64  /// by a supermajority of the committee instead of fetching entire blocks from other nodes.
  65  /// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes.
  66  ///
  67  /// This struct also manages fetching certificates from other validators during normal operation,
  68  /// and blocks when falling behind.
  69  ///
  70  /// Finally, `Sync` handles synchronization of blocks with the validator's local storage:
  71  /// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them.
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway enables communication with other validators.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block synchronization logic.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificates queue.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender. Set at most once, during `initialize`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// Handles to the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The response lock. Serializes processing of block responses.
    response_lock: Arc<TMutex<()>>,
    /// The sync lock. Ensures that only one task syncs the ledger at a time.
    sync_lock: Arc<TMutex<()>>,
    /// The latest block responses.
    ///
    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks
    /// whose addition to the ledger is deferred until certain checks pass.
    /// Blocks need to be processed in order (lowest height first), hence a `VecDeque`.
    ///
    /// Whenever a new block is added to this queue, BlockSync::set_sync_height needs to be called.
    pending_blocks: Arc<TMutex<VecDeque<PendingBlock<N>>>>,
}
 101  
 102  impl<N: Network> Sync<N> {
 103      /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
 104      /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
 105      const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);
 106  
 107      /// Initializes a new sync instance.
 108      pub fn new(
 109          gateway: Gateway<N>,
 110          storage: Storage<N>,
 111          ledger: Arc<dyn LedgerService<N>>,
 112          block_sync: Arc<BlockSync<N>>,
 113      ) -> Self {
 114          // Return the sync instance.
 115          Self {
 116              gateway,
 117              storage,
 118              ledger,
 119              block_sync,
 120              pending: Default::default(),
 121              bft_sender: Default::default(),
 122              handles: Default::default(),
 123              response_lock: Default::default(),
 124              sync_lock: Default::default(),
 125              pending_blocks: Default::default(),
 126          }
 127      }
 128  
 129      /// Initializes the sync module and sync the storage with the ledger at bootup.
 130      pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
 131          // If a BFT sender was provided, set it.
 132          if let Some(bft_sender) = bft_sender {
 133              self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
 134          }
 135  
 136          info!("Syncing storage with the ledger...");
 137  
 138          // Sync the storage with the ledger.
 139          self.sync_storage_with_ledger_at_bootup()
 140              .await
 141              .with_context(|| "Syncing storage with the ledger at bootup failed")?;
 142  
 143          debug!("Finished initial block synchronization at startup");
 144          Ok(())
 145      }
 146  
 147      /// Sends the given batch of block requests to peers.
 148      ///
 149      /// Responses to block requests will eventually be processed by `Self::try_advancing_block_synchronization`.
 150      #[inline]
 151      async fn send_block_requests(
 152          &self,
 153  
 154          block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
 155          sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
 156      ) {
 157          trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());
 158  
 159          // Sends the block requests to the sync peers.
 160          for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
 161              if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
 162                  // Stop if we fail to process a batch of requests.
 163                  break;
 164              }
 165  
 166              // Sleep to avoid triggering spam detection.
 167              tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
 168          }
 169      }
 170  
 171      /// Starts the sync module.
 172      ///
 173      /// When this function returns successfully, the sync module will have spawned background tasks
 174      /// that fetch blocks from other validators.
 175      pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
 176          info!("Starting the sync module...");
 177  
 178          // Start the block request generation loop (outgoing).
 179          let self_ = self.clone();
 180          self.spawn(async move {
 181              loop {
 182                  // Wait for peer updates or timeout
 183                  let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;
 184  
 185                  // Issue block requests to peers.
 186                  self_.try_issuing_block_requests().await;
 187  
 188                  // Rate limiting happens in [`Self::send_block_requests`] and no additional sleeps are needed here.
 189              }
 190          });
 191  
 192          // Start the block response processing loop (incoming).
 193          let self_ = self.clone();
 194          let ping = ping.clone();
 195          self.spawn(async move {
 196              loop {
 197                  // Wait until there is something to do or until the timeout.
 198                  let _ =
 199                      tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;
 200  
 201                  let ping = ping.clone();
 202                  let self_ = self_.clone();
 203                  let hdl = tokio::spawn(async move {
 204                      self_.try_advancing_block_synchronization(&ping).await;
 205                  });
 206  
 207                  if let Err(err) = hdl.await
 208                      && let Ok(panic) = err.try_into_panic()
 209                  {
 210                      error!("Sync block advancement panicked: {panic:?}");
 211                  }
 212  
 213                  // We perform no additional rate limiting here as
 214                  // requests are already rate-limited.
 215              }
 216          });
 217  
 218          // Start the pending queue expiration loop.
 219          let self_ = self.clone();
 220          self.spawn(async move {
 221              loop {
 222                  // Sleep briefly.
 223                  tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
 224  
 225                  // Remove the expired pending transmission requests.
 226                  let self__ = self_.clone();
 227                  let _ = spawn_blocking!({
 228                      self__.pending.clear_expired_callbacks();
 229                      Ok(())
 230                  });
 231              }
 232          });
 233  
 234          /* Set up callbacks for events from the Gateway */
 235  
 236          // Retrieve the sync receiver.
 237          let SyncReceiver {
 238              mut rx_block_sync_insert_block_response,
 239              mut rx_block_sync_remove_peer,
 240              mut rx_block_sync_update_peer_locators,
 241              mut rx_certificate_request,
 242              mut rx_certificate_response,
 243          } = sync_receiver;
 244  
 245          // Process the block sync request to advance with sync blocks.
 246          // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
 247          // which is initially handled by [`Gateway::inbound()`],
 248          // which calls [`SyncSender::advance_with_sync_blocks()`],
 249          // which calls [`tx_block_sync_advance_with_sync_blocks.send()`],
 250          // which causes the `rx_block_sync_advance_with_sync_blocks.recv()` call below to return.
 251          let self_ = self.clone();
 252          self.spawn(async move {
 253              while let Some((peer_ip, blocks, latest_consensus_version, callback)) =
 254                  rx_block_sync_insert_block_response.recv().await
 255              {
 256                  let result = self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await;
 257                  //TODO remove this once channels are gone
 258                  if let Err(err) = &result {
 259                      warn!("Failed to insret block response: {err:?}");
 260                  }
 261  
 262                  callback.send(result).ok();
 263              }
 264          });
 265  
 266          // Process the block sync request to remove the peer.
 267          let self_ = self.clone();
 268          self.spawn(async move {
 269              while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
 270                  self_.remove_peer(peer_ip);
 271              }
 272          });
 273  
 274          // Process each block sync request to update peer locators.
 275          // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
 276          // which is initially handled by [`Gateway::inbound()`],
 277          // which calls [`SyncSender::update_peer_locators()`],
 278          // which calls [`tx_block_sync_update_peer_locators.send()`],
 279          // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
 280          let self_ = self.clone();
 281          self.spawn(async move {
 282              while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
 283                  let self_clone = self_.clone();
 284                  tokio::spawn(async move {
 285                      callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
 286                  });
 287              }
 288          });
 289  
 290          // Process each certificate request.
 291          // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
 292          // which is initially handled by [`Gateway::inbound()`],
 293          // which calls [`tx_certificate_request.send()`],
 294          // which causes the `rx_certificate_request.recv()` call below to return.
 295          let self_ = self.clone();
 296          self.spawn(async move {
 297              while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
 298                  self_.send_certificate_response(peer_ip, certificate_request);
 299              }
 300          });
 301  
 302          // Process each certificate response.
 303          // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
 304          // which is initially handled by [`Gateway::inbound()`],
 305          // which calls [`tx_certificate_response.send()`],
 306          // which causes the `rx_certificate_response.recv()` call below to return.
 307          let self_ = self.clone();
 308          self.spawn(async move {
 309              while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
 310                  self_.finish_certificate_request(peer_ip, certificate_response);
 311              }
 312          });
 313  
 314          Ok(())
 315      }
 316  
 317      /// BFT-specific version of `Client::try_issuing_block_requests()`.
 318      ///
 319      /// This method handles timeout removal, checks if block sync is possible,
 320      /// and issues block requests to peers.
 321      async fn try_issuing_block_requests(&self) {
 322          // Update the sync height to the latest ledger height.
 323          // (if the ledger height is lower or equal to the current sync height, this is a noop)
 324          self.block_sync.set_sync_height(self.ledger.latest_block_height());
 325  
 326          // Check if any existing requests can be removed.
 327          // We should do this even if we cannot block sync, to ensure
 328          // there are no dangling block requests.
 329          match self.block_sync.handle_block_request_timeouts(&self.gateway) {
 330              Ok(Some((requests, sync_peers))) => {
 331                  // Re-request blocks instead of performing regular block sync.
 332                  self.send_block_requests(requests, sync_peers).await;
 333                  return;
 334              }
 335              Ok(None) => {}
 336              Err(err) => {
 337                  // Abort and retry later.
 338                  error!("{}", &flatten_error(err));
 339                  return;
 340              }
 341          }
 342  
 343          // Do not attempt to sync if there are no blocks to sync.
 344          // This prevents redundant log messages and performing unnecessary computation.
 345          if !self.block_sync.can_block_sync() {
 346              return;
 347          }
 348  
 349          // Prepare the block requests, if any.
 350          // In the process, we update the state of `is_block_synced` for the sync module.
 351          let (requests, sync_peers) = self.block_sync.prepare_block_requests();
 352  
 353          // If there are no block requests, return early.
 354          if requests.is_empty() {
 355              return;
 356          }
 357  
 358          // Send the block requests to peers.
 359          self.send_block_requests(requests, sync_peers).await;
 360      }
 361  
 362      /// Test-only method to manually trigger block synchronization.
 363      /// This combines both request generation and response processing for testing purposes.
 364      #[cfg(test)]
 365      pub(crate) async fn testing_only_try_block_sync_testing_only(&self) {
 366          // First try issuing block requests
 367          self.try_issuing_block_requests().await;
 368  
 369          // Then try advancing with any available responses
 370          self.try_advancing_block_synchronization(&None).await;
 371      }
 372  }
 373  
 374  // Callbacks used when receiving messages from the Gateway
 375  impl<N: Network> Sync<N> {
 376      /// We received a block response and can (possibly) advance synchronization.
 377      async fn insert_block_response(
 378          &self,
 379          peer_ip: SocketAddr,
 380          blocks: Vec<Block<N>>,
 381          latest_consensus_version: Option<ConsensusVersion>,
 382      ) -> Result<()> {
 383          // Verify that the response is valid and add it to block sync.
 384          self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)
 385  
 386          // No need to advance block sync here, as the new response will
 387          // notify the incoming task.
 388      }
 389  
 390      /// We received new peer locators during a Ping.
 391      fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
 392          self.block_sync.update_peer_locators(peer_ip, &locators)
 393      }
 394  
 395      /// A peer disconnected.
 396      fn remove_peer(&self, peer_ip: SocketAddr) {
 397          self.block_sync.remove_peer(&peer_ip);
 398      }
 399  
 400      #[cfg(test)]
 401      pub fn testing_only_update_peer_locators_testing_only(
 402          &self,
 403          peer_ip: SocketAddr,
 404          locators: BlockLocators<N>,
 405      ) -> Result<()> {
 406          self.update_peer_locators(peer_ip, locators)
 407      }
 408  }
 409  
 410  // Methods to manage storage.
 411  impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup.
    ///
    /// This is called when starting the validator and after finishing a sync without BFT.
    /// It loads all non-garbage-collected blocks from the ledger, replays their certificates
    /// into storage, and (if a BFT sender is registered) rebuilds the BFT DAG from them.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Retrieve the block height.
        let block_height = latest_block.height();
        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Retrieve the blocks. The upper bound of the range is exclusive, hence the +1
        // to include the latest block itself.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock, so no other task syncs the ledger concurrently.
        let _lock = self.sync_lock.lock().await;

        // NOTE(review): the second value logged here is the exclusive end of the range
        // (latest height + 1), not the last block synced.
        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        /* Sync storage */

        // Sync the height with the block.
        self.storage.sync_height_with_block(latest_block.height());
        // Sync the round with the block.
        self.storage.sync_round_with_block(latest_block.round());
        // Perform GC on the latest block round.
        self.storage.garbage_collect_certificates(latest_block.round());
        // Iterate over the blocks.
        for block in &blocks {
            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
            // Note that the block authority is always a sub-DAG in production;
            // beacon signatures are only used for testing,
            // and as placeholder (irrelevant) block authority in the genesis block.
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions, keyed by transaction id.
                // Transactions that cannot be converted are silently skipped (`.ok()`).
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                // Iterate over the certificates (possibly in parallel, via `cfg_into_iter!`).
                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                // Update the validator telemetry.
                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        /* Sync the BFT DAG */

        // Construct a flat list of all certificates across the loaded blocks.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    // If the block authority is a beacon, then skip the block.
                    Authority::Beacon(_) => None,
                    // If the block authority is a subdag, then retrieve the certificates.
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender was provided, send the certificates to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            bft_sender
                .tx_sync_bft_dag_at_bootup
                .send(certificates)
                .await
                .with_context(|| "Failed to update the BFT DAG from sync")?;
        }

        // Make the block-sync logic aware of the height we are synced to.
        self.block_sync.set_sync_height(block_height);

        Ok(())
    }
 503  
 504      /// Returns which height we are synchronized to.
 505      /// If there are queued block responses, this might be higher than the latest block in the ledger.
 506      async fn compute_sync_height(&self) -> u32 {
 507          let ledger_height = self.ledger.latest_block_height();
 508          let mut pending_blocks = self.pending_blocks.lock().await;
 509  
 510          // Remove any old responses.
 511          while let Some(b) = pending_blocks.front()
 512              && b.height() <= ledger_height
 513          {
 514              pending_blocks.pop_front();
 515          }
 516  
 517          // Ensure the returned value is always greater or equal than ledger height.
 518          pending_blocks.back().map(|b| b.height()).unwrap_or(0).max(ledger_height)
 519      }
 520  
 521      /// BFT-version of [`alphaos_node_client::Client::try_advancing_block_synchronization`].
 522      async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
 523          // Process block responses and advance the ledger.
 524          let new_blocks = match self
 525              .try_advancing_block_synchronization_inner()
 526              .await
 527              .with_context(|| "Block synchronization failed")
 528          {
 529              Ok(new_blocks) => new_blocks,
 530              Err(err) => {
 531                  error!("{}", &flatten_error(err));
 532                  false
 533              }
 534          };
 535  
 536          if let Some(ping) = &ping
 537              && new_blocks
 538          {
 539              match self.get_block_locators() {
 540                  Ok(locators) => ping.update_block_locators(locators),
 541                  Err(err) => error!("Failed to update block locators: {err}"),
 542              }
 543          }
 544      }
 545  
    /// Aims to advance synchronization using any recent block responses received from peers.
    ///
    /// This is the validator's version of `BlockSync::try_advancing_block_synchronization`
    /// and is called periodically at runtime.
    ///
    /// This returns Ok(true) if we successfully advanced the ledger by at least one new block.
    ///
    /// A key difference to `BlockSync`'s versions is that it will only add blocks to the ledger once they have been confirmed by the network.
    /// If blocks are not confirmed yet, they will be kept in [`Self::pending_blocks`].
    /// It will also pass certificates from synced blocks to the BFT module so that consensus can progress as expected
    /// (see [`Self::sync_storage_with_block`] for more details).
    ///
    /// If the node falls behind more than GC rounds, this function calls [`Self::sync_ledger_with_block_without_bft`] instead,
    /// which syncs without updating the BFT state.
    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
        // Acquire the response lock, so only one task processes responses at a time.
        let _lock = self.response_lock.lock().await;

        // For sanity, set the sync height again.
        // (if the sync height is already larger or equal, this is a noop)
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Retrieve the maximum block height of the peers.
        // Defaults to 0 if there are no sync peers or no heights are known.
        let tip = self
            .block_sync
            .find_sync_peers()
            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
            .unwrap_or(0);

        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // Updates sync state and returns the error (if any).
        // Shared epilogue for both the BFT and non-BFT sync paths below.
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            // Make the underlying `BlockSync` instance aware of the new sync height.
            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Retrieve the current height, based on the ledger height and the
            // (unconfirmed) blocks that are already queued up.
            let start_height = self.compute_sync_height().await;

            // The height is incremented as blocks are added.
            let mut current_height = start_height;
            trace!(
                "Try advancing blocks responses with BFT (starting at block {current_height}, current sync speed is {})",
                self.block_sync.get_sync_speed()
            );

            // If we already were within GC or successfully caught up with GC, try to advance BFT normally again.
            // Process responses strictly in height order until the next one is missing.
            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                // Sync the storage with the block.
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            // For non-BFT sync we need to start at the current height of the ledger, as blocks are
            // immediately added to it and not queued up in `pending_blocks`.
            let start_height = ledger_height;
            let mut current_height = start_height;

            trace!("Try advancing block responses without BFT (starting at block {current_height})");

            // Try to advance the ledger *to tip* without updating the BFT.
            // TODO(kaimast): why to tip and not to tip-GC?
            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                // Sync the ledger with the block without BFT.
                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                        self.block_sync.count_request_completed();
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // Sync the storage with the ledger if we should transition to the BFT sync.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                self.sync_storage_with_ledger_at_bootup()
                    .await
                    .with_context(|| "BFT sync (with bootup routine) failed")?;
            }

            cleanup(start_height, current_height, None)
        }
    }
    /// Syncs the ledger with the given block without updating the BFT.
    ///
    /// This is only used by [`Self::try_advancing_block_synchronization`].
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock, so no other ledger/storage update can interleave.
        let _lock = self.sync_lock.lock().await;

        // Ledger checks and advancement block the executor, so run them on a blocking task.
        let self_ = self.clone();
        spawn_blocking!({
            // Check the next block.
            self_.ledger.check_next_block(&block)?;
            // Attempt to advance to the next block.
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the height with the block.
            self_.storage.sync_height_with_block(block.height());
            // Sync the round with the block.
            self_.storage.sync_round_with_block(block.round());
            // Mark the block height as processed in block_sync.
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
    }
 704  
 705      /// Helper function for [`Self::sync_storage_with_block`].
 706      /// It syncs the batch certificates with the BFT, if the block's authority is a sub-DAG.
 707      ///
 708      /// Note that the block authority is always a sub-DAG in production; beacon signatures are only used for testing,
 709      /// and as placeholder (irrelevant) block authority in the genesis block.
 710      async fn add_block_subdag_to_bft(&self, block: &Block<N>) -> Result<()> {
 711          // Nothing to do if this is a beacon block
 712          let Authority::Quorum(subdag) = block.authority() else {
 713              return Ok(());
 714          };
 715  
 716          // Reconstruct the unconfirmed transactions.
 717          let unconfirmed_transactions = cfg_iter!(block.transactions())
 718              .filter_map(|tx| tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok())
 719              .collect::<HashMap<_, _>>();
 720  
 721          // Iterate over the certificates.
 722          for certificates in subdag.values().cloned() {
 723              cfg_into_iter!(certificates.clone()).for_each(|certificate| {
 724                  // Sync the batch certificate with the block.
 725                  self.storage.sync_certificate_with_block(block, certificate.clone(), &unconfirmed_transactions);
 726              });
 727  
 728              // Sync the BFT DAG with the certificates.
 729              for certificate in certificates {
 730                  // If a BFT sender was provided, send the certificate to the BFT.
 731                  // For validators, BFT spawns a receiver task in `BFT::start_handlers`.
 732                  if let Some(bft_sender) = self.bft_sender.get() {
 733                      let (callback_tx, callback_rx) = oneshot::channel();
 734                      bft_sender
 735                          .tx_sync_bft
 736                          .send((certificate, callback_tx))
 737                          .await
 738                          .with_context(|| "Failed to sync certificate")?;
 739                      callback_rx.await?.with_context(|| "Failed to sync certificate")?;
 740                  }
 741              }
 742          }
 743          Ok(())
 744      }
 745  
 746      /// Helper function for [`Self::sync_storage_with_block`].
 747      ///
 748      /// It checks that successor of a given block contains enough votes to commit it.
 749      /// This can only return `Ok(true)` if the certificates of the block's successor were added to the storage.
 750      fn is_block_availability_threshold_reached(&self, block: &PendingBlock<N>) -> Result<bool> {
 751          // Fetch the leader certificate and the relevant rounds.
 752          let leader_certificate = match block.authority() {
 753              Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
 754              _ => bail!("Received a block with an unexpected authority type."),
 755          };
 756          let commit_round = leader_certificate.round();
 757          let certificate_round =
 758              commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;
 759  
 760          // Get the committee lookback for the round just after the leader.
 761          let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
 762          // Retrieve all of the certificates for the round just after the leader.
 763          let certificates = self.storage.get_certificates_for_round(certificate_round);
 764          // Construct a set over the authors, at the round just after the leader,
 765          // who included the leader's certificate in their previous certificate IDs.
 766          let authors = certificates
 767              .iter()
 768              .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
 769                  true => Some(c.author()),
 770                  false => None,
 771              })
 772              .collect();
 773  
 774          // Check if the leader is ready to be committed.
 775          if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
 776              trace!(
 777                  "Block {hash} at height {height} has reached availability threshold",
 778                  hash = block.hash(),
 779                  height = block.height()
 780              );
 781              Ok(true)
 782          } else {
 783              Ok(false)
 784          }
 785      }
 786  
 787      /// Advances the ledger by the given block and updates the storage accordingly.
 788      ///
 789      /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
 790      /// meets the voter availability threshold (i.e. > f voting stake)
 791      /// or is reachable via a DAG path from a later leader certificate that does.
 792      /// Since performing this check requires DAG certificates from later blocks,
 793      /// the block is stored in `Sync::pending_blocks`,
 794      /// and its addition to the ledger is deferred until the check passes.
 795      /// Several blocks may be stored in `Sync::pending_blocks`
 796      /// before they can be all checked and added to the ledger.
 797      ///
 798      /// # Usage
 799      /// This function assumes that blocks are passed in order, i.e.,
 800      /// that the given block is a direct successor of the block that was last passed to this function.
 801      async fn sync_storage_with_block(&self, new_block: Block<N>) -> Result<()> {
 802          // Acquire the sync lock.
 803          let _lock = self.sync_lock.lock().await;
 804          let new_block_height = new_block.height();
 805  
 806          // If this block has already been processed, return early.
 807          // TODO(kaimast): Should we remove the response here?
 808          if self.ledger.contains_block_height(new_block.height()) {
 809              debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
 810              return Ok(());
 811          }
 812  
 813          // Acquire the pending blocks lock.
 814          let mut pending_blocks = self.pending_blocks.lock().await;
 815  
 816          // Append the certificates to the storage.
 817          self.add_block_subdag_to_bft(&new_block).await?;
 818  
 819          // Fetch the latest block height.
 820          let ledger_block_height = self.ledger.latest_block_height();
 821  
 822          // First, clear any older pending blocks.
 823          // TODO(kaimast): ensure there are no dangling block requests
 824          while let Some(pending_block) = pending_blocks.front() {
 825              if pending_block.height() > ledger_block_height {
 826                  break;
 827              }
 828  
 829              pending_blocks.pop_front();
 830          }
 831  
 832          if let Some(tail) = pending_blocks.back() {
 833              if tail.height() >= new_block.height() {
 834                  debug!(
 835                      "A unconfirmed block is queued already for height {height}. \
 836                      Will not sync.",
 837                      height = new_block.height()
 838                  );
 839                  return Ok(());
 840              }
 841  
 842              ensure_equals!(tail.height() + 1, new_block.height(), "Got an out-of-order block");
 843          }
 844  
 845          // Fetch the latest block height.
 846          let ledger_block_height = self.ledger.latest_block_height();
 847  
 848          // Clear any older pending blocks.
 849          // TODO(kaimast): ensure there are no dangling block requests
 850          while let Some(pending_block) = pending_blocks.front() {
 851              if pending_block.height() > ledger_block_height {
 852                  break;
 853              }
 854  
 855              trace!(
 856                  "Pending block {hash} at height {height} became obsolete",
 857                  hash = pending_block.hash(),
 858                  height = pending_block.height()
 859              );
 860              pending_blocks.pop_front();
 861          }
 862  
 863          // Check the block against the chain of pending blocks and append it on success.
 864          let new_block = match self.ledger.check_block_subdag(new_block, pending_blocks.make_contiguous()) {
 865              Ok(new_block) => new_block,
 866              Err(err) => {
 867                  // TODO(kaimast): this shoud not return an error on the snarkVM side.
 868                  if err.to_string().contains("already in the ledger") {
 869                      debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
 870  
 871                      return Ok(());
 872                  } else {
 873                      return Err(err);
 874                  }
 875              }
 876          };
 877  
 878          trace!(
 879              "Adding new pending block {hash} at height {height}",
 880              hash = new_block.hash(),
 881              height = new_block.height()
 882          );
 883          pending_blocks.push_back(new_block);
 884  
 885          // Now, figure out if and which pending block we can commit.
 886          // To do this effectively and because commits are transitive,
 887          // we iterate in reverse so that we can stop at the first successful check.
 888          //
 889          // Note, that if the storage already contains certificates for the round after new block,
 890          // the availability threshold for the new block could also be reached.
 891          let mut commit_height = None;
 892          for block in pending_blocks.iter().rev() {
 893              // This check assumes that the pending blocks are properly linked together, based on the fact that,
 894              // to generate the sequence of `PendingBlocks`, each block needs to successfully be processed by `Ledger::check_block_subdag`.
 895              // As a result, the safety of this piece of code relies on the correctness `Ledger::check_block_subdag`,
 896              // which is tested in `snarkvm/ledger/tests/pending_block.rs`.
 897              if self
 898                  .is_block_availability_threshold_reached(block)
 899                  .with_context(|| "Availability threshold check failed")?
 900              {
 901                  commit_height = Some(block.height());
 902                  break;
 903              }
 904          }
 905  
 906          if let Some(commit_height) = commit_height {
 907              let start_height = ledger_block_height + 1;
 908              ensure!(commit_height >= start_height, "Invalid commit height");
 909              let num_blocks = (commit_height - start_height + 1) as usize;
 910  
 911              // Create a more detailed log message if we are committing more than one block at a time.
 912              if num_blocks > 1 {
 913                  trace!(
 914                      "Attempting to commit {chain_length} pending block(s) starting at height {start_height}.",
 915                      chain_length = pending_blocks.len(),
 916                  );
 917              }
 918  
 919              for pending_block in pending_blocks.drain(0..num_blocks) {
 920                  let hash = pending_block.hash();
 921                  let height = pending_block.height();
 922                  let ledger = self.ledger.clone();
 923                  let storage = self.storage.clone();
 924  
 925                  spawn_blocking!({
 926                      let block = ledger.check_block_content(pending_block).with_context(|| {
 927                          format!("Failed to check contents of pending block {hash} at height {height}")
 928                      })?;
 929  
 930                      trace!("Adding pending block {hash} at height {height} to the ledger");
 931                      ledger.advance_to_next_block(&block)?;
 932                      // Sync the height with the block.
 933                      storage.sync_height_with_block(block.height());
 934                      // Sync the round with the block.
 935                      storage.sync_round_with_block(block.round());
 936  
 937                      Ok(())
 938                  })?
 939              }
 940          } else {
 941              trace!("No pending block are ready to be committed ({} block(s) are pending)", pending_blocks.len());
 942          }
 943  
 944          Ok(())
 945      }
 946  }
 947  
 948  // Methods to assist with the block sync module.
 949  impl<N: Network> Sync<N> {
 950      /// Returns `true` if the node is synced and has connected peers.
 951      pub fn is_synced(&self) -> bool {
 952          // Ensure the validator is connected to other validators,
 953          // not just clients.
 954          if self.gateway.number_of_connected_peers() == 0 {
 955              return false;
 956          }
 957  
 958          self.block_sync.is_block_synced()
 959      }
 960  
 961      /// Returns the number of blocks the node is behind the greatest peer height.
 962      pub fn num_blocks_behind(&self) -> Option<u32> {
 963          self.block_sync.num_blocks_behind()
 964      }
 965  
 966      /// Returns the current block locators of the node.
 967      pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
 968          self.block_sync.get_block_locators()
 969      }
 970  }
 971  
 972  // Methods to assist with fetching batch certificates from peers.
 973  impl<N: Network> Sync<N> {
 974      /// Sends a certificate request to the specified peer.
 975      pub async fn send_certificate_request(
 976          &self,
 977          peer_ip: SocketAddr,
 978          certificate_id: Field<N>,
 979      ) -> Result<BatchCertificate<N>> {
 980          // Initialize a oneshot channel.
 981          let (callback_sender, callback_receiver) = oneshot::channel();
 982          // Determine how many sent requests are pending.
 983          let num_sent_requests = self.pending.num_sent_requests(certificate_id);
 984          // Determine if we've already sent a request to the peer.
 985          let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
 986          // Determine the maximum number of redundant requests.
 987          let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
 988          // Determine if we should send a certificate request to the peer.
 989          // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
 990          let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
 991  
 992          // Insert the certificate ID into the pending queue.
 993          self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
 994  
 995          // If the number of requests is less than or equal to the redundancy factor, send the certificate request to the peer.
 996          if should_send_request {
 997              // Send the certificate request to the peer.
 998              if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
 999                  bail!("Unable to fetch batch certificate {certificate_id} (failed to send request)")
1000              }
1001          } else {
1002              debug!(
1003                  "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
1004                  fmt_id(certificate_id)
1005              );
1006          }
1007          // Wait for the certificate to be fetched.
1008          // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
1009          tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
1010              .await
1011              .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))?
1012              .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id)))
1013      }
1014  
1015      /// Handles the incoming certificate request.
1016      fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
1017          // Attempt to retrieve the certificate.
1018          if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
1019              // Send the certificate response to the peer.
1020              let self_ = self.clone();
1021              tokio::spawn(async move {
1022                  let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
1023              });
1024          }
1025      }
1026  
1027      /// Handles the incoming certificate response.
1028      /// This method ensures the certificate response is well-formed and matches the certificate ID.
1029      fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
1030          let certificate = response.certificate;
1031          // Check if the peer IP exists in the pending queue for the given certificate ID.
1032          let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
1033          // If the peer IP exists, finish the pending request.
1034          if exists {
1035              // TODO: Validate the certificate.
1036              // Remove the certificate ID from the pending queue.
1037              self.pending.remove(certificate.id(), Some(certificate));
1038          }
1039      }
1040  }
1041  
1042  impl<N: Network> Sync<N> {
1043      /// Spawns a task with the given future; it should only be used for long-running tasks.
1044      fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1045          self.handles.lock().push(tokio::spawn(future));
1046      }
1047  
1048      /// Shuts down the primary.
1049      pub async fn shut_down(&self) {
1050          info!("Shutting down the sync module...");
1051          // Acquire the response lock.
1052          let _lock = self.response_lock.lock().await;
1053          // Acquire the sync lock.
1054          let _lock = self.sync_lock.lock().await;
1055          // Abort the tasks.
1056          self.handles.lock().iter().for_each(|handle| handle.abort());
1057      }
1058  }
1059  
1060  #[cfg(test)]
1061  mod tests {
1062      use super::*;
1063  
1064      use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
1065  
1066      use alphaos_account::Account;
1067      use alphaos_node_sync::BlockSync;
1068      use alphaos_utilities::SimpleStoppable;
1069      use alphavm::{
1070          console::{
1071              account::{Address, PrivateKey},
1072              network::MainnetV0,
1073          },
1074          ledger::{
1075              narwhal::{BatchCertificate, BatchHeader, Subdag},
1076              store::{ConsensusStore, helpers::memory::ConsensusMemory},
1077          },
1078          prelude::{Ledger, VM},
1079          utilities::TestRng,
1080      };
1081  
1082      use alpha_std::StorageMode;
1083      use indexmap::IndexSet;
1084      use rand::Rng;
1085      use std::collections::BTreeMap;
1086  
1087      type CurrentNetwork = MainnetV0;
1088      type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1089      type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1090  
    /// Tests that commits work as expected when some anchors are not committed immediately.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_chain() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;

        // The first round of the first block.
        let first_round = 1;
        // The total number of blocks we test.
        let num_blocks = 3;
        // The number of certificate rounds needed (two rounds per block).
        // There is one additional round to provide availability for the final block.
        let num_rounds = first_round + num_blocks * 2 + 1;
        // The first round that has at least N-f certificates referencing the anchor from the previous round.
        // This is also the last round we use in the test.
        let first_committed_round = num_rounds - 1;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let vm = VM::from(store).unwrap();
        let genesis_pk = *account.private_key();
        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger with the genesis block.
        let genesis_clone = genesis.clone();
        let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap();
        // Wrap the ledger in the core ledger service used by the BFT components.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));

        // Sample batch certificates for rounds `first_round..=first_committed_round` from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in first_round..=first_committed_round {
                let mut current_certificates = IndexSet::new();
                // The first round has no predecessors to reference.
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };

                let committee_id = committee.id();
                let prev_leader = committee.get_leader(round - 1).unwrap();

                // For the first two blocks non-leaders will not reference the leader certificate.
                // This means, while there is an anchor, it isn't committed
                // until later.
                for (i, private_key) in private_keys.iter().enumerate() {
                    let leader_index = addresses.iter().position(|&address| address == prev_leader).unwrap();
                    let is_certificate_round = round % 2 == 1;
                    let is_leader = i == leader_index;

                    // Non-leaders in certificate rounds (before the committed round) omit the
                    // previous leader's certificate, so the anchor lacks votes and cannot commit yet.
                    let previous_certs = if round < first_committed_round && is_certificate_round && !is_leader {
                        previous_certificate_ids
                            .iter()
                            .cloned()
                            .enumerate()
                            .filter(|(idx, _)| *idx != leader_index)
                            .map(|(_, id)| id)
                            .collect()
                    } else {
                        previous_certificate_ids.clone()
                    };

                    let batch_header = BatchHeader::new(
                        private_key,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certs,
                        rng,
                    )
                    .unwrap();

                    // Sign the batch header.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates;
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert all certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in first_round..=first_committed_round {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Create the blocks, one per (even) leader round.
        let mut previous_leader_cert = None;
        let mut blocks = vec![];

        for block_height in 1..=num_blocks {
            let leader_round = block_height * 2;

            let leader = committee.get_leader(leader_round).unwrap();
            let leader_certificate = storage.get_certificate_for_round_with_author(leader_round, leader).unwrap();

            // Build the sub-DAG for this block: the leader's certificate plus
            // the certificates of the preceding round(s).
            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());

            let previous_cert_map = storage.get_certificates_for_round(leader_round - 1);

            subdag_map.insert(leader_round, leader_cert_map.clone());
            subdag_map.insert(leader_round - 1, previous_cert_map.clone());

            if leader_round > 2 {
                // Include round `leader_round - 2`, excluding the previously committed leader certificate.
                let previous_commit_cert_map: IndexSet<_> = storage
                    .get_certificates_for_round(leader_round - 2)
                    .into_iter()
                    .filter(|cert| {
                        if let Some(previous_leader_cert) = &previous_leader_cert {
                            cert != previous_leader_cert
                        } else {
                            true
                        }
                    })
                    .collect();
                subdag_map.insert(leader_round - 2, previous_commit_cert_map);
            }

            let subdag = Subdag::from(subdag_map.clone())?;
            previous_leader_cert = Some(leader_certificate);

            let core_ledger = core_ledger.clone();
            let block = spawn_blocking!({
                let block = core_ledger.clone().prepare_advance_to_next_quorum_block(subdag, Default::default())?;
                core_ledger.advance_to_next_block(&block)?;
                Ok(block)
            })?;

            blocks.push(block);
        }

        // ### Test that sync works as expected ###
        let storage_mode = StorageMode::new_test(None);

        // Create a new ledger to test with, but use the existing storage
        // so that the certificates exist.
        let syncing_ledger = {
            let storage_mode = storage_mode.clone();
            Arc::new(CoreLedgerService::new(
                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
                SimpleStoppable::new(),
            ))
        };

        // Set up sync and its dependencies.
        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            syncing_ledger.clone(),
            None,
            &[],
            false,
            StorageMode::new_test(None),
            None,
        )?;
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);

        let mut block_iter = blocks.into_iter();

        // Insert the blocks into the new sync module.
        for _ in 0..num_blocks - 1 {
            let block = block_iter.next().unwrap();
            sync.sync_storage_with_block(block).await?;

            // Availability threshold is not met, so we should not advance yet.
            assert_eq!(syncing_ledger.latest_block_height(), 0);
        }

        // Only for the final block, the availability threshold is met,
        // because certificates for the subsequent round are already in storage.
        sync.sync_storage_with_block(block_iter.next().unwrap()).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);

        // Ensure blocks 1 and 2 were added to the ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }
1318  
1319      #[tokio::test]
1320      #[tracing_test::traced_test]
1321      async fn test_pending_certificates() -> anyhow::Result<()> {
1322          let rng = &mut TestRng::default();
1323          // Initialize the round parameters.
1324          let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1325          let commit_round = 2;
1326  
1327          // Initialize the store.
1328          let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1329          let account: Account<CurrentNetwork> = Account::new(rng)?;
1330  
1331          // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
1332          let seed: u64 = rng.r#gen();
1333          let vm = VM::from(store).unwrap();
1334          let genesis_pk = *account.private_key();
1335          let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();
1336  
1337          // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
1338          let genesis_rng = &mut TestRng::from_seed(seed);
1339          let private_keys = [
1340              *account.private_key(),
1341              PrivateKey::new(genesis_rng)?,
1342              PrivateKey::new(genesis_rng)?,
1343              PrivateKey::new(genesis_rng)?,
1344          ];
1345          // Initialize the ledger with the genesis block.
1346          let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap();
1347          // Initialize the ledger.
1348          let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
1349          // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1350          let (round_to_certificates_map, committee) = {
1351              // Initialize the committee.
1352              let committee = core_ledger.current_committee().unwrap();
1353              // Initialize a mapping from the round number to the set of batch certificates in the round.
1354              let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1355                  HashMap::new();
1356              let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1357  
1358              for round in 0..=commit_round + 8 {
1359                  let mut current_certificates = IndexSet::new();
1360                  let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1361                      IndexSet::new()
1362                  } else {
1363                      previous_certificates.iter().map(|c| c.id()).collect()
1364                  };
1365                  let committee_id = committee.id();
1366                  // Create a certificate for each validator.
1367                  for (i, private_key_1) in private_keys.iter().enumerate() {
1368                      let batch_header = BatchHeader::new(
1369                          private_key_1,
1370                          round,
1371                          now(),
1372                          committee_id,
1373                          Default::default(),
1374                          previous_certificate_ids.clone(),
1375                          rng,
1376                      )
1377                      .unwrap();
1378                      // Sign the batch header.
1379                      let mut signatures = IndexSet::with_capacity(4);
1380                      for (j, private_key_2) in private_keys.iter().enumerate() {
1381                          if i != j {
1382                              signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1383                          }
1384                      }
1385                      current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1386                  }
1387  
1388                  // Update the map of certificates.
1389                  round_to_certificates_map.insert(round, current_certificates.clone());
1390                  previous_certificates = current_certificates.clone();
1391              }
1392              (round_to_certificates_map, committee)
1393          };
1394  
1395          // Initialize the storage.
1396          let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1397          // Insert certificates into storage.
1398          let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1399          for i in 1..=commit_round + 8 {
1400              let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1401              certificates.extend(c);
1402          }
1403          for certificate in certificates.clone().iter() {
1404              storage.testing_only_insert_certificate_testing_only(certificate.clone());
1405          }
1406          // Create block 1.
1407          let leader_round_1 = commit_round;
1408          let leader_1 = committee.get_leader(leader_round_1).unwrap();
1409          let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1410          let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1411          let block_1 = {
1412              let mut leader_cert_map = IndexSet::new();
1413              leader_cert_map.insert(leader_certificate.clone());
1414              let mut previous_cert_map = IndexSet::new();
1415              for cert in storage.get_certificates_for_round(commit_round - 1) {
1416                  previous_cert_map.insert(cert);
1417              }
1418              subdag_map.insert(commit_round, leader_cert_map.clone());
1419              subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1420              let subdag = Subdag::from(subdag_map.clone())?;
1421              let ledger = core_ledger.clone();
1422              spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag, Default::default()))?
1423          };
1424          // Insert block 1.
1425          let ledger = core_ledger.clone();
1426          let block = block_1.clone();
1427          spawn_blocking!(ledger.advance_to_next_block(&block))?;
1428  
1429          // Create block 2.
1430          let leader_round_2 = commit_round + 2;
1431          let leader_2 = committee.get_leader(leader_round_2).unwrap();
1432          let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1433          let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1434          let block_2 = {
1435              let mut leader_cert_map_2 = IndexSet::new();
1436              leader_cert_map_2.insert(leader_certificate_2.clone());
1437              let mut previous_cert_map_2 = IndexSet::new();
1438              for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1439                  previous_cert_map_2.insert(cert);
1440              }
1441              subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1442              subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1443              let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1444              let ledger = core_ledger.clone();
1445              spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default()))?
1446          };
1447          // Insert block 2.
1448          let ledger = core_ledger.clone();
1449          let block = block_2.clone();
1450          spawn_blocking!(ledger.advance_to_next_block(&block))?;
1451  
1452          // Create block 3
1453          let leader_round_3 = commit_round + 4;
1454          let leader_3 = committee.get_leader(leader_round_3).unwrap();
1455          let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1456          let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1457          let block_3 = {
1458              let mut leader_cert_map_3 = IndexSet::new();
1459              leader_cert_map_3.insert(leader_certificate_3.clone());
1460              let mut previous_cert_map_3 = IndexSet::new();
1461              for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1462                  previous_cert_map_3.insert(cert);
1463              }
1464              subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1465              subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1466              let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1467              let ledger = core_ledger.clone();
1468              spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default()))?
1469          };
1470          // Insert block 3.
1471          let ledger = core_ledger.clone();
1472          let block = block_3.clone();
1473          spawn_blocking!(ledger.advance_to_next_block(&block))?;
1474  
1475          /*
1476              Check that the pending certificates are computed correctly.
1477          */
1478  
1479          // Retrieve the pending certificates.
1480          let pending_certificates = storage.get_pending_certificates();
1481          // Check that all of the pending certificates are not contained in the ledger.
1482          for certificate in pending_certificates.clone() {
1483              assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1484          }
1485          // Initialize an empty set to be populated with the committed certificates in the block subdags.
1486          let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1487          {
1488              let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1489              for subdag in subdag_maps.iter() {
1490                  for subdag_certificates in subdag.values() {
1491                      committed_certificates.extend(subdag_certificates.iter().cloned());
1492                  }
1493              }
1494          };
1495          // Create the set of candidate pending certificates as the set of all certificates minus the set of the committed certificates.
1496          let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1497          for certificate in certificates.clone() {
1498              if !committed_certificates.contains(&certificate) {
1499                  candidate_pending_certificates.insert(certificate);
1500              }
1501          }
1502          // Check that the set of pending certificates is equal to the set of candidate pending certificates.
1503          assert_eq!(pending_certificates, candidate_pending_certificates);
1504  
1505          Ok(())
1506      }
1507  }