// node/bft/src/sync/mod.rs
   1  // Copyright (c) 2025-2026 ACDC Network
   2  // This file is part of the alphaos library.
   3  //
   4  // Alpha Chain | Delta Chain Protocol
   5  // International Monetary Graphite.
   6  //
   7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
   8  // They built world-class ZK infrastructure. We installed the EASY button.
   9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
  10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
  11  //
  12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
  13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
  14  // No rights reserved. No permission required. No warranty. No refunds.
  15  //
  16  // https://creativecommons.org/publicdomain/zero/1.0/
  17  // SPDX-License-Identifier: CC0-1.0
  18  
  19  use crate::{
  20      events::DataBlocks,
  21      helpers::{fmt_id, max_redundant_requests, BFTSender, Pending, Storage, SyncReceiver},
  22      spawn_blocking,
  23      Gateway,
  24      Transport,
  25      MAX_FETCH_TIMEOUT_IN_MS,
  26  };
  27  use alphaos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
  28  use alphaos_node_bft_ledger_service::LedgerService;
  29  use alphaos_node_network::PeerPoolHandling;
  30  use alphaos_node_sync::{locators::BlockLocators, BlockSync, Ping, PrepareSyncRequest, BLOCK_REQUEST_BATCH_DELAY};
  31  
  32  use alphavm::{
  33      console::{
  34          network::{ConsensusVersion, Network},
  35          types::Field,
  36      },
  37      ledger::{authority::Authority, block::Block, narwhal::BatchCertificate, PendingBlock},
  38      utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
  39  };
  40  
  41  use anyhow::{anyhow, bail, ensure, Context, Result};
  42  use indexmap::IndexMap;
  43  #[cfg(feature = "locktick")]
  44  use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
  45  #[cfg(not(feature = "locktick"))]
  46  use parking_lot::Mutex;
  47  #[cfg(not(feature = "serial"))]
  48  use rayon::prelude::*;
  49  use std::{
  50      collections::{HashMap, VecDeque},
  51      future::Future,
  52      net::SocketAddr,
  53      sync::Arc,
  54      time::Duration,
  55  };
  56  #[cfg(not(feature = "locktick"))]
  57  use tokio::sync::Mutex as TMutex;
  58  use tokio::{
  59      sync::{oneshot, OnceCell},
  60      task::JoinHandle,
  61  };
  62  
