/ node / bft / src / bft.rs
bft.rs
   1  // Copyright (c) 2025 ADnet Contributors
   2  // This file is part of the AlphaOS library.
   3  
   4  // Licensed under the Apache License, Version 2.0 (the "License");
   5  // you may not use this file except in compliance with the License.
   6  // You may obtain a copy of the License at:
   7  
   8  // http://www.apache.org/licenses/LICENSE-2.0
   9  
  10  // Unless required by applicable law or agreed to in writing, software
  11  // distributed under the License is distributed on an "AS IS" BASIS,
  12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13  // See the License for the specific language governing permissions and
  14  // limitations under the License.
  15  
  16  use crate::{
  17      MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
  18      Primary,
  19      helpers::{
  20          BFTReceiver,
  21          ConsensusSender,
  22          DAG,
  23          PrimaryReceiver,
  24          PrimarySender,
  25          Storage,
  26          fmt_id,
  27          init_bft_channels,
  28          now,
  29      },
  30  };
  31  use alphaos_account::Account;
  32  use alphaos_node_bft_ledger_service::LedgerService;
  33  use alphaos_node_sync::{BlockSync, Ping};
  34  use alphavm::{
  35      console::account::Address,
  36      ledger::{
  37          block::Transaction,
  38          committee::Committee,
  39          narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
  40          puzzle::{Solution, SolutionID},
  41      },
  42      prelude::{Field, Network, Result, bail, ensure},
  43      utilities::flatten_error,
  44  };
  45  
  46  use alpha_std::StorageMode;
  47  use anyhow::Context;
  48  use colored::Colorize;
  49  use indexmap::{IndexMap, IndexSet};
  50  #[cfg(feature = "locktick")]
  51  use locktick::{
  52      parking_lot::{Mutex, RwLock},
  53      tokio::Mutex as TMutex,
  54  };
  55  #[cfg(not(feature = "locktick"))]
  56  use parking_lot::{Mutex, RwLock};
  57  use std::{
  58      collections::{BTreeMap, HashSet},
  59      future::Future,
  60      net::SocketAddr,
  61      sync::{
  62          Arc,
  63          atomic::{AtomicI64, Ordering},
  64      },
  65  };
  66  #[cfg(not(feature = "locktick"))]
  67  use tokio::sync::Mutex as TMutex;
  68  use tokio::{
  69      sync::{OnceCell, oneshot},
  70      task::JoinHandle,
  71  };
  72  
  73  #[derive(Clone)]
  74  pub struct BFT<N: Network> {
  75      /// The primary for this node.
  76      primary: Primary<N>,
  77      /// The DAG of batches from which we build the blockchain.
  78      dag: Arc<RwLock<DAG<N>>>,
  79      /// The batch certificate of the leader from the current even round, if one was present.
  80      leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
  81      /// The timer for the leader certificate to be received.
  82      leader_certificate_timer: Arc<AtomicI64>,
  83      /// The consensus sender.
  84      consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
  85      /// Handles for all spawned tasks.
  86      handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
  87      /// The BFT lock.
  88      lock: Arc<TMutex<()>>,
  89  }
  90  
  91  impl<N: Network> BFT<N> {
  92      /// Initializes a new instance of the BFT.
  93      #[allow(clippy::too_many_arguments)]
  94      pub fn new(
  95          account: Account<N>,
  96          storage: Storage<N>,
  97          ledger: Arc<dyn LedgerService<N>>,
  98          block_sync: Arc<BlockSync<N>>,
  99          ip: Option<SocketAddr>,
 100          trusted_validators: &[SocketAddr],
 101          trusted_peers_only: bool,
 102          storage_mode: StorageMode,
 103          dev: Option<u16>,
 104      ) -> Result<Self> {
 105          Ok(Self {
 106              primary: Primary::new(
 107                  account,
 108                  storage,
 109                  ledger,
 110                  block_sync,
 111                  ip,
 112                  trusted_validators,
 113                  trusted_peers_only,
 114                  storage_mode,
 115                  dev,
 116              )?,
 117              dag: Default::default(),
 118              leader_certificate: Default::default(),
 119              leader_certificate_timer: Default::default(),
 120              consensus_sender: Default::default(),
 121              handles: Default::default(),
 122              lock: Default::default(),
 123          })
 124      }
 125  
 126      /// Run the BFT instance.
 127      ///
 128      /// This will return as soon as all required tasks are spawned.
 129      /// The function must not be called more than once per instance.
 130      pub async fn run(
 131          &mut self,
 132          ping: Option<Arc<Ping<N>>>,
 133          consensus_sender: Option<ConsensusSender<N>>,
 134          primary_sender: PrimarySender<N>,
 135          primary_receiver: PrimaryReceiver<N>,
 136      ) -> Result<()> {
 137          info!("Starting the BFT instance...");
 138          // Initialize the BFT channels.
 139          let (bft_sender, bft_receiver) = init_bft_channels::<N>();
 140          // First, start the BFT handlers.
 141          self.start_handlers(bft_receiver);
 142          // Next, run the primary instance.
 143          self.primary.run(ping, Some(bft_sender), primary_sender, primary_receiver).await?;
 144          // Lastly, set the consensus sender.
 145          // Note: This ensures during initial syncing, that the BFT does not advance the ledger.
 146          if let Some(consensus_sender) = consensus_sender {
 147              self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
 148          }
 149          Ok(())
 150      }
 151  
 152      /// Returns `true` if the primary is synced.
 153      pub fn is_synced(&self) -> bool {
 154          self.primary.is_synced()
 155      }
 156  
 157      /// Returns the primary.
 158      pub const fn primary(&self) -> &Primary<N> {
 159          &self.primary
 160      }
 161  
 162      /// Returns the storage.
 163      pub const fn storage(&self) -> &Storage<N> {
 164          self.primary.storage()
 165      }
 166  
 167      /// Returns the ledger.
 168      pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
 169          self.primary.ledger()
 170      }
 171  
 172      /// Returns the leader of the current even round, if one was present.
 173      pub fn leader(&self) -> Option<Address<N>> {
 174          self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
 175      }
 176  
 177      /// Returns the certificate of the leader from the current even round, if one was present.
 178      pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
 179          &self.leader_certificate
 180      }
 181  }
 182  
 183  impl<N: Network> BFT<N> {
 184      /// Returns the number of unconfirmed transmissions.
 185      pub fn num_unconfirmed_transmissions(&self) -> usize {
 186          self.primary.num_unconfirmed_transmissions()
 187      }
 188  
 189      /// Returns the number of unconfirmed ratifications.
 190      pub fn num_unconfirmed_ratifications(&self) -> usize {
 191          self.primary.num_unconfirmed_ratifications()
 192      }
 193  
 194      /// Returns the number of solutions.
 195      pub fn num_unconfirmed_solutions(&self) -> usize {
 196          self.primary.num_unconfirmed_solutions()
 197      }
 198  
 199      /// Returns the number of unconfirmed transactions.
 200      pub fn num_unconfirmed_transactions(&self) -> usize {
 201          self.primary.num_unconfirmed_transactions()
 202      }
 203  }
 204  
 205  impl<N: Network> BFT<N> {
 206      /// Returns the worker transmission IDs.
 207      pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
 208          self.primary.worker_transmission_ids()
 209      }
 210  
 211      /// Returns the worker transmissions.
 212      pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
 213          self.primary.worker_transmissions()
 214      }
 215  
 216      /// Returns the worker solutions.
 217      pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
 218          self.primary.worker_solutions()
 219      }
 220  
 221      /// Returns the worker transactions.
 222      pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
 223          self.primary.worker_transactions()
 224      }
 225  }
 226  
 227  impl<N: Network> BFT<N> {
 228      /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
 229      fn update_to_next_round(&self, current_round: u64) -> bool {
 230          // Ensure the current round is at least the storage round (this is a sanity check).
 231          let storage_round = self.storage().current_round();
 232          if current_round < storage_round {
 233              debug!(
 234                  "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
 235              );
 236              return false;
 237          }
 238  
 239          // Determine if the BFT is ready to update to the next round.
 240          let is_ready = match current_round % 2 == 0 {
 241              true => self.update_leader_certificate_to_even_round(current_round),
 242              false => self.is_leader_quorum_or_nonleaders_available(current_round),
 243          };
 244  
 245          #[cfg(feature = "metrics")]
 246          {
 247              let start = self.leader_certificate_timer.load(Ordering::SeqCst);
 248              // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
 249              if start > 0 {
 250                  let end = now();
 251                  let elapsed = std::time::Duration::from_secs((end - start) as u64);
 252                  metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
 253              }
 254          }
 255  
 256          // Log whether the round is going to update.
 257          if current_round % 2 == 0 {
 258              // Determine if there is a leader certificate.
 259              if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
 260                  // Ensure the state of the leader certificate is consistent with the BFT being ready.
 261                  if !is_ready {
 262                      trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
 263                  }
 264                  // Log the leader election.
 265                  let leader_round = leader_certificate.round();
 266                  match leader_round == current_round {
 267                      true => {
 268                          info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
 269                          #[cfg(feature = "metrics")]
 270                          metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
 271                      }
 272                      false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
 273                  }
 274              } else {
 275                  match is_ready {
 276                      true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
 277                      false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()),
 278                  }
 279              }
 280          }
 281  
 282          // If the BFT is ready, then update to the next round.
 283          if is_ready {
 284              // Update to the next round in storage.
 285              if let Err(err) = self
 286                  .storage()
 287                  .increment_to_next_round(current_round)
 288                  .with_context(|| format!("BFT failed to increment to the next round from round {current_round}"))
 289              {
 290                  warn!("{}", &flatten_error(err));
 291                  return false;
 292              }
 293              // Update the timer for the leader certificate.
 294              self.leader_certificate_timer.store(now(), Ordering::SeqCst);
 295          }
 296  
 297          is_ready
 298      }
 299  
 300      /// Updates the leader certificate to the current even round,
 301      /// returning `true` if the BFT is ready to update to the next round.
 302      ///
 303      /// This method runs on every even round, by determining the leader of the current even round,
 304      /// and setting the leader certificate to their certificate in the round, if they were present.
 305      fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
 306          // Retrieve the current round.
 307          let current_round = self.storage().current_round();
 308          // Ensure the current round matches the given round.
 309          if current_round != even_round {
 310              warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
 311              return false;
 312          }
 313  
 314          // If the current round is odd, return false.
 315          if current_round % 2 != 0 || current_round < 2 {
 316              error!("BFT cannot update the leader certificate in an odd round");
 317              return false;
 318          }
 319  
 320          // Retrieve the certificates for the current round.
 321          let current_certificates = self.storage().get_certificates_for_round(current_round);
 322          // If there are no current certificates, set the leader certificate to 'None', and return early.
 323          if current_certificates.is_empty() {
 324              // Set the leader certificate to 'None'.
 325              *self.leader_certificate.write() = None;
 326              return false;
 327          }
 328  
 329          // Retrieve the committee lookback of the current round.
 330          let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
 331              Ok(committee) => committee,
 332              Err(err) => {
 333                  let err = err.context(format!(
 334                      "BFT failed to retrieve the committee lookback for the even round {current_round}"
 335                  ));
 336                  warn!("{}", &flatten_error(err));
 337                  return false;
 338              }
 339          };
 340          // Determine the leader of the current round.
 341          let leader = match self.ledger().latest_leader() {
 342              Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
 343              _ => {
 344                  // Compute the leader for the current round.
 345                  let computed_leader = match committee_lookback.get_leader(current_round) {
 346                      Ok(leader) => leader,
 347                      Err(err) => {
 348                          let err =
 349                              err.context(format!("BFT failed to compute the leader for the even round {current_round}"));
 350                          error!("{}", &flatten_error(err));
 351                          return false;
 352                      }
 353                  };
 354  
 355                  // Cache the computed leader.
 356                  self.ledger().update_latest_leader(current_round, computed_leader);
 357  
 358                  computed_leader
 359              }
 360          };
 361          // Find and set the leader certificate, if the leader was present in the current even round.
 362          let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
 363          *self.leader_certificate.write() = leader_certificate.cloned();
 364  
 365          self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
 366      }
 367  
 368      /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
 369      ///  - If the leader certificate is set for the current even round.
 370      ///  - The timer for the leader certificate has expired.
 371      fn is_even_round_ready_for_next_round(
 372          &self,
 373          certificates: IndexSet<BatchCertificate<N>>,
 374          committee: Committee<N>,
 375          current_round: u64,
 376      ) -> bool {
 377          // Retrieve the authors for the current round.
 378          let authors = certificates.into_iter().map(|c| c.author()).collect();
 379          // Check if quorum threshold is reached.
 380          if !committee.is_quorum_threshold_reached(&authors) {
 381              trace!("BFT failed to reach quorum threshold in even round {current_round}");
 382              return false;
 383          }
 384          // If the leader certificate is set for the current even round, return 'true'.
 385          if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
 386              if leader_certificate.round() == current_round {
 387                  return true;
 388              }
 389          }
 390          // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
 391          if self.is_timer_expired() {
 392              debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
 393              return true;
 394          }
 395          // Otherwise, return 'false'.
 396          false
 397      }
 398  
 399      /// Returns `true` if the timer for the leader certificate has expired.
 400      ///
 401      /// This is always true for a new BFT instance.
 402      fn is_timer_expired(&self) -> bool {
 403          self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
 404      }
 405  
 406      /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
 407      ///  - The leader certificate is `None`.
 408      ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
 409      ///  - The leader certificate timer has expired.
 410      fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
 411          // Retrieve the current round.
 412          let current_round = self.storage().current_round();
 413          // Ensure the current round matches the given round.
 414          if current_round != odd_round {
 415              warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
 416              return false;
 417          }
 418          // If the current round is even, return false.
 419          if current_round % 2 != 1 {
 420              error!("BFT does not compute stakes for the leader certificate in an even round");
 421              return false;
 422          }
 423          // Retrieve the certificates for the current round.
 424          let current_certificates = self.storage().get_certificates_for_round(current_round);
 425          // Retrieve the committee lookback for the current round.
 426          let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
 427              Ok(committee) => committee,
 428              Err(err) => {
 429                  let err = err.context(format!(
 430                      "BFT failed to retrieve the committee lookback for the odd round {current_round}"
 431                  ));
 432                  error!("{}", &flatten_error(err));
 433                  return false;
 434              }
 435          };
 436          // Retrieve the authors of the current certificates.
 437          let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
 438          // Check if quorum threshold is reached.
 439          if !committee_lookback.is_quorum_threshold_reached(&authors) {
 440              trace!("BFT failed reach quorum threshold in odd round {current_round}.");
 441              return false;
 442          }
 443          // Retrieve the leader certificate.
 444          let Some(leader_certificate) = self.leader_certificate.read().clone() else {
 445              // If there is no leader certificate for the previous round, return 'true'.
 446              return true;
 447          };
 448          // Compute the stake for the leader certificate.
 449          let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
 450              leader_certificate.id(),
 451              current_certificates,
 452              &committee_lookback,
 453          );
 454          // Return 'true' if any of the following conditions hold:
 455          stake_with_leader >= committee_lookback.availability_threshold()
 456              || stake_without_leader >= committee_lookback.quorum_threshold()
 457              || self.is_timer_expired()
 458      }
 459  
 460      /// Computes the amount of stake that has & has not signed for the leader certificate.
 461      fn compute_stake_for_leader_certificate(
 462          &self,
 463          leader_certificate_id: Field<N>,
 464          current_certificates: IndexSet<BatchCertificate<N>>,
 465          current_committee: &Committee<N>,
 466      ) -> (u64, u64) {
 467          // If there are no current certificates, return early.
 468          if current_certificates.is_empty() {
 469              return (0, 0);
 470          }
 471  
 472          // Initialize a tracker for the stake with the leader.
 473          let mut stake_with_leader = 0u64;
 474          // Initialize a tracker for the stake without the leader.
 475          let mut stake_without_leader = 0u64;
 476          // Iterate over the current certificates.
 477          for certificate in current_certificates {
 478              // Retrieve the stake for the author of the certificate.
 479              let stake = current_committee.get_stake(certificate.author());
 480              // Determine if the certificate includes the leader.
 481              match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
 482                  // If the certificate includes the leader, add the stake to the stake with the leader.
 483                  true => stake_with_leader = stake_with_leader.saturating_add(stake),
 484                  // If the certificate does not include the leader, add the stake to the stake without the leader.
 485                  false => stake_without_leader = stake_without_leader.saturating_add(stake),
 486              }
 487          }
 488          // Return the stake with the leader, and the stake without the leader.
 489          (stake_with_leader, stake_without_leader)
 490      }
 491  }
 492  
 493  impl<N: Network> BFT<N> {
 494      /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
 495      async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
 496          &self,
 497          certificate: BatchCertificate<N>,
 498      ) -> Result<()> {
 499          // Acquire the BFT lock.
 500          let _lock = self.lock.lock().await;
 501  
 502          // ### First, insert the certificate into the DAG. ###
 503          // Retrieve the round of the new certificate to add to the DAG.
 504          let certificate_round = certificate.round();
 505  
 506          // Insert the certificate into the DAG.
 507          self.dag.write().insert(certificate);
 508  
 509          // ### Second, determine if a new leader certificate can be committed. ###
 510          let commit_round = certificate_round.saturating_sub(1);
 511  
 512          // Leaders are elected in even rounds.
 513          // If the previous round is odd, the current round cannot commit any leader certs.
 514          // Similarly, no leader certificate can be committed for round zero.
 515          if !commit_round.is_multiple_of(2) || commit_round < 2 {
 516              return Ok(());
 517          }
 518          // If the commit round is at or below the last committed round, return early.
 519          if commit_round <= self.dag.read().last_committed_round() {
 520              return Ok(());
 521          }
 522  
 523          /* Proceeding to check if the leader is ready to be committed. */
 524          trace!("Checking if the leader is ready to be committed for round {commit_round}...");
 525  
 526          // Retrieve the committee lookback for the commit round.
 527          let committee_lookback = self.ledger().get_committee_lookback_for_round(commit_round).with_context(|| {
 528              format!("BFT failed to retrieve the committee with lag for commit round {commit_round}")
 529          })?;
 530  
 531          // Either retrieve the cached leader or compute it.
 532          let leader = match self.ledger().latest_leader() {
 533              Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
 534              _ => {
 535                  // Compute the leader for the commit round.
 536                  let computed_leader = committee_lookback
 537                      .get_leader(commit_round)
 538                      .with_context(|| format!("BFT failed to compute the leader for commit round {commit_round}"))?;
 539  
 540                  // Cache the computed leader.
 541                  self.ledger().update_latest_leader(commit_round, computed_leader);
 542  
 543                  computed_leader
 544              }
 545          };
 546  
 547          // Retrieve the leader certificate for the commit round.
 548          let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
 549          else {
 550              trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
 551              return Ok(());
 552          };
 553          // Retrieve all of the certificates for the **certificate** round.
 554          let certificates = self.dag.read().get_certificates_for_round(certificate_round).with_context(|| {
 555              format!("BFT failed to retrieve the certificates for certificate round {certificate_round}")
 556          })?;
 557  
 558          // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
 559          let certificate_committee_lookback =
 560              self.ledger().get_committee_lookback_for_round(certificate_round).with_context(|| {
 561                  format!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}")
 562              })?;
 563  
 564          // Construct a set over the authors who included the leader's certificate in the certificate round.
 565          let authors = certificates
 566              .values()
 567              .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
 568                  true => Some(c.author()),
 569                  false => None,
 570              })
 571              .collect();
 572          // Check if the leader is ready to be committed.
 573          if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
 574              // If the leader is not ready to be committed, return early.
 575              trace!("BFT is not ready to commit {commit_round}. Availability threshold has not been reached yet.");
 576              return Ok(());
 577          }
 578  
 579          if IS_SYNCING {
 580              info!("Proceeding to commit round {commit_round} with leader '{}' from block sync", fmt_id(leader));
 581          } else {
 582              info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
 583          }
 584  
 585          // Commit the leader certificate, and all previous leader certificates since the last committed round.
 586          self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
 587      }
 588  
 589      /// Commits the leader certificate, and all previous leader certificates since the last committed round.
 590      async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
 591          &self,
 592          leader_certificate: BatchCertificate<N>,
 593      ) -> Result<()> {
 594          #[cfg(debug_assertions)]
 595          trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());
 596  
 597          // Fetch the leader round.
 598          let latest_leader_round = leader_certificate.round();
 599          // Determine the list of all previous leader certificates since the last committed round.
 600          // The order of the leader certificates is from **newest** to **oldest**.
 601          let mut leader_certificates = vec![leader_certificate.clone()];
 602          {
 603              // Retrieve the leader round.
 604              let leader_round = leader_certificate.round();
 605  
 606              let mut current_certificate = leader_certificate;
 607              for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
 608              {
 609                  // Retrieve the previous committee for the leader round.
 610                  let previous_committee_lookback =
 611                      self.ledger().get_committee_lookback_for_round(round).with_context(|| {
 612                          format!("BFT failed to retrieve a previous committee lookback for the even round {round}")
 613                      })?;
 614  
 615                  // Either retrieve the cached leader or compute it.
 616                  let leader = match self.ledger().latest_leader() {
 617                      Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
 618                      _ => {
 619                          // Compute the leader for the commit round.
 620                          let computed_leader = previous_committee_lookback
 621                              .get_leader(round)
 622                              .with_context(|| format!("BFT failed to compute the leader for the even round {round}"))?;
 623  
 624                          // Cache the computed leader.
 625                          self.ledger().update_latest_leader(round, computed_leader);
 626  
 627                          computed_leader
 628                      }
 629                  };
 630                  // Retrieve the previous leader certificate.
 631                  let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
 632                  else {
 633                      continue;
 634                  };
 635                  // Determine if there is a path between the previous certificate and the current certificate.
 636                  if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
 637                      // Add the previous leader certificate to the list of certificates to commit.
 638                      leader_certificates.push(previous_certificate.clone());
 639                      // Update the current certificate to the previous leader certificate.
 640                      current_certificate = previous_certificate;
 641                  } else {
 642                      #[cfg(debug_assertions)]
 643                      trace!(
 644                          "Skipping anchor for round {round} as it is not linked to the most recent committed leader certificate"
 645                      );
 646                  }
 647              }
 648          }
 649  
 650          // Iterate over the leader certificates to commit.
 651          for leader_certificate in leader_certificates.into_iter().rev() {
 652              // Retrieve the leader certificate round.
 653              let leader_round = leader_certificate.round();
 654              // Compute the commit subdag.
 655              let commit_subdag = self
 656                  .order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate)
 657                  .with_context(|| "BFT failed to order the DAG with DFS")?;
 658              // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
 659              if !IS_SYNCING {
 660                  // Initialize a map for the deduped transmissions.
 661                  let mut transmissions = IndexMap::new();
 662                  // Initialize a map for the deduped transaction ids.
 663                  let mut seen_transaction_ids = IndexSet::new();
 664                  // Initialize a map for the deduped solution ids.
 665                  let mut seen_solution_ids = IndexSet::new();
 666                  // Start from the oldest leader certificate.
 667                  for certificate in commit_subdag.values().flatten() {
 668                      // Retrieve the transmissions.
 669                      for transmission_id in certificate.transmission_ids() {
 670                          // If the transaction ID or solution ID already exists in the map, skip it.
 671                          // Note: This additional check is done to ensure that we do not include duplicate
 672                          // transaction IDs or solution IDs that may have a different transmission ID.
 673                          match transmission_id {
 674                              TransmissionID::Solution(solution_id, _) => {
 675                                  // If the solution already exists, skip it.
 676                                  if seen_solution_ids.contains(&solution_id) {
 677                                      continue;
 678                                  }
 679                              }
 680                              TransmissionID::Transaction(transaction_id, _) => {
 681                                  // If the transaction already exists, skip it.
 682                                  if seen_transaction_ids.contains(transaction_id) {
 683                                      continue;
 684                                  }
 685                              }
 686                              TransmissionID::Ratification => {
 687                                  bail!("Ratifications are currently not supported in the BFT.")
 688                              }
 689                          }
 690                          // If the transmission already exists in the map, skip it.
 691                          if transmissions.contains_key(transmission_id) {
 692                              continue;
 693                          }
 694                          // If the transmission already exists in the ledger, skip it.
 695                          // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
 696                          if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
 697                              continue;
 698                          }
 699                          // Retrieve the transmission.
 700                          let transmission = self.storage().get_transmission(*transmission_id).with_context(|| {
 701                              format!(
 702                                  "BFT failed to retrieve transmission '{}.{}' from round {}",
 703                                  fmt_id(transmission_id),
 704                                  fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
 705                                  certificate.round()
 706                              )
 707                          })?;
 708                          // Insert the transaction ID or solution ID into the map.
 709                          match transmission_id {
 710                              TransmissionID::Solution(id, _) => {
 711                                  seen_solution_ids.insert(id);
 712                              }
 713                              TransmissionID::Transaction(id, _) => {
 714                                  seen_transaction_ids.insert(id);
 715                              }
 716                              TransmissionID::Ratification => {}
 717                          }
 718                          // Add the transmission to the set.
 719                          transmissions.insert(*transmission_id, transmission);
 720                      }
 721                  }
 722                  // Trigger consensus, as this will build a new block for the ledger.
 723                  // Construct the subdag.
 724                  let subdag = Subdag::from(commit_subdag.clone())?;
 725                  // Retrieve the anchor round.
 726                  let anchor_round = subdag.anchor_round();
 727                  // Retrieve the number of transmissions.
 728                  let num_transmissions = transmissions.len();
 729                  // Retrieve metadata about the subdag.
 730                  let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
 731  
 732                  // Ensure the subdag anchor round matches the leader round.
 733                  ensure!(
 734                      anchor_round == leader_round,
 735                      "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
 736                  );
 737  
 738                  // Trigger consensus.
 739                  if let Some(consensus_sender) = self.consensus_sender.get() {
 740                      // Initialize a callback sender and receiver.
 741                      let (callback_sender, callback_receiver) = oneshot::channel();
 742                      // Send the subdag and transmissions to consensus.
 743                      consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
 744                      // Await the callback to continue.
 745                      match callback_receiver.await {
 746                          Ok(Ok(())) => (), // continue
 747                          Ok(Err(err)) => {
 748                              let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
 749                              error!("{}", &flatten_error(err));
 750                              return Ok(());
 751                          }
 752                          Err(err) => {
 753                              let err: anyhow::Error = err.into();
 754                              let err =
 755                                  err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
 756                              error!("{}", flatten_error(err));
 757                              return Ok(());
 758                          }
 759                      }
 760                  }
 761  
 762                  info!(
 763                      "\n\nCommitting a subDAG with anchor round {anchor_round} and {num_transmissions} transmissions: {subdag_metadata:?} (syncing={IS_SYNCING})\n",
 764                  );
 765              }
 766  
 767              // Update the DAG, as the subdag was successfully included into a block.
 768              {
 769                  let mut dag_write = self.dag.write();
 770                  let mut count = 0;
 771                  for certificate in commit_subdag.values().flatten() {
 772                      dag_write.commit(certificate, self.storage().max_gc_rounds());
 773                      count += 1;
 774                  }
 775  
 776                  trace!("Committed {count} certificates to the DAG");
 777              }
 778  
 779              // Update the validator telemetry.
 780              #[cfg(feature = "telemetry")]
 781              self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
 782          }
 783  
 784          // Perform garbage collection based on the latest committed leader round.
 785          // The protocol guarantees that validators commit the same anchors in the same order,
 786          // but they may do so in different chunks of anchors,
 787          // where 'chunk' refers to the vector of certificates that the loop just above iterates over.
 788          // Doing garbage collection at the end of each chunk (as we do here),
 789          // as opposed to after each certificate in the chunk (if we moved this call inside the loop, at the end),
 790          // may give raise to a discrepancy between the DAGs of different validators who commit different chunks:
 791          // one validator may have more certificates than the other, not yet garbage collected.
 792          // However, when `order_dag_with_dfs()` collects the sub-DAG to commit from an anchor,
 793          // it excludes certificates that are below the GC round,
 794          // so the possible aforementioned discrepancy between DAGs should not affect the consensus.
 795          // That exclusion in `order_dag_with_dfs()` is critical to prevent forking,
 796          // so long as garbage collection is done after each chunk.
 797          // If garbage collection were done after each committed certificate,
 798          // that exclusion in `order_dag_with_dfs()` should be unnecessary.
 799          self.storage().garbage_collect_certificates(latest_leader_round);
 800  
 801          Ok(())
 802      }
 803  
 804      /// Returns the subdag of batch certificates to commit.
 805      fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
 806          &self,
 807          leader_certificate: BatchCertificate<N>,
 808      ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
 809          // Initialize a map for the certificates to commit.
 810          let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
 811          // Initialize a set for the already ordered certificates.
 812          let mut already_ordered = HashSet::new();
 813          // Initialize a buffer for the certificates to order.
 814          let mut buffer = vec![leader_certificate];
 815          // Iterate over the certificates to order.
 816          while let Some(certificate) = buffer.pop() {
 817              // Insert the certificate into the map.
 818              commit.entry(certificate.round()).or_default().insert(certificate.clone());
 819  
 820              // Check if the previous certificate is below the GC round.
 821              // This is currently a critical check to prevent forking,
 822              // as explained in the comment at the end of `commit_leader_certificate()`,
 823              // just before the call to garbage collection.
 824              let previous_round = certificate.round().saturating_sub(1);
 825              if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
 826                  continue;
 827              }
 828              // Iterate over the previous certificate IDs.
 829              // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
 830              // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
 831              for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
 832                  // If the previous certificate is already ordered, continue.
 833                  if already_ordered.contains(previous_certificate_id) {
 834                      continue;
 835                  }
 836                  // If the previous certificate was recently committed, continue.
 837                  if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
 838                      continue;
 839                  }
 840                  // If the previous certificate already exists in the ledger, continue.
 841                  if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
 842                      continue;
 843                  }
 844  
 845                  // Retrieve the previous certificate.
 846                  let previous_certificate = {
 847                      // Start by retrieving the previous certificate from the DAG.
 848                      match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
 849                          // If the previous certificate is found, return it.
 850                          Some(previous_certificate) => previous_certificate,
 851                          // If the previous certificate is not found, retrieve it from the storage.
 852                          None => match self.storage().get_certificate(*previous_certificate_id) {
 853                              // If the previous certificate is found, return it.
 854                              Some(previous_certificate) => previous_certificate,
 855                              // Otherwise, the previous certificate is missing, and throw an error.
 856                              None => bail!(
 857                                  "Missing previous certificate {} for round {previous_round}",
 858                                  fmt_id(previous_certificate_id)
 859                              ),
 860                          },
 861                      }
 862                  };
 863                  // Insert the previous certificate into the set of already ordered certificates.
 864                  already_ordered.insert(previous_certificate.id());
 865                  // Insert the previous certificate into the buffer.
 866                  buffer.push(previous_certificate);
 867              }
 868          }
 869          // Ensure we only retain certificates that are above the GC round.
 870          commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
 871          // Return the certificates to commit.
 872          Ok(commit)
 873      }
 874  
 875      /// Returns `true` if there is a path from the previous certificate to the current certificate.
 876      fn is_linked(
 877          &self,
 878          previous_certificate: BatchCertificate<N>,
 879          current_certificate: BatchCertificate<N>,
 880      ) -> Result<bool> {
 881          // Initialize the list containing the traversal.
 882          let mut traversal = vec![current_certificate.clone()];
 883          // Iterate over the rounds from the current certificate to the previous certificate.
 884          for round in (previous_certificate.round()..current_certificate.round()).rev() {
 885              // Retrieve all of the certificates for this past round.
 886              let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
 887                  // This is a critical error, as the traversal should have these certificates.
 888                  // If this error is hit, it is likely that the maximum GC rounds should be increased.
 889                  bail!("BFT failed to retrieve the certificates for past round {round}");
 890              };
 891              // Filter the certificates to only include those that are in the traversal.
 892              traversal = certificates
 893                  .into_values()
 894                  .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
 895                  .collect();
 896          }
 897          Ok(traversal.contains(&previous_certificate))
 898      }
 899  }
 900  
 901  impl<N: Network> BFT<N> {
 902      /// Starts the BFT handlers.
 903      fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
 904          let BFTReceiver {
 905              mut rx_primary_round,
 906              mut rx_primary_certificate,
 907              mut rx_sync_bft_dag_at_bootup,
 908              mut rx_sync_bft,
 909          } = bft_receiver;
 910  
 911          // Process the current round from the primary.
 912          let self_ = self.clone();
 913          self.spawn(async move {
 914              while let Some((current_round, callback)) = rx_primary_round.recv().await {
 915                  callback.send(self_.update_to_next_round(current_round)).ok();
 916              }
 917          });
 918  
 919          // Process the certificate from the primary.
 920          let self_ = self.clone();
 921          self.spawn(async move {
 922              while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
 923                  // Update the DAG with the certificate.
 924                  let result = self_.update_dag::<true, false>(certificate).await;
 925                  // Send the callback **after** updating the DAG.
 926                  // Note: We must await the DAG update before proceeding.
 927                  callback.send(result).ok();
 928              }
 929          });
 930  
 931          // Process the request to sync the BFT DAG at bootup.
 932          let self_ = self.clone();
 933          self.spawn(async move {
 934              while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
 935                  self_.sync_bft_dag_at_bootup(certificates).await;
 936              }
 937          });
 938  
 939          // Handler for new certificates that were fetched by the sync module.
 940          let self_ = self.clone();
 941          self.spawn(async move {
 942              while let Some((certificate, callback)) = rx_sync_bft.recv().await {
 943                  // Update the DAG with the certificate.
 944                  let result = self_.update_dag::<true, true>(certificate).await;
 945                  // Send the callback **after** updating the DAG.
 946                  // Note: We must await the DAG update before proceeding.
 947                  callback.send(result).ok();
 948              }
 949          });
 950      }
 951  
 952      /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
 953      /// already exist in the ledger.
 954      ///
 955      /// This method commits all the certificates into the DAG.
 956      /// Note that there is no need to insert the certificates into the DAG, because these certificates
 957      /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
 958      async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
 959          // Acquire the BFT write lock.
 960          let mut dag = self.dag.write();
 961  
 962          // Commit all the certificates.
 963          for certificate in certificates {
 964              dag.commit(&certificate, self.storage().max_gc_rounds());
 965          }
 966      }
 967  
 968      /// Spawns a task with the given future; it should only be used for long-running tasks.
 969      fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
 970          self.handles.lock().push(tokio::spawn(future));
 971      }
 972  
 973      /// Shuts down the BFT.
 974      pub async fn shut_down(&self) {
 975          info!("Shutting down the BFT...");
 976          // Acquire the lock.
 977          let _lock = self.lock.lock().await;
 978          // Shut down the primary.
 979          self.primary.shut_down().await;
 980          // Abort the tasks.
 981          self.handles.lock().iter().for_each(|handle| handle.abort());
 982      }
 983  }
 984  
 985  #[cfg(test)]
 986  mod tests {
 987      use crate::{
 988          BFT,
 989          MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
 990          helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
 991      };
 992  
 993      use alphaos_account::Account;
 994      use alphaos_node_bft_ledger_service::{LedgerService, MockLedgerService};
 995      use alphaos_node_bft_storage_service::BFTMemoryService;
 996      use alphaos_node_sync::BlockSync;
 997      use alphavm::{
 998          console::account::{Address, PrivateKey},
 999          ledger::{
1000              committee::{
1001                  Committee,
1002                  test_helpers::{sample_committee, sample_committee_for_round, sample_committee_for_round_and_members},
1003              },
1004              narwhal::{
1005                  BatchCertificate,
1006                  batch_certificate::test_helpers::{
1007                      sample_batch_certificate,
1008                      sample_batch_certificate_for_round,
1009                      sample_batch_certificate_for_round_with_committee,
1010                  },
1011              },
1012          },
1013          utilities::TestRng,
1014      };
1015  
1016      use alpha_std::StorageMode;
1017      use anyhow::Result;
1018      use indexmap::{IndexMap, IndexSet};
1019      use std::sync::Arc;
1020  
1021      type CurrentNetwork = alphavm::console::network::MainnetV0;
1022  
1023      /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
1024      fn sample_test_instance(
1025          committee_round: Option<u64>,
1026          max_gc_rounds: u64,
1027          rng: &mut TestRng,
1028      ) -> (
1029          Committee<CurrentNetwork>,
1030          Account<CurrentNetwork>,
1031          Arc<MockLedgerService<CurrentNetwork>>,
1032          Storage<CurrentNetwork>,
1033      ) {
1034          let committee = match committee_round {
1035              Some(round) => sample_committee_for_round(round, rng),
1036              None => sample_committee(rng),
1037          };
1038          let account = Account::new(rng).unwrap();
1039          let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1040          let transmissions = Arc::new(BFTMemoryService::new());
1041          let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1042  
1043          (committee, account, ledger, storage)
1044      }
1045  
1046      // Helper function to set up BFT for testing.
1047      fn initialize_bft(
1048          account: Account<CurrentNetwork>,
1049          storage: Storage<CurrentNetwork>,
1050          ledger: Arc<MockLedgerService<CurrentNetwork>>,
1051      ) -> anyhow::Result<BFT<CurrentNetwork>> {
1052          // Create the block synchronization logic.
1053          let block_sync = Arc::new(BlockSync::new(ledger.clone()));
1054          // Initialize the BFT.
1055          BFT::new(
1056              account.clone(),
1057              storage.clone(),
1058              ledger.clone(),
1059              block_sync,
1060              None,
1061              &[],
1062              false,
1063              StorageMode::new_test(None),
1064              None,
1065          )
1066      }
1067  
1068      #[test]
1069      #[tracing_test::traced_test]
1070      fn test_is_leader_quorum_odd() -> Result<()> {
1071          let rng = &mut TestRng::default();
1072  
1073          // Sample batch certificates.
1074          let mut certificates = IndexSet::new();
1075          certificates.insert(alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1076          certificates.insert(alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1077          certificates.insert(alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1078          certificates.insert(alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1079  
1080          // Initialize the committee.
1081          let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1082              1,
1083              vec![
1084                  certificates[0].author(),
1085                  certificates[1].author(),
1086                  certificates[2].author(),
1087                  certificates[3].author(),
1088              ],
1089              rng,
1090          );
1091  
1092          // Initialize the ledger.
1093          let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1094          // Initialize the storage.
1095          let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1096          // Initialize the account.
1097          let account = Account::new(rng)?;
1098          // Initialize the BFT.
1099          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1100          assert!(bft.is_timer_expired());
1101          // Ensure this call succeeds on an odd round.
1102          let result = bft.is_leader_quorum_or_nonleaders_available(1);
1103          // If timer has expired but quorum threshold is not reached, return 'false'.
1104          assert!(!result);
1105          // Insert certificates into storage.
1106          for certificate in certificates.iter() {
1107              storage.testing_only_insert_certificate_testing_only(certificate.clone());
1108          }
1109          // Ensure this call succeeds on an odd round.
1110          let result = bft.is_leader_quorum_or_nonleaders_available(1);
1111          assert!(result); // no previous leader certificate
1112          // Set the leader certificate.
1113          let leader_certificate = sample_batch_certificate(rng);
1114          *bft.leader_certificate.write() = Some(leader_certificate);
1115          // Ensure this call succeeds on an odd round.
1116          let result = bft.is_leader_quorum_or_nonleaders_available(1);
1117          assert!(result); // should now fall through to the end of function
1118  
1119          Ok(())
1120      }
1121  
1122      #[test]
1123      #[tracing_test::traced_test]
1124      fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1125          let rng = &mut TestRng::default();
1126  
1127          // Sample the test instance.
1128          let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1129          assert_eq!(committee.starting_round(), 1);
1130          assert_eq!(storage.current_round(), 1);
1131          assert_eq!(storage.max_gc_rounds(), 10);
1132  
1133          // Set up the BFT logic.
1134          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1135          assert!(bft.is_timer_expired());
1136  
1137          // Store is at round 1, and we are checking for round 2.
1138          // Ensure this call fails on an even round.
1139          let result = bft.is_leader_quorum_or_nonleaders_available(2);
1140          assert!(!result);
1141          Ok(())
1142      }
1143  
1144      #[test]
1145      #[tracing_test::traced_test]
1146      fn test_is_leader_quorum_even() -> Result<()> {
1147          let rng = &mut TestRng::default();
1148  
1149          // Sample the test instance.
1150          let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1151          assert_eq!(committee.starting_round(), 2);
1152          assert_eq!(storage.current_round(), 2);
1153          assert_eq!(storage.max_gc_rounds(), 10);
1154  
1155          // Set up the BFT logic.
1156          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1157          assert!(bft.is_timer_expired());
1158  
1159          // Ensure this call fails on an even round.
1160          let result = bft.is_leader_quorum_or_nonleaders_available(2);
1161          assert!(!result);
1162          Ok(())
1163      }
1164  
1165      #[test]
1166      #[tracing_test::traced_test]
1167      fn test_is_even_round_ready() -> Result<()> {
1168          let rng = &mut TestRng::default();
1169  
1170          // Sample batch certificates.
1171          let mut certificates = IndexSet::new();
1172          certificates.insert(sample_batch_certificate_for_round(2, rng));
1173          certificates.insert(sample_batch_certificate_for_round(2, rng));
1174          certificates.insert(sample_batch_certificate_for_round(2, rng));
1175          certificates.insert(sample_batch_certificate_for_round(2, rng));
1176  
1177          // Initialize the committee.
1178          let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1179              2,
1180              vec![
1181                  certificates[0].author(),
1182                  certificates[1].author(),
1183                  certificates[2].author(),
1184                  certificates[3].author(),
1185              ],
1186              rng,
1187          );
1188  
1189          // Initialize the ledger.
1190          let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1191          // Initialize the storage.
1192          let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1193          // Initialize the account.
1194          let account = Account::new(rng)?;
1195  
1196          // Set up the BFT logic.
1197          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1198          assert!(bft.is_timer_expired());
1199  
1200          // Set the leader certificate.
1201          let leader_certificate = sample_batch_certificate_for_round(2, rng);
1202          *bft.leader_certificate.write() = Some(leader_certificate);
1203          let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1204          // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1205          assert!(!result);
1206          // Once quorum threshold is reached, we are ready for the next round.
1207          let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1208          assert!(result);
1209  
1210          // Initialize a new BFT.
1211          let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1212          // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1213          let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1214          if !bft_timer.is_timer_expired() {
1215              assert!(!result);
1216          }
1217          // Wait for the timer to expire.
1218          let leader_certificate_timeout =
1219              std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1220          std::thread::sleep(leader_certificate_timeout);
1221          // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1222          let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1223          if bft_timer.is_timer_expired() {
1224              assert!(result);
1225          } else {
1226              assert!(!result);
1227          }
1228  
1229          Ok(())
1230      }
1231  
1232      #[test]
1233      #[tracing_test::traced_test]
1234      fn test_update_leader_certificate_odd() -> Result<()> {
1235          let rng = &mut TestRng::default();
1236  
1237          // Sample the test instance.
1238          let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1239          assert_eq!(storage.max_gc_rounds(), 10);
1240  
1241          // Initialize the BFT.
1242          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1243          assert!(bft.is_timer_expired());
1244  
1245          // Ensure this call fails on an odd round.
1246          let result = bft.update_leader_certificate_to_even_round(1);
1247          assert!(!result);
1248          Ok(())
1249      }
1250  
1251      #[test]
1252      #[tracing_test::traced_test]
1253      fn test_update_leader_certificate_bad_round() -> Result<()> {
1254          let rng = &mut TestRng::default();
1255  
1256          // Sample the test instance.
1257          let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1258          assert_eq!(storage.max_gc_rounds(), 10);
1259  
1260          // Initialize the BFT.
1261          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1262  
1263          // Ensure this call succeeds on an even round.
1264          let result = bft.update_leader_certificate_to_even_round(6);
1265          assert!(!result);
1266          Ok(())
1267      }
1268  
1269      #[test]
1270      #[tracing_test::traced_test]
1271      fn test_update_leader_certificate_even() -> Result<()> {
1272          let rng = &mut TestRng::default();
1273  
1274          // Set the current round.
1275          let current_round = 3;
1276  
1277          // Sample the certificates.
1278          let (_, certificates) = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1279              current_round,
1280              rng,
1281          );
1282  
1283          // Initialize the committee.
1284          let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1285              2,
1286              vec![
1287                  certificates[0].author(),
1288                  certificates[1].author(),
1289                  certificates[2].author(),
1290                  certificates[3].author(),
1291              ],
1292              rng,
1293          );
1294  
1295          // Initialize the ledger.
1296          let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1297  
1298          // Initialize the storage.
1299          let transmissions = Arc::new(BFTMemoryService::new());
1300          let storage = Storage::new(ledger.clone(), transmissions, 10);
1301          storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1302          storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1303          storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1304          storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1305          assert_eq!(storage.current_round(), 2);
1306  
1307          // Retrieve the leader certificate.
1308          let leader = committee.get_leader(2).unwrap();
1309          let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1310  
1311          // Initialize the BFT.
1312          let account = Account::new(rng)?;
1313          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1314  
1315          // Set the leader certificate.
1316          *bft.leader_certificate.write() = Some(leader_certificate);
1317  
1318          // Update the leader certificate.
1319          // Ensure this call succeeds on an even round.
1320          let result = bft.update_leader_certificate_to_even_round(2);
1321          assert!(result);
1322  
1323          Ok(())
1324      }
1325  
1326      #[tokio::test]
1327      #[tracing_test::traced_test]
1328      async fn test_order_dag_with_dfs() -> Result<()> {
1329          let rng = &mut TestRng::default();
1330  
1331          // Sample the test instance.
1332          let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1333  
1334          // Initialize the round parameters.
1335          let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1336          let current_round = previous_round + 1;
1337  
1338          // Sample the current certificate and previous certificates.
1339          let (certificate, previous_certificates) = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1340              current_round,
1341              rng,
1342          );
1343  
1344          /* Test GC */
1345  
1346          // Ensure the function succeeds in returning only certificates above GC.
1347          {
1348              // Initialize the storage.
1349              let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1350              // Initialize the BFT.
1351              let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1352  
1353              // Insert a mock DAG in the BFT.
1354              *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1355  
1356              // Insert the previous certificates into the BFT.
1357              for certificate in previous_certificates.clone() {
1358                  assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1359              }
1360  
1361              // Ensure this call succeeds and returns all given certificates.
1362              let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1363              assert!(result.is_ok());
1364              let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1365              assert_eq!(candidate_certificates.len(), 1);
1366              let expected_certificates = vec![certificate.clone()];
1367              assert_eq!(
1368                  candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1369                  expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1370              );
1371              assert_eq!(candidate_certificates, expected_certificates);
1372          }
1373  
1374          /* Test normal case */
1375  
1376          // Ensure the function succeeds in returning all given certificates.
1377          {
1378              // Initialize the storage.
1379              let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1380              // Initialize the BFT.
1381              let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1382  
1383              // Insert a mock DAG in the BFT.
1384              *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1385  
1386              // Insert the previous certificates into the BFT.
1387              for certificate in previous_certificates.clone() {
1388                  assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1389              }
1390  
1391              // Ensure this call succeeds and returns all given certificates.
1392              let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1393              assert!(result.is_ok());
1394              let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1395              assert_eq!(candidate_certificates.len(), 5);
1396              let expected_certificates = vec![
1397                  previous_certificates[0].clone(),
1398                  previous_certificates[1].clone(),
1399                  previous_certificates[2].clone(),
1400                  previous_certificates[3].clone(),
1401                  certificate,
1402              ];
1403              assert_eq!(
1404                  candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1405                  expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1406              );
1407              assert_eq!(candidate_certificates, expected_certificates);
1408          }
1409  
1410          Ok(())
1411      }
1412  
1413      #[test]
1414      #[tracing_test::traced_test]
1415      fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1416          let rng = &mut TestRng::default();
1417  
1418          // Sample the test instance.
1419          let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1420          assert_eq!(committee.starting_round(), 1);
1421          assert_eq!(storage.current_round(), 1);
1422          assert_eq!(storage.max_gc_rounds(), 1);
1423  
1424          // Initialize the round parameters.
1425          let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1426          let current_round = previous_round + 1;
1427  
1428          // Sample the current certificate and previous certificates.
1429          let (certificate, previous_certificates) = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1430              current_round,
1431              rng,
1432          );
1433          // Construct the previous certificate IDs.
1434          let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1435  
1436          /* Test missing previous certificate. */
1437  
1438          // Initialize the BFT.
1439          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1440  
1441          // The expected error message.
1442          let error_msg = format!(
1443              "Missing previous certificate {} for round {previous_round}",
1444              crate::helpers::fmt_id(previous_certificate_ids[3]),
1445          );
1446  
1447          // Ensure this call fails on a missing previous certificate.
1448          let result = bft.order_dag_with_dfs::<false>(certificate);
1449          assert!(result.is_err());
1450          assert_eq!(result.unwrap_err().to_string(), error_msg);
1451          Ok(())
1452      }
1453  
1454      #[tokio::test]
1455      async fn test_bft_gc_on_commit() -> Result<()> {
1456          let rng = &mut TestRng::default();
1457  
1458          // Initialize the round parameters.
1459          let max_gc_rounds = 1;
1460          let committee_round = 0;
1461          let commit_round = 2;
1462          let current_round = commit_round + 1;
1463  
1464          // Sample the certificates.
1465          let (_, certificates) = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1466              current_round,
1467              rng,
1468          );
1469  
1470          // Initialize the committee.
1471          let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1472              committee_round,
1473              vec![
1474                  certificates[0].author(),
1475                  certificates[1].author(),
1476                  certificates[2].author(),
1477                  certificates[3].author(),
1478              ],
1479              rng,
1480          );
1481  
1482          // Initialize the ledger.
1483          let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1484  
1485          // Initialize the storage.
1486          let transmissions = Arc::new(BFTMemoryService::new());
1487          let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1488          // Insert the certificates into the storage.
1489          for certificate in certificates.iter() {
1490              storage.testing_only_insert_certificate_testing_only(certificate.clone());
1491          }
1492  
1493          // Get the leader certificate.
1494          let leader = committee.get_leader(commit_round).unwrap();
1495          let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1496  
1497          // Initialize the BFT.
1498          let account = Account::new(rng)?;
1499          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1500  
1501          // Create an empty mock DAG with last committed round set to `commit_round`.
1502          *bft.dag.write() = mock_dag_with_modified_last_committed_round(commit_round);
1503  
1504          // Ensure that the `gc_round` has not been updated yet.
1505          assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1506  
1507          // Insert the certificates into the BFT.
1508          for certificate in certificates {
1509              assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1510          }
1511  
1512          // Commit the leader certificate.
1513          bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1514  
1515          // Ensure that the `gc_round` has been updated.
1516          assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1517  
1518          Ok(())
1519      }
1520  
1521      #[tokio::test]
1522      #[tracing_test::traced_test]
1523      async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1524          let rng = &mut TestRng::default();
1525  
1526          // Initialize the round parameters.
1527          let max_gc_rounds = 1;
1528          let committee_round = 0;
1529          let commit_round = 2;
1530          let current_round = commit_round + 1;
1531  
1532          // Sample the current certificate and previous certificates.
1533          let (_, certificates) = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1534              current_round,
1535              rng,
1536          );
1537  
1538          // Initialize the committee.
1539          let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1540              committee_round,
1541              vec![
1542                  certificates[0].author(),
1543                  certificates[1].author(),
1544                  certificates[2].author(),
1545                  certificates[3].author(),
1546              ],
1547              rng,
1548          );
1549  
1550          // Initialize the ledger.
1551          let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1552  
1553          // Initialize the storage.
1554          let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1555          // Insert the certificates into the storage.
1556          for certificate in certificates.iter() {
1557              storage.testing_only_insert_certificate_testing_only(certificate.clone());
1558          }
1559  
1560          // Get the leader certificate.
1561          let leader = committee.get_leader(commit_round).unwrap();
1562          let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1563  
1564          // Initialize the BFT.
1565          let account = Account::new(rng)?;
1566          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1567  
1568          // Insert a mock DAG in the BFT.
1569          *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1570  
1571          // Insert the previous certificates into the BFT.
1572          for certificate in certificates.clone() {
1573              assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1574          }
1575  
1576          // Commit the leader certificate.
1577          bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1578  
1579          // Simulate a bootup of the BFT.
1580  
1581          // Initialize a new instance of storage.
1582          let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1583          // Initialize a new instance of BFT.
1584          let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1585  
1586          // Sync the BFT DAG at bootup.
1587          bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1588  
1589          // Check that the BFT starts from the same last committed round.
1590          assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1591  
1592          // Ensure that both BFTs have committed the leader certificate.
1593          assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1594          assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1595  
1596          // Check the state of the bootup BFT.
1597          for certificate in certificates {
1598              let certificate_round = certificate.round();
1599              let certificate_id = certificate.id();
1600              // Check that the bootup BFT has committed the certificates.
1601              assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1602              // Check that the bootup BFT does not contain the certificates in its graph, because
1603              // it should not need to order them again in subsequent subdags.
1604              assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1605          }
1606  
1607          Ok(())
1608      }
1609  
1610      #[tokio::test]
1611      #[tracing_test::traced_test]
1612      async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1613          /*
1614          1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1615          2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1616          3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1617          */
1618  
1619          let rng = &mut TestRng::default();
1620  
1621          // Initialize the round parameters.
1622          let max_gc_rounds = alphavm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1623          let committee_round = 0;
1624          let commit_round = 2;
1625          let current_round = commit_round + 1;
1626          let next_round = current_round + 1;
1627  
1628          // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1629          let (round_to_certificates_map, committee) = {
1630              let private_keys = vec![
1631                  PrivateKey::new(rng).unwrap(),
1632                  PrivateKey::new(rng).unwrap(),
1633                  PrivateKey::new(rng).unwrap(),
1634                  PrivateKey::new(rng).unwrap(),
1635              ];
1636              let addresses = vec![
1637                  Address::try_from(private_keys[0])?,
1638                  Address::try_from(private_keys[1])?,
1639                  Address::try_from(private_keys[2])?,
1640                  Address::try_from(private_keys[3])?,
1641              ];
1642              let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1643                  committee_round,
1644                  addresses,
1645                  rng,
1646              );
1647              // Initialize a mapping from the round number to the set of batch certificates in the round.
1648              let mut round_to_certificates_map: IndexMap<
1649                  u64,
1650                  IndexSet<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1651              > = IndexMap::new();
1652              let mut previous_certificates = IndexSet::with_capacity(4);
1653              // Initialize the genesis batch certificates.
1654              for _ in 0..4 {
1655                  previous_certificates.insert(sample_batch_certificate(rng));
1656              }
1657              for round in 0..commit_round + 3 {
1658                  let mut current_certificates = IndexSet::new();
1659                  let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1660                      IndexSet::new()
1661                  } else {
1662                      previous_certificates.iter().map(|c| c.id()).collect()
1663                  };
1664                  let transmission_ids =
1665                      alphavm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1666                          .into_iter()
1667                          .collect::<IndexSet<_>>();
1668                  let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1669                  let committee_id = committee.id();
1670                  for (i, private_key_1) in private_keys.iter().enumerate() {
1671                      let batch_header = alphavm::ledger::narwhal::BatchHeader::new(
1672                          private_key_1,
1673                          round,
1674                          timestamp,
1675                          committee_id,
1676                          transmission_ids.clone(),
1677                          previous_certificate_ids.clone(),
1678                          rng,
1679                      )
1680                      .unwrap();
1681                      let mut signatures = IndexSet::with_capacity(4);
1682                      for (j, private_key_2) in private_keys.iter().enumerate() {
1683                          if i != j {
1684                              signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1685                          }
1686                      }
1687                      let certificate =
1688                          alphavm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1689                      current_certificates.insert(certificate);
1690                  }
1691                  // Update the mapping.
1692                  round_to_certificates_map.insert(round, current_certificates.clone());
1693                  previous_certificates = current_certificates.clone();
1694              }
1695              (round_to_certificates_map, committee)
1696          };
1697  
1698          // Initialize the ledger.
1699          let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1700          // Initialize the storage.
1701          let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1702          // Get the leaders for the next 2 commit rounds.
1703          let leader = committee.get_leader(commit_round).unwrap();
1704          let next_leader = committee.get_leader(next_round).unwrap();
1705          // Insert the pre shutdown certificates into the storage.
1706          let mut pre_shutdown_certificates: Vec<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1707          for i in 1..=commit_round {
1708              let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1709              if i == commit_round {
1710                  // Only insert the leader certificate for the commit round.
1711                  let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1712                  if let Some(c) = leader_certificate {
1713                      pre_shutdown_certificates.push(c.clone());
1714                  }
1715                  continue;
1716              }
1717              pre_shutdown_certificates.extend(certificates);
1718          }
1719          for certificate in pre_shutdown_certificates.iter() {
1720              storage.testing_only_insert_certificate_testing_only(certificate.clone());
1721          }
1722          // Insert the post shutdown certificates into the storage.
1723          let mut post_shutdown_certificates: Vec<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1724              Vec::new();
1725          for j in commit_round..=commit_round + 2 {
1726              let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1727              post_shutdown_certificates.extend(certificate);
1728          }
1729          for certificate in post_shutdown_certificates.iter() {
1730              storage.testing_only_insert_certificate_testing_only(certificate.clone());
1731          }
1732          // Get the leader certificates.
1733          let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1734          let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1735  
1736          // Initialize the BFT without bootup.
1737          let account = Account::new(rng)?;
1738          let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1739  
1740          // Insert a mock DAG in the BFT without bootup.
1741          *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1742  
1743          // Insert the certificates into the BFT without bootup.
1744          for certificate in pre_shutdown_certificates.clone() {
1745              assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1746          }
1747  
1748          // Insert the post shutdown certificates into the BFT without bootup.
1749          for certificate in post_shutdown_certificates.clone() {
1750              assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1751          }
1752          // Commit the second leader certificate.
1753          let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1754          let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1755          bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1756  
1757          // Simulate a bootup of the BFT.
1758  
1759          // Initialize a new instance of storage.
1760          let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1761  
1762          // Initialize a new instance of BFT with bootup.
1763          let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1764  
1765          // Sync the BFT DAG at bootup.
1766          bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1767  
1768          // Insert the post shutdown certificates to the storage and BFT with bootup.
1769          for certificate in post_shutdown_certificates.iter() {
1770              bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1771          }
1772          for certificate in post_shutdown_certificates.clone() {
1773              assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1774          }
1775          // Commit the second leader certificate.
1776          let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1777          let commit_subdag_metadata_bootup =
1778              commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1779          let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1780          bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1781  
1782          // Check that the final state of both BFTs is the same.
1783  
1784          // Check that both BFTs start from the same last committed round.
1785          assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1786  
1787          // Ensure that both BFTs have committed the leader certificates.
1788          assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1789          assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1790          assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1791          assert!(
1792              bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1793          );
1794  
1795          // Check that the bootup BFT has committed the pre shutdown certificates.
1796          for certificate in pre_shutdown_certificates.clone() {
1797              let certificate_round = certificate.round();
1798              let certificate_id = certificate.id();
1799              // Check that both BFTs have committed the certificates.
1800              assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1801              assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1802              // Check that the bootup BFT does not contain the certificates in its graph, because
1803              // it should not need to order them again in subsequent subdags.
1804              assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1805              assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1806          }
1807  
1808          // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1809          for certificate in committed_certificates_bootup.clone() {
1810              let certificate_round = certificate.round();
1811              let certificate_id = certificate.id();
1812              // Check that the both BFTs have committed the certificates.
1813              assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1814              assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1815              // Check that the bootup BFT does not contain the certificates in its graph, because
1816              // it should not need to order them again in subsequent subdags.
1817              assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1818              assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1819          }
1820  
1821          // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1822          assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1823  
1824          Ok(())
1825      }
1826  
1827      #[tokio::test]
1828      #[tracing_test::traced_test]
1829      async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1830          /*
1831          1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1832          2. Add post shutdown certificates to the bootup BFT.
1833          2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1834          */
1835  
1836          let rng = &mut TestRng::default();
1837  
1838          // Initialize the round parameters.
1839          let max_gc_rounds = alphavm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1840          let committee_round = 0;
1841          let commit_round = 2;
1842          let current_round = commit_round + 1;
1843          let next_round = current_round + 1;
1844  
1845          // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1846          let (round_to_certificates_map, committee) = {
1847              let private_keys = vec![
1848                  PrivateKey::new(rng).unwrap(),
1849                  PrivateKey::new(rng).unwrap(),
1850                  PrivateKey::new(rng).unwrap(),
1851                  PrivateKey::new(rng).unwrap(),
1852              ];
1853              let addresses = vec![
1854                  Address::try_from(private_keys[0])?,
1855                  Address::try_from(private_keys[1])?,
1856                  Address::try_from(private_keys[2])?,
1857                  Address::try_from(private_keys[3])?,
1858              ];
1859              let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1860                  committee_round,
1861                  addresses,
1862                  rng,
1863              );
1864              // Initialize a mapping from the round number to the set of batch certificates in the round.
1865              let mut round_to_certificates_map: IndexMap<
1866                  u64,
1867                  IndexSet<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1868              > = IndexMap::new();
1869              let mut previous_certificates = IndexSet::with_capacity(4);
1870              // Initialize the genesis batch certificates.
1871              for _ in 0..4 {
1872                  previous_certificates.insert(sample_batch_certificate(rng));
1873              }
1874              for round in 0..=commit_round + 2 {
1875                  let mut current_certificates = IndexSet::new();
1876                  let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1877                      IndexSet::new()
1878                  } else {
1879                      previous_certificates.iter().map(|c| c.id()).collect()
1880                  };
1881                  let transmission_ids =
1882                      alphavm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1883                          .into_iter()
1884                          .collect::<IndexSet<_>>();
1885                  let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1886                  let committee_id = committee.id();
1887                  for (i, private_key_1) in private_keys.iter().enumerate() {
1888                      let batch_header = alphavm::ledger::narwhal::BatchHeader::new(
1889                          private_key_1,
1890                          round,
1891                          timestamp,
1892                          committee_id,
1893                          transmission_ids.clone(),
1894                          previous_certificate_ids.clone(),
1895                          rng,
1896                      )
1897                      .unwrap();
1898                      let mut signatures = IndexSet::with_capacity(4);
1899                      for (j, private_key_2) in private_keys.iter().enumerate() {
1900                          if i != j {
1901                              signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1902                          }
1903                      }
1904                      let certificate =
1905                          alphavm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1906                      current_certificates.insert(certificate);
1907                  }
1908                  // Update the mapping.
1909                  round_to_certificates_map.insert(round, current_certificates.clone());
1910                  previous_certificates = current_certificates.clone();
1911              }
1912              (round_to_certificates_map, committee)
1913          };
1914  
1915          // Initialize the ledger.
1916          let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1917          // Initialize the storage.
1918          let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1919          // Get the leaders for the next 2 commit rounds.
1920          let leader = committee.get_leader(commit_round).unwrap();
1921          let next_leader = committee.get_leader(next_round).unwrap();
1922          // Insert the pre shutdown certificates into the storage.
1923          let mut pre_shutdown_certificates: Vec<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1924          for i in 1..=commit_round {
1925              let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1926              if i == commit_round {
1927                  // Only insert the leader certificate for the commit round.
1928                  let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1929                  if let Some(c) = leader_certificate {
1930                      pre_shutdown_certificates.push(c.clone());
1931                  }
1932                  continue;
1933              }
1934              pre_shutdown_certificates.extend(certificates);
1935          }
1936          for certificate in pre_shutdown_certificates.iter() {
1937              storage.testing_only_insert_certificate_testing_only(certificate.clone());
1938          }
1939          // Initialize the bootup BFT.
1940          let account = Account::new(rng)?;
1941          let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1942  
1943          // Insert a mock DAG in the BFT without bootup.
1944          *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1945          // Sync the BFT DAG at bootup.
1946          bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1947  
1948          // Insert the post shutdown certificates into the storage.
1949          let mut post_shutdown_certificates: Vec<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1950              Vec::new();
1951          for j in commit_round..=commit_round + 2 {
1952              let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1953              post_shutdown_certificates.extend(certificate);
1954          }
1955          for certificate in post_shutdown_certificates.iter() {
1956              storage.testing_only_insert_certificate_testing_only(certificate.clone());
1957          }
1958  
1959          // Insert the post shutdown certificates into the DAG.
1960          for certificate in post_shutdown_certificates.clone() {
1961              assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1962          }
1963  
1964          // Get the next leader certificate to commit.
1965          let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1966          let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1967          let committed_certificates = commit_subdag.values().flatten();
1968  
1969          // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1970          for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1971              for committed_certificate in committed_certificates.clone() {
1972                  assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1973              }
1974          }
1975          Ok(())
1976      }
1977  
1978      /// Tests that a leader certificate can be committed by sufficient endorsements in a succeeding leader certificate.
1979      #[test_log::test(tokio::test)]
1980      async fn test_commit_via_is_linked() {
1981          let rng = &mut TestRng::default();
1982  
1983          let committee_round = 0;
1984          let leader_round_1 = 2;
1985          let leader_round_2 = 4; // subsequent even round
1986          let max_gc_rounds = 50;
1987  
1988          // Create a committee with four members.
1989          let num_authors = 4;
1990          let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
1991          let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
1992  
1993          let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
1994          let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1995          let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1996          let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
1997  
1998          let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
1999  
2000          // Round 1
2001          let round1_certs: IndexSet<_> = (0..num_authors)
2002              .map(|idx| {
2003                  let author = &private_keys[idx];
2004                  let endorsements: Vec<_> = private_keys
2005                      .iter()
2006                      .enumerate()
2007                      .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2008                      .collect();
2009  
2010                  sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2011              })
2012              .collect();
2013          certificates_by_round.insert(1, round1_certs.clone());
2014  
2015          let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2016          let mut leader1_certificate = None;
2017  
2018          let round2_certs: IndexSet<_> = (0..num_authors)
2019              .map(|idx| {
2020                  let author = &private_keys[idx];
2021                  let endorsements: Vec<_> = private_keys
2022                      .iter()
2023                      .enumerate()
2024                      .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2025                      .collect();
2026                  let cert = sample_batch_certificate_for_round_with_committee(
2027                      leader_round_1,
2028                      round1_certs.iter().map(|c| c.id()).collect(),
2029                      author,
2030                      &endorsements[..],
2031                      rng,
2032                  );
2033  
2034                  if cert.author() == leader1 {
2035                      leader1_certificate = Some(cert.clone());
2036                  }
2037                  cert
2038              })
2039              .collect();
2040          certificates_by_round.insert(leader_round_1, round2_certs.clone());
2041  
2042          let round3_certs: IndexSet<_> = (0..num_authors)
2043              .map(|idx| {
2044                  let author = &private_keys[idx];
2045                  let endorsements: Vec<_> = private_keys
2046                      .iter()
2047                      .enumerate()
2048                      .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2049                      .collect();
2050  
2051                  let previous_certificate_ids: IndexSet<_> = round2_certs
2052                      .iter()
2053                      .filter_map(|cert| {
2054                          // Only have the leader endorse the previous round's leader certificate.
2055                          if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2056                      })
2057                      .collect();
2058  
2059                  sample_batch_certificate_for_round_with_committee(
2060                      leader_round_1 + 1,
2061                      previous_certificate_ids,
2062                      author,
2063                      &endorsements[..],
2064                      rng,
2065                  )
2066              })
2067              .collect();
2068          certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2069  
2070          // Ensure the first leader's certificate is not committed yet.
2071          let leader_certificate_1 = leader1_certificate.unwrap();
2072          assert!(
2073              !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2074              "Leader certificate 1 should not be committed yet"
2075          );
2076          assert_eq!(bft.dag.read().last_committed_round(), 0);
2077  
2078          let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2079          let round4_certs: IndexSet<_> = (0..num_authors)
2080              .map(|idx| {
2081                  let endorsements: Vec<_> = private_keys
2082                      .iter()
2083                      .enumerate()
2084                      .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2085                      .collect();
2086  
2087                  sample_batch_certificate_for_round_with_committee(
2088                      leader_round_2,
2089                      round3_certs.iter().map(|c| c.id()).collect(),
2090                      &private_keys[idx],
2091                      &endorsements[..],
2092                      rng,
2093                  )
2094              })
2095              .collect();
2096          certificates_by_round.insert(leader_round_2, round4_certs.clone());
2097  
2098          // Insert all certificates into the storage and DAG.
2099          for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2100              storage.testing_only_insert_certificate_testing_only(certificate.clone());
2101              bft.update_dag::<false, false>(certificate).await.unwrap();
2102          }
2103  
2104          let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2105  
2106          assert!(
2107              bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2108              "Leader certificate 1 should be linked to leader certificate 2"
2109          );
2110  
2111          // Explicitely commit leader certificate 2.
2112          bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2113  
2114          // Leader certificate 1 should be committed transitively when committing the leader certificate 2.
2115          assert!(
2116              bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2117              "Leader certificate for round 2 should be committed when committing at round 4"
2118          );
2119  
2120          // Leader certificate 2 should be committed as the above call was successful.
2121          assert!(
2122              bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2123              "Leader certificate for round 4 should be committed"
2124          );
2125  
2126          assert_eq!(bft.dag.read().last_committed_round(), 4);
2127      }
2128  
2129      #[test_log::test(tokio::test)]
2130      async fn test_commit_via_is_linked_with_skipped_anchor() {
2131          let rng = &mut TestRng::default();
2132  
2133          let committee_round = 0;
2134          let leader_round_1 = 2;
2135          let leader_round_2 = 4;
2136          let max_gc_rounds = 50;
2137  
2138          let num_authors = 4;
2139          let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2140          let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2141  
2142          let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2143          let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2144          let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2145          let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2146  
2147          let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2148  
2149          // Round 1
2150          let round1_certs: IndexSet<_> = (0..num_authors)
2151              .map(|idx| {
2152                  let author = &private_keys[idx];
2153                  let endorsements: Vec<_> = private_keys
2154                      .iter()
2155                      .enumerate()
2156                      .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2157                      .collect();
2158  
2159                  sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2160              })
2161              .collect();
2162          certificates_by_round.insert(1, round1_certs.clone());
2163  
2164          let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2165          let mut leader1_certificate = None;
2166  
2167          let round2_certs: IndexSet<_> = (0..num_authors)
2168              .map(|idx| {
2169                  let author = &private_keys[idx];
2170                  let endorsements: Vec<_> = private_keys
2171                      .iter()
2172                      .enumerate()
2173                      .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2174                      .collect();
2175                  let cert = sample_batch_certificate_for_round_with_committee(
2176                      leader_round_1,
2177                      round1_certs.iter().map(|c| c.id()).collect(),
2178                      author,
2179                      &endorsements[..],
2180                      rng,
2181                  );
2182  
2183                  if cert.author() == leader1 {
2184                      leader1_certificate = Some(cert.clone());
2185                  }
2186                  cert
2187              })
2188              .collect();
2189          certificates_by_round.insert(leader_round_1, round2_certs.clone());
2190  
2191          let round3_certs: IndexSet<_> = (0..num_authors)
2192              .map(|idx| {
2193                  let author = &private_keys[idx];
2194                  let endorsements: Vec<_> = private_keys
2195                      .iter()
2196                      .enumerate()
2197                      .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2198                      .collect();
2199  
2200                  let previous_certificate_ids: IndexSet<_> = round2_certs
2201                      .iter()
2202                      .filter_map(|cert| {
2203                          // Only have the leader endorse the previous round's leader certificate.
2204                          if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2205                      })
2206                      .collect();
2207  
2208                  sample_batch_certificate_for_round_with_committee(
2209                      leader_round_1 + 1,
2210                      previous_certificate_ids,
2211                      author,
2212                      &endorsements[..],
2213                      rng,
2214                  )
2215              })
2216              .collect();
2217          certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2218  
2219          // Ensure the first leader's certificate is not committed yet.
2220          let leader_certificate_1 = leader1_certificate.unwrap();
2221          assert!(
2222              !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2223              "Leader certificate 1 should not be committed yet"
2224          );
2225  
2226          let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2227          let round4_certs: IndexSet<_> = (0..num_authors)
2228              .map(|idx| {
2229                  let endorsements: Vec<_> = private_keys
2230                      .iter()
2231                      .enumerate()
2232                      .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2233                      .collect();
2234  
2235                  // Do not create a path to the previous leader certificate.
2236                  let previous_certificate_ids: IndexSet<_> = round3_certs
2237                      .iter()
2238                      .filter_map(|cert| if cert.author() == leader1 { None } else { Some(cert.id()) })
2239                      .collect();
2240  
2241                  sample_batch_certificate_for_round_with_committee(
2242                      leader_round_2,
2243                      previous_certificate_ids,
2244                      &private_keys[idx],
2245                      &endorsements[..],
2246                      rng,
2247                  )
2248              })
2249              .collect();
2250          certificates_by_round.insert(leader_round_2, round4_certs.clone());
2251  
2252          // Insert all certificates into the storage and DAG.
2253          for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2254              storage.testing_only_insert_certificate_testing_only(certificate.clone());
2255              bft.update_dag::<false, false>(certificate).await.unwrap();
2256          }
2257  
2258          let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2259  
2260          assert!(
2261              !bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2262              "Leader certificate 1 should not be linked to leader certificate 2"
2263          );
2264          assert_eq!(bft.dag.read().last_committed_round(), 0);
2265  
2266          // Explicitely commit leader certificate 2.
2267          bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2268  
2269          // Leader certificate 1 should be committed transitively when committing the leader certificate 2.
2270          assert!(
2271              !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2272              "Leader certificate for round 2 should not be committed when committing at round 4"
2273          );
2274  
2275          // Leader certificate 2 should be committed as the above call was successful.
2276          assert!(
2277              bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2278              "Leader certificate for round 4 should be committed"
2279          );
2280          assert_eq!(bft.dag.read().last_committed_round(), 4);
2281      }
2282  }