/// Block synchronization logic for validators.
///
/// Synchronization works differently for nodes that act as validators in AlphaBFT;
/// In the common case, validators generate blocks after receiving an anchor block that has been accepted
/// by a supermajority of the committee instead of fetching entire blocks from other nodes.
/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes.
///
/// This struct also manages fetching certificates from other validators during normal operation,
/// and blocks when falling behind.
///
/// Finally, `Sync` handles synchronization of blocks with the validator's local storage:
/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them.
///
/// Cloning this struct is cheap: every field is a shared, reference-counted handle.
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway enables communication with other validators.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block synchronization logic.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificates queue.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender. Set at most once via [`Sync::initialize`].
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// Handles to the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The response lock. Ensures only one task processes block responses at a time.
    response_lock: Arc<TMutex<()>>,
    /// The sync lock. Ensures that only one task syncs the ledger at a time.
    sync_lock: Arc<TMutex<()>>,
    /// The latest block responses.
    ///
    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks
    /// whose addition to the ledger is deferred until certain checks pass.
    /// Blocks need to be processed in ascending height order, hence a FIFO queue (`VecDeque`).
    ///
    /// Whenever a new block is added to this queue, BlockSync::set_sync_height needs to be called.
    pending_blocks: Arc<TMutex<VecDeque<PendingBlock<N>>>>,
}
 104  
 105  impl<N: Network> Sync<N> {
 106      /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
 107      /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
 108      const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);
 109  
 110      /// Initializes a new sync instance.
 111      pub fn new(
 112          gateway: Gateway<N>,
 113          storage: Storage<N>,
 114          ledger: Arc<dyn LedgerService<N>>,
 115          block_sync: Arc<BlockSync<N>>,
 116      ) -> Self {
 117          // Return the sync instance.
 118          Self {
 119              gateway,
 120              storage,
 121              ledger,
 122              block_sync,
 123              pending: Default::default(),
 124              bft_sender: Default::default(),
 125              handles: Default::default(),
 126              response_lock: Default::default(),
 127              sync_lock: Default::default(),
 128              pending_blocks: Default::default(),
 129          }
 130      }
 131  
 132      /// Initializes the sync module and sync the storage with the ledger at bootup.
 133      pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
 134          // If a BFT sender was provided, set it.
 135          if let Some(bft_sender) = bft_sender {
 136              self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
 137          }
 138  
 139          info!("Syncing storage with the ledger...");
 140  
 141          // Sync the storage with the ledger.
 142          self.sync_storage_with_ledger_at_bootup()
 143              .await
 144              .with_context(|| "Syncing storage with the ledger at bootup failed")?;
 145  
 146          debug!("Finished initial block synchronization at startup");
 147          Ok(())
 148      }
 149  
 150      /// Sends the given batch of block requests to peers.
 151      ///
 152      /// Responses to block requests will eventually be processed by `Self::try_advancing_block_synchronization`.
 153      #[inline]
 154      async fn send_block_requests(
 155          &self,
 156  
 157          block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
 158          sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
 159      ) {
 160          trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());
 161  
 162          // Sends the block requests to the sync peers.
 163          for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
 164              if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
 165                  // Stop if we fail to process a batch of requests.
 166                  break;
 167              }
 168  
 169              // Sleep to avoid triggering spam detection.
 170              tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
 171          }
 172      }
 173  
 174      /// Starts the sync module.
 175      ///
 176      /// When this function returns successfully, the sync module will have spawned background tasks
 177      /// that fetch blocks from other validators.
 178      pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
 179          info!("Starting the sync module...");
 180  
 181          // Start the block request generation loop (outgoing).
 182          let self_ = self.clone();
 183          self.spawn(async move {
 184              loop {
 185                  // Wait for peer updates or timeout
 186                  let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;
 187  
 188                  // Issue block requests to peers.
 189                  self_.try_issuing_block_requests().await;
 190  
 191                  // Rate limiting happens in [`Self::send_block_requests`] and no additional sleeps are needed here.
 192              }
 193          });
 194  
 195          // Start the block response processing loop (incoming).
 196          let self_ = self.clone();
 197          let ping = ping.clone();
 198          self.spawn(async move {
 199              loop {
 200                  // Wait until there is something to do or until the timeout.
 201                  let _ =
 202                      tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;
 203  
 204                  let ping = ping.clone();
 205                  let self_ = self_.clone();
 206                  let hdl = tokio::spawn(async move {
 207                      self_.try_advancing_block_synchronization(&ping).await;
 208                  });
 209  
 210                  if let Err(err) = hdl.await {
 211                      if let Ok(panic) = err.try_into_panic() {
 212                          error!("Sync block advancement panicked: {panic:?}");
 213                      }
 214                  }
 215  
 216                  // We perform no additional rate limiting here as
 217                  // requests are already rate-limited.
 218              }
 219          });
 220  
 221          // Start the pending queue expiration loop.
 222          let self_ = self.clone();
 223          self.spawn(async move {
 224              loop {
 225                  // Sleep briefly.
 226                  tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
 227  
 228                  // Remove the expired pending transmission requests.
 229                  let self__ = self_.clone();
 230                  let _ = spawn_blocking!({
 231                      self__.pending.clear_expired_callbacks();
 232                      Ok(())
 233                  });
 234              }
 235          });
 236  
 237          /* Set up callbacks for events from the Gateway */
 238  
 239          // Retrieve the sync receiver.
 240          let SyncReceiver {
 241              mut rx_block_sync_insert_block_response,
 242              mut rx_block_sync_remove_peer,
 243              mut rx_block_sync_update_peer_locators,
 244              mut rx_certificate_request,
 245              mut rx_certificate_response,
 246          } = sync_receiver;
 247  
 248          // Process the block sync request to advance with sync blocks.
 249          // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
 250          // which is initially handled by [`Gateway::inbound()`],
 251          // which calls [`SyncSender::advance_with_sync_blocks()`],
 252          // which calls [`tx_block_sync_advance_with_sync_blocks.send()`],
 253          // which causes the `rx_block_sync_advance_with_sync_blocks.recv()` call below to return.
 254          let self_ = self.clone();
 255          self.spawn(async move {
 256              while let Some((peer_ip, blocks, latest_consensus_version, callback)) =
 257                  rx_block_sync_insert_block_response.recv().await
 258              {
 259                  let result = self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await;
 260                  //TODO remove this once channels are gone
 261                  if let Err(err) = &result {
 262                      warn!("Failed to insret block response: {err:?}");
 263                  }
 264  
 265                  callback.send(result).ok();
 266              }
 267          });
 268  
 269          // Process the block sync request to remove the peer.
 270          let self_ = self.clone();
 271          self.spawn(async move {
 272              while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
 273                  self_.remove_peer(peer_ip);
 274              }
 275          });
 276  
 277          // Process each block sync request to update peer locators.
 278          // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
 279          // which is initially handled by [`Gateway::inbound()`],
 280          // which calls [`SyncSender::update_peer_locators()`],
 281          // which calls [`tx_block_sync_update_peer_locators.send()`],
 282          // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
 283          let self_ = self.clone();
 284          self.spawn(async move {
 285              while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
 286                  let self_clone = self_.clone();
 287                  tokio::spawn(async move {
 288                      callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
 289                  });
 290              }
 291          });
 292  
 293          // Process each certificate request.
 294          // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
 295          // which is initially handled by [`Gateway::inbound()`],
 296          // which calls [`tx_certificate_request.send()`],
 297          // which causes the `rx_certificate_request.recv()` call below to return.
 298          let self_ = self.clone();
 299          self.spawn(async move {
 300              while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
 301                  self_.send_certificate_response(peer_ip, certificate_request);
 302              }
 303          });
 304  
 305          // Process each certificate response.
 306          // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
 307          // which is initially handled by [`Gateway::inbound()`],
 308          // which calls [`tx_certificate_response.send()`],
 309          // which causes the `rx_certificate_response.recv()` call below to return.
 310          let self_ = self.clone();
 311          self.spawn(async move {
 312              while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
 313                  self_.finish_certificate_request(peer_ip, certificate_response);
 314              }
 315          });
 316  
 317          Ok(())
 318      }
 319  
 320      /// BFT-specific version of `Client::try_issuing_block_requests()`.
 321      ///
 322      /// This method handles timeout removal, checks if block sync is possible,
 323      /// and issues block requests to peers.
 324      async fn try_issuing_block_requests(&self) {
 325          // Update the sync height to the latest ledger height.
 326          // (if the ledger height is lower or equal to the current sync height, this is a noop)
 327          self.block_sync.set_sync_height(self.ledger.latest_block_height());
 328  
 329          // Check if any existing requests can be removed.
 330          // We should do this even if we cannot block sync, to ensure
 331          // there are no dangling block requests.
 332          match self.block_sync.handle_block_request_timeouts(&self.gateway) {
 333              Ok(Some((requests, sync_peers))) => {
 334                  // Re-request blocks instead of performing regular block sync.
 335                  self.send_block_requests(requests, sync_peers).await;
 336                  return;
 337              }
 338              Ok(None) => {}
 339              Err(err) => {
 340                  // Abort and retry later.
 341                  error!("{}", &flatten_error(err));
 342                  return;
 343              }
 344          }
 345  
 346          // Do not attempt to sync if there are no blocks to sync.
 347          // This prevents redundant log messages and performing unnecessary computation.
 348          if !self.block_sync.can_block_sync() {
 349              return;
 350          }
 351  
 352          // Prepare the block requests, if any.
 353          // In the process, we update the state of `is_block_synced` for the sync module.
 354          let (requests, sync_peers) = self.block_sync.prepare_block_requests();
 355  
 356          // If there are no block requests, return early.
 357          if requests.is_empty() {
 358              return;
 359          }
 360  
 361          // Send the block requests to peers.
 362          self.send_block_requests(requests, sync_peers).await;
 363      }
 364  
    /// Test-only method to manually trigger block synchronization.
    /// This combines both request generation and response processing for testing purposes,
    /// mirroring one iteration of the two background loops spawned by [`Sync::run`].
    #[cfg(test)]
    pub(crate) async fn testing_only_try_block_sync_testing_only(&self) {
        // First try issuing block requests (outgoing half).
        self.try_issuing_block_requests().await;

        // Then try advancing with any available responses (incoming half).
        // `&None` means no ping handle is updated with fresh block locators.
        self.try_advancing_block_synchronization(&None).await;
    }
 375  }
 376  
// Callbacks used when receiving messages from the Gateway.
// These are thin delegations to the underlying `BlockSync` instance.
impl<N: Network> Sync<N> {
    /// We received a block response and can (possibly) advance synchronization.
    ///
    /// Validates the response received from `peer_ip` and stores it in the
    /// `BlockSync` instance; validation errors are propagated to the caller.
    async fn insert_block_response(
        &self,
        peer_ip: SocketAddr,
        blocks: Vec<Block<N>>,
        latest_consensus_version: Option<ConsensusVersion>,
    ) -> Result<()> {
        // Verify that the response is valid and add it to block sync.
        self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)

        // No need to advance block sync here, as the new response will
        // notify the incoming task.
    }

    /// We received new peer locators during a Ping.
    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        // Forward to the underlying `BlockSync` instance, which validates the locators.
        self.block_sync.update_peer_locators(peer_ip, &locators)
    }

    /// A peer disconnected; drop its sync state from the `BlockSync` instance.
    fn remove_peer(&self, peer_ip: SocketAddr) {
        self.block_sync.remove_peer(&peer_ip);
    }

    /// Test-only wrapper around [`Self::update_peer_locators`].
    #[cfg(test)]
    pub fn testing_only_update_peer_locators_testing_only(
        &self,
        peer_ip: SocketAddr,
        locators: BlockLocators<N>,
    ) -> Result<()> {
        self.update_peer_locators(peer_ip, locators)
    }
}
 412  
 413  // Methods to manage storage.
 414  impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup.
    ///
    /// This is called when starting the validator and after finishing a sync without BFT.
    /// It replays the blocks in the (conservatively estimated) GC range into the
    /// certificate storage and, if a BFT sender is registered, rebuilds the BFT DAG
    /// from the certificates contained in those blocks.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Retrieve the block height.
        let block_height = latest_block.height();
        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Retrieve the blocks.
        // NOTE(review): the blocks are fetched before acquiring the sync lock below —
        // presumably safe because the ledger is append-only, but worth confirming.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock, so no other task syncs the ledger concurrently.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        /* Sync storage */

        // Sync the height with the block.
        self.storage.sync_height_with_block(latest_block.height());
        // Sync the round with the block.
        self.storage.sync_round_with_block(latest_block.round());
        // Perform GC on the latest block round.
        self.storage.garbage_collect_certificates(latest_block.round());
        // Iterate over the blocks.
        for block in &blocks {
            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
            // Note that the block authority is always a sub-DAG in production;
            // beacon signatures are only used for testing,
            // and as placeholder (irrelevant) block authority in the genesis block.
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions, keyed by transaction id.
                // (transactions that fail the conversion are silently skipped)
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                // Iterate over the certificates.
                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                // Update the validator telemetry.
                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        /* Sync the BFT DAG */

        // Construct a flat list of all certificates from the sub-DAG blocks.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    // If the block authority is a beacon, then skip the block.
                    Authority::Beacon(_) => None,
                    // If the block authority is a subdag, then retrieve the certificates.
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender was provided, send the certificates to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            bft_sender
                .tx_sync_bft_dag_at_bootup
                .send(certificates)
                .await
                .with_context(|| "Failed to update the BFT DAG from sync")?;
        }

        // Make the block sync module aware of the height we are synced to.
        self.block_sync.set_sync_height(block_height);

        Ok(())
    }
 506  
 507      /// Returns which height we are synchronized to.
 508      /// If there are queued block responses, this might be higher than the latest block in the ledger.
 509      async fn compute_sync_height(&self) -> u32 {
 510          let ledger_height = self.ledger.latest_block_height();
 511          let mut pending_blocks = self.pending_blocks.lock().await;
 512  
 513          // Remove any old responses.
 514          while let Some(b) = pending_blocks.front() {
 515              if b.height() <= ledger_height {
 516                  pending_blocks.pop_front();
 517              } else {
 518                  break;
 519              }
 520          }
 521  
 522          // Ensure the returned value is always greater or equal than ledger height.
 523          pending_blocks.back().map(|b| b.height()).unwrap_or(0).max(ledger_height)
 524      }
 525  
 526      /// BFT-version of [`alphaos_node_client::Client::try_advancing_block_synchronization`].
 527      async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
 528          // Process block responses and advance the ledger.
 529          let new_blocks = match self
 530              .try_advancing_block_synchronization_inner()
 531              .await
 532              .with_context(|| "Block synchronization failed")
 533          {
 534              Ok(new_blocks) => new_blocks,
 535              Err(err) => {
 536                  error!("{}", &flatten_error(err));
 537                  false
 538              }
 539          };
 540  
 541          if let Some(ping) = &ping {
 542              if new_blocks {
 543                  match self.get_block_locators() {
 544                      Ok(locators) => ping.update_block_locators(locators),
 545                      Err(err) => error!("Failed to update block locators: {err}"),
 546                  }
 547              }
 548          }
 549      }
 550  
    /// Aims to advance synchronization using any recent block responses received from peers.
    ///
    /// This is the validator's version of `BlockSync::try_advancing_block_synchronization`
    /// and is called periodically at runtime.
    ///
    /// This returns Ok(true) if we successfully advanced the ledger by at least one new block.
    ///
    /// A key difference to `BlockSync`'s versions is that it will only add blocks to the ledger once they have been confirmed by the network.
    /// If blocks are not confirmed yet, they will be kept in [`Self::pending_blocks`].
    /// It will also pass certificates from synced blocks to the BFT module so that consensus can progress as expected
    /// (see [`Self::sync_storage_with_block`] for more details).
    ///
    /// If the node falls behind more than GC rounds, this function calls [`Self::sync_storage_without_bft`] instead,
    /// which syncs without updating the BFT state.
    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
        // Acquire the response lock, so only one task processes responses at a time.
        let _lock = self.response_lock.lock().await;

        // For sanity, set the sync height again.
        // (if the sync height is already larger or equal, this is a noop)
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Retrieve the maximum block height of the peers.
        // (defaults to 0 if there are no sync peers)
        let tip = self
            .block_sync
            .find_sync_peers()
            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
            .unwrap_or(0);

        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // Updates sync state and returns the error (if any).
        // Shared epilogue for both the BFT and non-BFT sync paths below.
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            // Make the underlying `BlockSync` instance aware of the new sync height.
            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error {
                Err(err)
            } else {
                Ok(new_blocks)
            }
        };

        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        // The next block to add is `ledger_height + 1`; we are "within GC" if that block's
        // rounds are still retained by peers' storage.
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Retrieve the current height, based on the ledger height and the
            // (unconfirmed) blocks that are already queued up.
            let start_height = self.compute_sync_height().await;

            // The height is incremented as blocks are added.
            let mut current_height = start_height;
            trace!(
                "Try advancing blocks responses with BFT (starting at block {current_height}, current sync speed is {})",
                self.block_sync.get_sync_speed()
            );

            // If we already were within GC or successfully caught up with GC, try to advance BFT normally again.
            // Process queued responses one height at a time, in order.
            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                // Sync the storage with the block.
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            // For non-BFT sync we need to start at the current height of the ledger, as blocks
            // are immediately added to it and not queued up in `latest_block_responses`.
            let start_height = ledger_height;
            let mut current_height = start_height;

            trace!("Try advancing block responses without BFT (starting at block {current_height})");

            // Try to advance the ledger *to tip* without updating the BFT.
            // TODO(kaimast): why to tip and not to tip-GC?
            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                // Sync the ledger with the block without BFT.
                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                        self.block_sync.count_request_completed();
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // Sync the storage with the ledger if we should transition to the BFT sync.
            // (re-run the bootup routine to rebuild the certificate storage and BFT DAG)
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                self.sync_storage_with_ledger_at_bootup()
                    .await
                    .with_context(|| "BFT sync (with bootup routine) failed")?;
            }

            cleanup(start_height, current_height, None)
        }
    }
 688  
    /// Syncs the ledger with the given block without updating the BFT.
    ///
    /// This is only used by [`Self::try_advancing_block_synchronization`].
    ///
    /// On success, the ledger has advanced to `block`, the storage height/round
    /// are updated, and the block is marked as processed in `block_sync`.
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock, so no other sync operation runs concurrently.
        let _lock = self.sync_lock.lock().await;

        // Clone `self` so the `spawn_blocking!` task below can own its captures.
        let self_ = self.clone();
        spawn_blocking!({
            // Check the next block.
            self_.ledger.check_next_block(&block)?;
            // Attempt to advance to the next block.
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the height with the block.
            self_.storage.sync_height_with_block(block.height());
            // Sync the round with the block.
            self_.storage.sync_round_with_block(block.round());
            // Mark the block height as processed in block_sync.
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
    }
 713  
    /// Helper function for [`Self::sync_storage_with_block`].
    /// It syncs the batch certificates with the BFT, if the block's authority is a sub-DAG.
    ///
    /// Note that the block authority is always a sub-DAG in production; beacon signatures are only used for testing,
    /// and as placeholder (irrelevant) block authority in the genesis block.
    async fn add_block_subdag_to_bft(&self, block: &Block<N>) -> Result<()> {
        // Nothing to do if this is a beacon block
        let Authority::Quorum(subdag) = block.authority() else {
            return Ok(());
        };

        // Reconstruct the unconfirmed transactions, keyed by transaction ID.
        // Transactions that fail the conversion are silently skipped (`.ok()`).
        let unconfirmed_transactions = cfg_iter!(block.transactions())
            .filter_map(|tx| tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok())
            .collect::<HashMap<_, _>>();

        // Iterate over the certificates, one sub-DAG round at a time.
        for certificates in subdag.values().cloned() {
            // Sync each certificate in this round with the storage
            // (possibly in parallel, depending on how `cfg_into_iter!` is configured).
            cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                // Sync the batch certificate with the block.
                self.storage.sync_certificate_with_block(block, certificate.clone(), &unconfirmed_transactions);
            });

            // Sync the BFT DAG with the certificates, sequentially and in order.
            for certificate in certificates {
                // If a BFT sender was provided, send the certificate to the BFT.
                // For validators, BFT spawns a receiver task in `BFT::start_handlers`.
                if let Some(bft_sender) = self.bft_sender.get() {
                    let (callback_tx, callback_rx) = oneshot::channel();
                    bft_sender
                        .tx_sync_bft
                        .send((certificate, callback_tx))
                        .await
                        .with_context(|| "Failed to sync certificate")?;
                    // Wait for the BFT to acknowledge (or reject) the certificate.
                    callback_rx.await?.with_context(|| "Failed to sync certificate")?;
                }
            }
        }
        Ok(())
    }
 754  
 755      /// Helper function for [`Self::sync_storage_with_block`].
 756      ///
 757      /// It checks that successor of a given block contains enough votes to commit it.
 758      /// This can only return `Ok(true)` if the certificates of the block's successor were added to the storage.
 759      fn is_block_availability_threshold_reached(&self, block: &PendingBlock<N>) -> Result<bool> {
 760          // Fetch the leader certificate and the relevant rounds.
 761          let leader_certificate = match block.authority() {
 762              Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
 763              _ => bail!("Received a block with an unexpected authority type."),
 764          };
 765          let commit_round = leader_certificate.round();
 766          let certificate_round =
 767              commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;
 768  
 769          // Get the committee lookback for the round just after the leader.
 770          let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
 771          // Retrieve all of the certificates for the round just after the leader.
 772          let certificates = self.storage.get_certificates_for_round(certificate_round);
 773          // Construct a set over the authors, at the round just after the leader,
 774          // who included the leader's certificate in their previous certificate IDs.
 775          let authors = certificates
 776              .iter()
 777              .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
 778                  true => Some(c.author()),
 779                  false => None,
 780              })
 781              .collect();
 782  
 783          // Check if the leader is ready to be committed.
 784          if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
 785              trace!(
 786                  "Block {hash} at height {height} has reached availability threshold",
 787                  hash = block.hash(),
 788                  height = block.height()
 789              );
 790              Ok(true)
 791          } else {
 792              Ok(false)
 793          }
 794      }
 795  
 796      /// Advances the ledger by the given block and updates the storage accordingly.
 797      ///
 798      /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
 799      /// meets the voter availability threshold (i.e. > f voting stake)
 800      /// or is reachable via a DAG path from a later leader certificate that does.
 801      /// Since performing this check requires DAG certificates from later blocks,
 802      /// the block is stored in `Sync::pending_blocks`,
 803      /// and its addition to the ledger is deferred until the check passes.
 804      /// Several blocks may be stored in `Sync::pending_blocks`
 805      /// before they can be all checked and added to the ledger.
 806      ///
 807      /// # Usage
 808      /// This function assumes that blocks are passed in order, i.e.,
 809      /// that the given block is a direct successor of the block that was last passed to this function.
 810      async fn sync_storage_with_block(&self, new_block: Block<N>) -> Result<()> {
 811          // Acquire the sync lock.
 812          let _lock = self.sync_lock.lock().await;
 813          let new_block_height = new_block.height();
 814  
 815          // If this block has already been processed, return early.
 816          // TODO(kaimast): Should we remove the response here?
 817          if self.ledger.contains_block_height(new_block.height()) {
 818              debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
 819              return Ok(());
 820          }
 821  
 822          // Acquire the pending blocks lock.
 823          let mut pending_blocks = self.pending_blocks.lock().await;
 824  
 825          // Append the certificates to the storage.
 826          self.add_block_subdag_to_bft(&new_block).await?;
 827  
 828          // Fetch the latest block height.
 829          let ledger_block_height = self.ledger.latest_block_height();
 830  
 831          // First, clear any older pending blocks.
 832          // TODO(kaimast): ensure there are no dangling block requests
 833          while let Some(pending_block) = pending_blocks.front() {
 834              if pending_block.height() > ledger_block_height {
 835                  break;
 836              }
 837  
 838              pending_blocks.pop_front();
 839          }
 840  
 841          if let Some(tail) = pending_blocks.back() {
 842              if tail.height() >= new_block.height() {
 843                  debug!(
 844                      "A unconfirmed block is queued already for height {height}. \
 845                      Will not sync.",
 846                      height = new_block.height()
 847                  );
 848                  return Ok(());
 849              }
 850  
 851              ensure_equals!(tail.height() + 1, new_block.height(), "Got an out-of-order block");
 852          }
 853  
 854          // Fetch the latest block height.
 855          let ledger_block_height = self.ledger.latest_block_height();
 856  
 857          // Clear any older pending blocks.
 858          // TODO(kaimast): ensure there are no dangling block requests
 859          while let Some(pending_block) = pending_blocks.front() {
 860              if pending_block.height() > ledger_block_height {
 861                  break;
 862              }
 863  
 864              trace!(
 865                  "Pending block {hash} at height {height} became obsolete",
 866                  hash = pending_block.hash(),
 867                  height = pending_block.height()
 868              );
 869              pending_blocks.pop_front();
 870          }
 871  
 872          // Check the block against the chain of pending blocks and append it on success.
 873          let new_block = match self.ledger.check_block_subdag(new_block, pending_blocks.make_contiguous()) {
 874              Ok(new_block) => new_block,
 875              Err(err) => {
 876                  // TODO(kaimast): this shoud not return an error on the alphavm side.
 877                  if err.to_string().contains("already in the ledger") {
 878                      debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.",);
 879  
 880                      return Ok(());
 881                  } else {
 882                      return Err(err);
 883                  }
 884              }
 885          };
 886  
 887          trace!(
 888              "Adding new pending block {hash} at height {height}",
 889              hash = new_block.hash(),
 890              height = new_block.height()
 891          );
 892          pending_blocks.push_back(new_block);
 893  
 894          // Now, figure out if and which pending block we can commit.
 895          // To do this effectively and because commits are transitive,
 896          // we iterate in reverse so that we can stop at the first successful check.
 897          //
 898          // Note, that if the storage already contains certificates for the round after new block,
 899          // the availability threshold for the new block could also be reached.
 900          let mut commit_height = None;
 901          for block in pending_blocks.iter().rev() {
 902              // This check assumes that the pending blocks are properly linked together, based on the fact that,
 903              // to generate the sequence of `PendingBlocks`, each block needs to successfully be processed by `Ledger::check_block_subdag`.
 904              // As a result, the safety of this piece of code relies on the correctness `Ledger::check_block_subdag`,
 905              // which is tested in `alphavm/ledger/tests/pending_block.rs`.
 906              if self
 907                  .is_block_availability_threshold_reached(block)
 908                  .with_context(|| "Availability threshold check failed")?
 909              {
 910                  commit_height = Some(block.height());
 911                  break;
 912              }
 913          }
 914  
 915          if let Some(commit_height) = commit_height {
 916              let start_height = ledger_block_height + 1;
 917              ensure!(commit_height >= start_height, "Invalid commit height");
 918              let num_blocks = (commit_height - start_height + 1) as usize;
 919  
 920              // Create a more detailed log message if we are committing more than one block at a time.
 921              if num_blocks > 1 {
 922                  trace!(
 923                      "Attempting to commit {chain_length} pending block(s) starting at height {start_height}.",
 924                      chain_length = pending_blocks.len(),
 925                  );
 926              }
 927  
 928              for pending_block in pending_blocks.drain(0..num_blocks) {
 929                  let hash = pending_block.hash();
 930                  let height = pending_block.height();
 931                  let ledger = self.ledger.clone();
 932                  let storage = self.storage.clone();
 933  
 934                  spawn_blocking!({
 935                      let block = ledger.check_block_content(pending_block).with_context(|| {
 936                          format!("Failed to check contents of pending block {hash} at height {height}")
 937                      })?;
 938  
 939                      trace!("Adding pending block {hash} at height {height} to the ledger");
 940                      ledger.advance_to_next_block(&block)?;
 941                      // Sync the height with the block.
 942                      storage.sync_height_with_block(block.height());
 943                      // Sync the round with the block.
 944                      storage.sync_round_with_block(block.round());
 945  
 946                      Ok(())
 947                  })?
 948              }
 949          } else {
 950              trace!("No pending block are ready to be committed ({} block(s) are pending)", pending_blocks.len());
 951          }
 952  
 953          Ok(())
 954      }
 955  }
 956  
 957  // Methods to assist with the block sync module.
 958  impl<N: Network> Sync<N> {
 959      /// Returns `true` if the node is synced and has connected peers.
 960      pub fn is_synced(&self) -> bool {
 961          // Ensure the validator is connected to other validators,
 962          // not just clients.
 963          if self.gateway.number_of_connected_peers() == 0 {
 964              return false;
 965          }
 966  
 967          self.block_sync.is_block_synced()
 968      }
 969  
 970      /// Returns the number of blocks the node is behind the greatest peer height.
 971      pub fn num_blocks_behind(&self) -> Option<u32> {
 972          self.block_sync.num_blocks_behind()
 973      }
 974  
 975      /// Returns the current block locators of the node.
 976      pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
 977          self.block_sync.get_block_locators()
 978      }
 979  }
 980  
 981  // Methods to assist with fetching batch certificates from peers.
 982  impl<N: Network> Sync<N> {
 983      /// Sends a certificate request to the specified peer.
 984      pub async fn send_certificate_request(
 985          &self,
 986          peer_ip: SocketAddr,
 987          certificate_id: Field<N>,
 988      ) -> Result<BatchCertificate<N>> {
 989          // Initialize a oneshot channel.
 990          let (callback_sender, callback_receiver) = oneshot::channel();
 991          // Determine how many sent requests are pending.
 992          let num_sent_requests = self.pending.num_sent_requests(certificate_id);
 993          // Determine if we've already sent a request to the peer.
 994          let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
 995          // Determine the maximum number of redundant requests.
 996          let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
 997          // Determine if we should send a certificate request to the peer.
 998          // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
 999          let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
1000  
1001          // Insert the certificate ID into the pending queue.
1002          self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
1003  
1004          // If the number of requests is less than or equal to the redundancy factor, send the certificate request to the peer.
1005          if should_send_request {
1006              // Send the certificate request to the peer.
1007              if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
1008                  bail!("Unable to fetch batch certificate {certificate_id} (failed to send request)")
1009              }
1010          } else {
1011              debug!(
1012                  "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
1013                  fmt_id(certificate_id)
1014              );
1015          }
1016          // Wait for the certificate to be fetched.
1017          // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
1018          tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
1019              .await
1020              .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))?
1021              .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id)))
1022      }
1023  
1024      /// Handles the incoming certificate request.
1025      fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
1026          // Attempt to retrieve the certificate.
1027          if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
1028              // Send the certificate response to the peer.
1029              let self_ = self.clone();
1030              tokio::spawn(async move {
1031                  let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
1032              });
1033          }
1034      }
1035  
1036      /// Handles the incoming certificate response.
1037      /// This method ensures the certificate response is well-formed and matches the certificate ID.
1038      fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
1039          let certificate = response.certificate;
1040          // Check if the peer IP exists in the pending queue for the given certificate ID.
1041          let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
1042          // If the peer IP exists, finish the pending request.
1043          if exists {
1044              // TODO: Validate the certificate.
1045              // Remove the certificate ID from the pending queue.
1046              self.pending.remove(certificate.id(), Some(certificate));
1047          }
1048      }
1049  }
1050  
1051  impl<N: Network> Sync<N> {
1052      /// Spawns a task with the given future; it should only be used for long-running tasks.
1053      fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1054          self.handles.lock().push(tokio::spawn(future));
1055      }
1056  
1057      /// Shuts down the primary.
1058      pub async fn shut_down(&self) {
1059          info!("Shutting down the sync module...");
1060          // Acquire the response lock.
1061          let _lock = self.response_lock.lock().await;
1062          // Acquire the sync lock.
1063          let _lock = self.sync_lock.lock().await;
1064          // Abort the tasks.
1065          self.handles.lock().iter().for_each(|handle| handle.abort());
1066      }
1067  }
1068  
1069  #[cfg(test)]
1070  mod tests {
1071      use super::*;
1072  
1073      use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
1074  
1075      use alphaos_account::Account;
1076      use alphaos_node_sync::BlockSync;
1077      use alphaos_utilities::SimpleStoppable;
1078      use alphavm::{
1079          console::{
1080              account::{Address, PrivateKey},
1081              network::MainnetV0,
1082          },
1083          ledger::{
1084              narwhal::{BatchCertificate, BatchHeader, Subdag},
1085              store::{helpers::memory::ConsensusMemory, ConsensusStore},
1086          },
1087          prelude::{Ledger, VM},
1088          utilities::TestRng,
1089      };
1090  
1091      use alphastd::StorageMode;
1092      use indexmap::IndexSet;
1093      use rand::Rng;
1094      use std::collections::BTreeMap;
1095  
1096      type CurrentNetwork = MainnetV0;
1097      type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1098      type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
1099  
    /// Tests that commits work as expected when some anchors are not committed immediately.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_chain() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;

        // The first round of the first block.
        let first_round = 1;
        // The total number of blocks we test.
        let num_blocks = 3;
        // The number of certificate rounds needed.
        // There is one additional round to provide availability for the final block.
        let num_rounds = first_round + num_blocks * 2 + 1;
        // The first round that has at least N-f certificates referencing the anchor from the previous round.
        // This is also the last round we use in the test.
        let first_committed_round = num_rounds - 1;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let vm = VM::from(store).unwrap();
        let genesis_pk = *account.private_key();
        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger with the genesis block.
        let genesis_clone = genesis.clone();
        let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap();
        // Wrap the ledger in the core ledger service used by storage and sync.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));

        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            let addresses = [
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in first_round..=first_committed_round {
                let mut current_certificates = IndexSet::new();
                // The first rounds have no predecessors to reference.
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };

                let committee_id = committee.id();
                let prev_leader = committee.get_leader(round - 1).unwrap();

                // For the first two blocks non-leaders will not reference the leader certificate.
                // This means, while there is an anchor, it isn't committed
                // until later.
                for (i, private_key) in private_keys.iter().enumerate() {
                    let leader_index = addresses.iter().position(|&address| address == prev_leader).unwrap();
                    let is_certificate_round = round % 2 == 1;
                    let is_leader = i == leader_index;

                    // Non-leaders in certificate rounds (except the final one) omit the
                    // previous leader's certificate from their references.
                    let previous_certs = if round < first_committed_round && is_certificate_round && !is_leader {
                        previous_certificate_ids
                            .iter()
                            .cloned()
                            .enumerate()
                            .filter(|(idx, _)| *idx != leader_index)
                            .map(|(_, id)| id)
                            .collect()
                    } else {
                        previous_certificate_ids.clone()
                    };

                    let batch_header = BatchHeader::new(
                        private_key,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certs,
                        rng,
                    )
                    .unwrap();

                    // Sign the batch header with every other author's key.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates;
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert all certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in first_round..=first_committed_round {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Create the blocks, one per leader (even) round.
        let mut previous_leader_cert = None;
        let mut blocks = vec![];

        for block_height in 1..=num_blocks {
            let leader_round = block_height * 2;

            let leader = committee.get_leader(leader_round).unwrap();
            let leader_certificate = storage.get_certificate_for_round_with_author(leader_round, leader).unwrap();

            // Build the sub-DAG for this block: the leader certificate plus supporting rounds.
            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());

            let previous_cert_map = storage.get_certificates_for_round(leader_round - 1);

            subdag_map.insert(leader_round, leader_cert_map.clone());
            subdag_map.insert(leader_round - 1, previous_cert_map.clone());

            if leader_round > 2 {
                // Include the round two back, excluding the previous leader's certificate
                // (it was already committed with the previous block).
                let previous_commit_cert_map: IndexSet<_> = storage
                    .get_certificates_for_round(leader_round - 2)
                    .into_iter()
                    .filter(|cert| {
                        if let Some(previous_leader_cert) = &previous_leader_cert {
                            cert != previous_leader_cert
                        } else {
                            true
                        }
                    })
                    .collect();
                subdag_map.insert(leader_round - 2, previous_commit_cert_map);
            }

            let subdag = Subdag::from(subdag_map.clone())?;
            previous_leader_cert = Some(leader_certificate);

            // Advance the original ledger so the block exists on-chain.
            let core_ledger = core_ledger.clone();
            let block = spawn_blocking!({
                let block = core_ledger.clone().prepare_advance_to_next_quorum_block(subdag, Default::default())?;
                core_ledger.advance_to_next_block(&block)?;
                Ok(block)
            })?;

            blocks.push(block);
        }

        // ### Test that sync works as expected ###
        let storage_mode = StorageMode::new_test(None);

        // Create a new ledger to test with, but use the existing storage
        // so that the certificates exist.
        let syncing_ledger = {
            let storage_mode = storage_mode.clone();
            Arc::new(CoreLedgerService::new(
                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
                SimpleStoppable::new(),
            ))
        };

        // Set up sync and its dependencies.
        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            syncing_ledger.clone(),
            None,
            &[],
            false,
            StorageMode::new_test(None),
            None,
        )?;
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);

        let mut block_iter = blocks.into_iter();

        // Insert all but the last block into the new sync module.
        for _ in 0..num_blocks - 1 {
            let block = block_iter.next().unwrap();
            sync.sync_storage_with_block(block).await?;

            // Availability threshold is not met, so we should not advance yet.
            assert_eq!(syncing_ledger.latest_block_height(), 0);
        }

        // Only for the final block, the availability threshold is met,
        // because certificates for the subsequent round are already in storage.
        sync.sync_storage_with_block(block_iter.next().unwrap()).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);

        // Ensure blocks 1 and 2 were added to the ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }
1327  
1328      #[tokio::test]
1329      #[tracing_test::traced_test]
1330      async fn test_pending_certificates() -> anyhow::Result<()> {
1331          let rng = &mut TestRng::default();
1332          // Initialize the round parameters.
1333          let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1334          let commit_round = 2;
1335  
1336          // Initialize the store.
1337          let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
1338          let account: Account<CurrentNetwork> = Account::new(rng)?;
1339  
1340          // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
1341          let seed: u64 = rng.r#gen();
1342          let vm = VM::from(store).unwrap();
1343          let genesis_pk = *account.private_key();
1344          let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();
1345  
1346          // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
1347          let genesis_rng = &mut TestRng::from_seed(seed);
1348          let private_keys = [
1349              *account.private_key(),
1350              PrivateKey::new(genesis_rng)?,
1351              PrivateKey::new(genesis_rng)?,
1352              PrivateKey::new(genesis_rng)?,
1353          ];
1354          // Initialize the ledger with the genesis block.
1355          let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap();
1356          // Initialize the ledger.
1357          let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
1358          // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1359          let (round_to_certificates_map, committee) = {
1360              // Initialize the committee.
1361              let committee = core_ledger.current_committee().unwrap();
1362              // Initialize a mapping from the round number to the set of batch certificates in the round.
1363              let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
1364                  HashMap::new();
1365              let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
1366  
1367              for round in 0..=commit_round + 8 {
1368                  let mut current_certificates = IndexSet::new();
1369                  let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1370                      IndexSet::new()
1371                  } else {
1372                      previous_certificates.iter().map(|c| c.id()).collect()
1373                  };
1374                  let committee_id = committee.id();
1375                  // Create a certificate for each validator.
1376                  for (i, private_key_1) in private_keys.iter().enumerate() {
1377                      let batch_header = BatchHeader::new(
1378                          private_key_1,
1379                          round,
1380                          now(),
1381                          committee_id,
1382                          Default::default(),
1383                          previous_certificate_ids.clone(),
1384                          rng,
1385                      )
1386                      .unwrap();
1387                      // Sign the batch header.
1388                      let mut signatures = IndexSet::with_capacity(4);
1389                      for (j, private_key_2) in private_keys.iter().enumerate() {
1390                          if i != j {
1391                              signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1392                          }
1393                      }
1394                      current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
1395                  }
1396  
1397                  // Update the map of certificates.
1398                  round_to_certificates_map.insert(round, current_certificates.clone());
1399                  previous_certificates = current_certificates.clone();
1400              }
1401              (round_to_certificates_map, committee)
1402          };
1403  
1404          // Initialize the storage.
1405          let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1406          // Insert certificates into storage.
1407          let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
1408          for i in 1..=commit_round + 8 {
1409              let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
1410              certificates.extend(c);
1411          }
1412          for certificate in certificates.clone().iter() {
1413              storage.testing_only_insert_certificate_testing_only(certificate.clone());
1414          }
1415          // Create block 1.
1416          let leader_round_1 = commit_round;
1417          let leader_1 = committee.get_leader(leader_round_1).unwrap();
1418          let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
1419          let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1420          let block_1 = {
1421              let mut leader_cert_map = IndexSet::new();
1422              leader_cert_map.insert(leader_certificate.clone());
1423              let mut previous_cert_map = IndexSet::new();
1424              for cert in storage.get_certificates_for_round(commit_round - 1) {
1425                  previous_cert_map.insert(cert);
1426              }
1427              subdag_map.insert(commit_round, leader_cert_map.clone());
1428              subdag_map.insert(commit_round - 1, previous_cert_map.clone());
1429              let subdag = Subdag::from(subdag_map.clone())?;
1430              let ledger = core_ledger.clone();
1431              spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag, Default::default()))?
1432          };
1433          // Insert block 1.
1434          let ledger = core_ledger.clone();
1435          let block = block_1.clone();
1436          spawn_blocking!(ledger.advance_to_next_block(&block))?;
1437  
1438          // Create block 2.
1439          let leader_round_2 = commit_round + 2;
1440          let leader_2 = committee.get_leader(leader_round_2).unwrap();
1441          let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
1442          let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1443          let block_2 = {
1444              let mut leader_cert_map_2 = IndexSet::new();
1445              leader_cert_map_2.insert(leader_certificate_2.clone());
1446              let mut previous_cert_map_2 = IndexSet::new();
1447              for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
1448                  previous_cert_map_2.insert(cert);
1449              }
1450              subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
1451              subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
1452              let subdag_2 = Subdag::from(subdag_map_2.clone())?;
1453              let ledger = core_ledger.clone();
1454              spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default()))?
1455          };
1456          // Insert block 2.
1457          let ledger = core_ledger.clone();
1458          let block = block_2.clone();
1459          spawn_blocking!(ledger.advance_to_next_block(&block))?;
1460  
1461          // Create block 3
1462          let leader_round_3 = commit_round + 4;
1463          let leader_3 = committee.get_leader(leader_round_3).unwrap();
1464          let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
1465          let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
1466          let block_3 = {
1467              let mut leader_cert_map_3 = IndexSet::new();
1468              leader_cert_map_3.insert(leader_certificate_3.clone());
1469              let mut previous_cert_map_3 = IndexSet::new();
1470              for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
1471                  previous_cert_map_3.insert(cert);
1472              }
1473              subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
1474              subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
1475              let subdag_3 = Subdag::from(subdag_map_3.clone())?;
1476              let ledger = core_ledger.clone();
1477              spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default()))?
1478          };
1479          // Insert block 3.
1480          let ledger = core_ledger.clone();
1481          let block = block_3.clone();
1482          spawn_blocking!(ledger.advance_to_next_block(&block))?;
1483  
1484          /*
1485              Check that the pending certificates are computed correctly.
1486          */
1487  
1488          // Retrieve the pending certificates.
1489          let pending_certificates = storage.get_pending_certificates();
1490          // Check that all of the pending certificates are not contained in the ledger.
1491          for certificate in pending_certificates.clone() {
1492              assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
1493          }
1494          // Initialize an empty set to be populated with the committed certificates in the block subdags.
1495          let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1496          {
1497              let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
1498              for subdag in subdag_maps.iter() {
1499                  for subdag_certificates in subdag.values() {
1500                      committed_certificates.extend(subdag_certificates.iter().cloned());
1501                  }
1502              }
1503          };
1504          // Create the set of candidate pending certificates as the set of all certificates minus the set of the committed certificates.
1505          let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
1506          for certificate in certificates.clone() {
1507              if !committed_certificates.contains(&certificate) {
1508                  candidate_pending_certificates.insert(certificate);
1509              }
1510          }
1511          // Check that the set of pending certificates is equal to the set of candidate pending certificates.
1512          assert_eq!(pending_certificates, candidate_pending_certificates);
1513  
1514          Ok(())
1515      }
1516  }