bft.rs
// Copyright (c) 2025 ADnet Contributors
// This file is part of the AlphaOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{
    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
    Primary,
    helpers::{
        BFTReceiver,
        ConsensusSender,
        DAG,
        PrimaryReceiver,
        PrimarySender,
        Storage,
        fmt_id,
        init_bft_channels,
        now,
    },
};
use alphaos_account::Account;
use alphaos_node_bft_ledger_service::LedgerService;
use alphaos_node_sync::{BlockSync, Ping};
use alphavm::{
    console::account::Address,
    ledger::{
        block::Transaction,
        committee::Committee,
        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::{Field, Network, Result, bail, ensure},
    utilities::flatten_error,
};

use alpha_std::StorageMode;
use anyhow::Context;
use colored::Colorize;
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
use std::{
    collections::{BTreeMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::{
        Arc,
        atomic::{AtomicI64, Ordering},
    },
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{
    sync::{OnceCell, oneshot},
    task::JoinHandle,
};

/// The BFT instance of this node.
///
/// It wraps the node's [`Primary`] together with the DAG of batch certificates and the
/// leader-election state (leader certificate and its timer) for the current even round.
///
/// Every field except `primary` is held behind an `Arc`, so clones produced by the derived
/// `Clone` share the DAG, the leader certificate, the timer, the consensus sender, the task
/// handles and the lock with the instance they were cloned from.
#[derive(Clone)]
pub struct BFT<N: Network> {
    /// The primary for this node.
    primary: Primary<N>,
    /// The DAG of batches from which we build the blockchain.
    dag: Arc<RwLock<DAG<N>>>,
    /// The batch certificate of the leader from the current even round, if one was present.
    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
    /// The timer for the leader certificate to be received.
    ///
    /// Holds a `now()` timestamp; it starts at the `AtomicI64` default (zero) in `new`.
    leader_certificate_timer: Arc<AtomicI64>,
    /// The consensus sender.
    ///
    /// Left unset until `run` installs it (and only if `run` is given one).
    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
    /// Handles for all spawned tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The BFT lock.
    lock: Arc<TMutex<()>>,
}

impl<N: Network> BFT<N> {
    /// Initializes a new instance of the BFT.
    ///
    /// All arguments are forwarded unchanged to `Primary::new`; every other field of the
    /// BFT starts out at its `Default` value.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Primary::new`, if it fails.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        storage_mode: StorageMode,
        dev: Option<u16>,
    ) -> Result<Self> {
        Ok(Self {
            primary: Primary::new(
                account,
                storage,
                ledger,
                block_sync,
                ip,
                trusted_validators,
                trusted_peers_only,
                storage_mode,
                dev,
            )?,
            dag: Default::default(),
            leader_certificate: Default::default(),
            leader_certificate_timer: Default::default(),
            consensus_sender: Default::default(),
            handles: Default::default(),
            lock: Default::default(),
        })
    }

    /// Run the BFT instance.
    ///
    /// This will return as soon as all required tasks are spawned.
    /// The function must not be called more than once per instance.
    ///
    /// # Errors
    ///
    /// Returns the error produced by running the primary, if it fails.
    ///
    /// # Panics
    ///
    /// Panics if a `consensus_sender` is supplied but one has already been set on this
    /// instance (for example because `run` was called a second time).
    pub async fn run(
        &mut self,
        ping: Option<Arc<Ping<N>>>,
        consensus_sender: Option<ConsensusSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the BFT instance...");
        // Initialize the BFT channels.
        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
        // First, start the BFT handlers.
        self.start_handlers(bft_receiver);
        // Next, run the primary instance.
        self.primary.run(ping, Some(bft_sender), primary_sender, primary_receiver).await?;
        // Lastly, set the consensus sender.
        // Note: This ensures during initial syncing, that the BFT does not advance the ledger.
        if let Some(consensus_sender) = consensus_sender {
            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
        }
        Ok(())
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.primary.is_synced()
    }

    /// Returns the primary.
    pub const fn primary(&self) -> &Primary<N> {
        &self.primary
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        self.primary.storage()
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        self.primary.ledger()
    }

    /// Returns the leader of the current even round, if one was present.
    ///
    /// This is the author of the currently stored leader certificate.
    pub fn leader(&self) -> Option<Address<N>> {
        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
    }

    /// Returns the certificate of the leader from the current even round, if one was present.
    ///
    /// The shared, lock-protected slot itself is returned; callers lock it to read the value.
    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
        &self.leader_certificate
    }
}

impl<N: Network> BFT<N> {
    /// Returns the number of unconfirmed transmissions.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.primary.num_unconfirmed_transmissions()
    }

    /// Returns the number of unconfirmed ratifications.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.primary.num_unconfirmed_ratifications()
    }

    /// Returns the number of unconfirmed solutions.
195 pub fn num_unconfirmed_solutions(&self) -> usize { 196 self.primary.num_unconfirmed_solutions() 197 } 198 199 /// Returns the number of unconfirmed transactions. 200 pub fn num_unconfirmed_transactions(&self) -> usize { 201 self.primary.num_unconfirmed_transactions() 202 } 203 } 204 205 impl<N: Network> BFT<N> { 206 /// Returns the worker transmission IDs. 207 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> { 208 self.primary.worker_transmission_ids() 209 } 210 211 /// Returns the worker transmissions. 212 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> { 213 self.primary.worker_transmissions() 214 } 215 216 /// Returns the worker solutions. 217 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> { 218 self.primary.worker_solutions() 219 } 220 221 /// Returns the worker transactions. 222 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> { 223 self.primary.worker_transactions() 224 } 225 } 226 227 impl<N: Network> BFT<N> { 228 /// Stores the certificate in the DAG, and attempts to commit one or more anchors. 229 fn update_to_next_round(&self, current_round: u64) -> bool { 230 // Ensure the current round is at least the storage round (this is a sanity check). 231 let storage_round = self.storage().current_round(); 232 if current_round < storage_round { 233 debug!( 234 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}" 235 ); 236 return false; 237 } 238 239 // Determine if the BFT is ready to update to the next round. 
240 let is_ready = match current_round % 2 == 0 { 241 true => self.update_leader_certificate_to_even_round(current_round), 242 false => self.is_leader_quorum_or_nonleaders_available(current_round), 243 }; 244 245 #[cfg(feature = "metrics")] 246 { 247 let start = self.leader_certificate_timer.load(Ordering::SeqCst); 248 // Only log if the timer was set, otherwise we get a time difference since the EPOCH. 249 if start > 0 { 250 let end = now(); 251 let elapsed = std::time::Duration::from_secs((end - start) as u64); 252 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64()); 253 } 254 } 255 256 // Log whether the round is going to update. 257 if current_round % 2 == 0 { 258 // Determine if there is a leader certificate. 259 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() { 260 // Ensure the state of the leader certificate is consistent with the BFT being ready. 261 if !is_ready { 262 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false"); 263 } 264 // Log the leader election. 265 let leader_round = leader_certificate.round(); 266 match leader_round == current_round { 267 true => { 268 info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author()); 269 #[cfg(feature = "metrics")] 270 metrics::increment_counter(metrics::bft::LEADERS_ELECTED); 271 } 272 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"), 273 } 274 } else { 275 match is_ready { 276 true => info!("\n\nRound {current_round} reached quorum without a leader\n"), 277 false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()), 278 } 279 } 280 } 281 282 // If the BFT is ready, then update to the next round. 283 if is_ready { 284 // Update to the next round in storage. 
285 if let Err(err) = self 286 .storage() 287 .increment_to_next_round(current_round) 288 .with_context(|| format!("BFT failed to increment to the next round from round {current_round}")) 289 { 290 warn!("{}", &flatten_error(err)); 291 return false; 292 } 293 // Update the timer for the leader certificate. 294 self.leader_certificate_timer.store(now(), Ordering::SeqCst); 295 } 296 297 is_ready 298 } 299 300 /// Updates the leader certificate to the current even round, 301 /// returning `true` if the BFT is ready to update to the next round. 302 /// 303 /// This method runs on every even round, by determining the leader of the current even round, 304 /// and setting the leader certificate to their certificate in the round, if they were present. 305 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool { 306 // Retrieve the current round. 307 let current_round = self.storage().current_round(); 308 // Ensure the current round matches the given round. 309 if current_round != even_round { 310 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}"); 311 return false; 312 } 313 314 // If the current round is odd, return false. 315 if current_round % 2 != 0 || current_round < 2 { 316 error!("BFT cannot update the leader certificate in an odd round"); 317 return false; 318 } 319 320 // Retrieve the certificates for the current round. 321 let current_certificates = self.storage().get_certificates_for_round(current_round); 322 // If there are no current certificates, set the leader certificate to 'None', and return early. 323 if current_certificates.is_empty() { 324 // Set the leader certificate to 'None'. 325 *self.leader_certificate.write() = None; 326 return false; 327 } 328 329 // Retrieve the committee lookback of the current round. 
330 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) { 331 Ok(committee) => committee, 332 Err(err) => { 333 let err = err.context(format!( 334 "BFT failed to retrieve the committee lookback for the even round {current_round}" 335 )); 336 warn!("{}", &flatten_error(err)); 337 return false; 338 } 339 }; 340 // Determine the leader of the current round. 341 let leader = match self.ledger().latest_leader() { 342 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader, 343 _ => { 344 // Compute the leader for the current round. 345 let computed_leader = match committee_lookback.get_leader(current_round) { 346 Ok(leader) => leader, 347 Err(err) => { 348 let err = 349 err.context(format!("BFT failed to compute the leader for the even round {current_round}")); 350 error!("{}", &flatten_error(err)); 351 return false; 352 } 353 }; 354 355 // Cache the computed leader. 356 self.ledger().update_latest_leader(current_round, computed_leader); 357 358 computed_leader 359 } 360 }; 361 // Find and set the leader certificate, if the leader was present in the current even round. 362 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader); 363 *self.leader_certificate.write() = leader_certificate.cloned(); 364 365 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round) 366 } 367 368 /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions: 369 /// - If the leader certificate is set for the current even round. 370 /// - The timer for the leader certificate has expired. 371 fn is_even_round_ready_for_next_round( 372 &self, 373 certificates: IndexSet<BatchCertificate<N>>, 374 committee: Committee<N>, 375 current_round: u64, 376 ) -> bool { 377 // Retrieve the authors for the current round. 
378 let authors = certificates.into_iter().map(|c| c.author()).collect(); 379 // Check if quorum threshold is reached. 380 if !committee.is_quorum_threshold_reached(&authors) { 381 trace!("BFT failed to reach quorum threshold in even round {current_round}"); 382 return false; 383 } 384 // If the leader certificate is set for the current even round, return 'true'. 385 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() { 386 if leader_certificate.round() == current_round { 387 return true; 388 } 389 } 390 // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'. 391 if self.is_timer_expired() { 392 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)"); 393 return true; 394 } 395 // Otherwise, return 'false'. 396 false 397 } 398 399 /// Returns `true` if the timer for the leader certificate has expired. 400 /// 401 /// This is always true for a new BFT instance. 402 fn is_timer_expired(&self) -> bool { 403 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now() 404 } 405 406 /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions: 407 /// - The leader certificate is `None`. 408 /// - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round). 409 /// - The leader certificate timer has expired. 410 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool { 411 // Retrieve the current round. 412 let current_round = self.storage().current_round(); 413 // Ensure the current round matches the given round. 414 if current_round != odd_round { 415 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}"); 416 return false; 417 } 418 // If the current round is even, return false. 
419 if current_round % 2 != 1 { 420 error!("BFT does not compute stakes for the leader certificate in an even round"); 421 return false; 422 } 423 // Retrieve the certificates for the current round. 424 let current_certificates = self.storage().get_certificates_for_round(current_round); 425 // Retrieve the committee lookback for the current round. 426 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) { 427 Ok(committee) => committee, 428 Err(err) => { 429 let err = err.context(format!( 430 "BFT failed to retrieve the committee lookback for the odd round {current_round}" 431 )); 432 error!("{}", &flatten_error(err)); 433 return false; 434 } 435 }; 436 // Retrieve the authors of the current certificates. 437 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect(); 438 // Check if quorum threshold is reached. 439 if !committee_lookback.is_quorum_threshold_reached(&authors) { 440 trace!("BFT failed reach quorum threshold in odd round {current_round}."); 441 return false; 442 } 443 // Retrieve the leader certificate. 444 let Some(leader_certificate) = self.leader_certificate.read().clone() else { 445 // If there is no leader certificate for the previous round, return 'true'. 446 return true; 447 }; 448 // Compute the stake for the leader certificate. 449 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate( 450 leader_certificate.id(), 451 current_certificates, 452 &committee_lookback, 453 ); 454 // Return 'true' if any of the following conditions hold: 455 stake_with_leader >= committee_lookback.availability_threshold() 456 || stake_without_leader >= committee_lookback.quorum_threshold() 457 || self.is_timer_expired() 458 } 459 460 /// Computes the amount of stake that has & has not signed for the leader certificate. 
fn compute_stake_for_leader_certificate(
        &self,
        leader_certificate_id: Field<N>,
        current_certificates: IndexSet<BatchCertificate<N>>,
        current_committee: &Committee<N>,
    ) -> (u64, u64) {
        // Returns `(stake_with_leader, stake_without_leader)`: the summed stake of the authors
        // whose certificate does, respectively does not, reference `leader_certificate_id`
        // among its previous certificate IDs.

        // If there are no current certificates, return early.
        if current_certificates.is_empty() {
            return (0, 0);
        }

        // Initialize a tracker for the stake with the leader.
        let mut stake_with_leader = 0u64;
        // Initialize a tracker for the stake without the leader.
        let mut stake_without_leader = 0u64;
        // Iterate over the current certificates.
        for certificate in current_certificates {
            // Retrieve the stake for the author of the certificate.
            let stake = current_committee.get_stake(certificate.author());
            // Determine if the certificate includes the leader.
            match certificate.previous_certificate_ids().contains(&leader_certificate_id) {
                // If the certificate includes the leader, add the stake to the stake with the leader.
                true => stake_with_leader = stake_with_leader.saturating_add(stake),
                // If the certificate does not include the leader, add the stake to the stake without the leader.
                false => stake_without_leader = stake_without_leader.saturating_add(stake),
            }
        }
        // Return the stake with the leader, and the stake without the leader.
        (stake_with_leader, stake_without_leader)
    }
}

impl<N: Network> BFT<N> {
    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
    ///
    /// The BFT lock is held for the whole call, so invocations are serialized.
    ///
    /// - `ALLOW_LEDGER_ACCESS` is forwarded to `order_dag_with_dfs`.
    /// - `IS_SYNCING` is forwarded to `commit_leader_certificate`, where it suppresses the
    ///   hand-off to consensus; here it only changes the log line.
    ///
    /// Returns `Ok(())` without committing when the round before the certificate's round is
    /// not an electable even round, was already committed, has no leader certificate in the
    /// DAG yet, or has not reached the availability threshold.
    ///
    /// # Errors
    ///
    /// Fails if a committee lookback, the leader, or the certificates of the certificate's
    /// round cannot be retrieved, or if committing the leader certificate fails.
    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
        &self,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Acquire the BFT lock.
        let _lock = self.lock.lock().await;

        // ### First, insert the certificate into the DAG. ###
        // Retrieve the round of the new certificate to add to the DAG.
        let certificate_round = certificate.round();

        // Insert the certificate into the DAG.
        self.dag.write().insert(certificate);

        // ### Second, determine if a new leader certificate can be committed. ###
        let commit_round = certificate_round.saturating_sub(1);

        // Leaders are elected in even rounds.
        // If the previous round is odd, the current round cannot commit any leader certs.
        // Similarly, no leader certificate can be committed for round zero.
        if !commit_round.is_multiple_of(2) || commit_round < 2 {
            return Ok(());
        }
        // If the commit round is at or below the last committed round, return early.
        if commit_round <= self.dag.read().last_committed_round() {
            return Ok(());
        }

        /* Proceeding to check if the leader is ready to be committed. */
        trace!("Checking if the leader is ready to be committed for round {commit_round}...");

        // Retrieve the committee lookback for the commit round.
        let committee_lookback = self.ledger().get_committee_lookback_for_round(commit_round).with_context(|| {
            format!("BFT failed to retrieve the committee with lag for commit round {commit_round}")
        })?;

        // Either retrieve the cached leader or compute it.
        let leader = match self.ledger().latest_leader() {
            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
            _ => {
                // Compute the leader for the commit round.
                let computed_leader = committee_lookback
                    .get_leader(commit_round)
                    .with_context(|| format!("BFT failed to compute the leader for commit round {commit_round}"))?;

                // Cache the computed leader.
                self.ledger().update_latest_leader(commit_round, computed_leader);

                computed_leader
            }
        };

        // Retrieve the leader certificate for the commit round.
        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
        else {
            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
            return Ok(());
        };
        // Retrieve all of the certificates for the **certificate** round.
        let certificates = self.dag.read().get_certificates_for_round(certificate_round).with_context(|| {
            format!("BFT failed to retrieve the certificates for certificate round {certificate_round}")
        })?;

        // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
        let certificate_committee_lookback =
            self.ledger().get_committee_lookback_for_round(certificate_round).with_context(|| {
                format!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}")
            })?;

        // Construct a set over the authors who included the leader's certificate in the certificate round.
        let authors = certificates
            .values()
            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                true => Some(c.author()),
                false => None,
            })
            .collect();
        // Check if the leader is ready to be committed.
        if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
            // If the leader is not ready to be committed, return early.
            trace!("BFT is not ready to commit {commit_round}. Availability threshold has not been reached yet.");
            return Ok(());
        }

        if IS_SYNCING {
            info!("Proceeding to commit round {commit_round} with leader '{}' from block sync", fmt_id(leader));
        } else {
            info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
        }

        // Commit the leader certificate, and all previous leader certificates since the last committed round.
        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
    }

    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
    ///
    /// The earlier, still-uncommitted anchors that are linked to `leader_certificate` are
    /// collected first (newest to oldest) and then committed oldest-first. For each anchor the
    /// sub-DAG is ordered with `order_dag_with_dfs`; unless `IS_SYNCING` is set, the sub-DAG and
    /// its deduplicated transmissions are sent to consensus (if a consensus sender is set) and
    /// the callback is awaited before the certificates are marked as committed in the DAG.
    ///
    /// If consensus reports a failure (or drops the callback), the failure is logged and the
    /// method returns `Ok(())` early: later anchors are not committed and garbage collection
    /// is skipped for this call.
    ///
    /// # Errors
    ///
    /// Fails if a committee lookback or leader cannot be determined, if the DAG cannot be
    /// ordered, if a ratification transmission is encountered, if a transmission is missing
    /// from storage, if the sub-DAG cannot be constructed or its anchor round does not match
    /// the leader round, or if the sub-DAG cannot be sent to consensus.
    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
        &self,
        leader_certificate: BatchCertificate<N>,
    ) -> Result<()> {
        #[cfg(debug_assertions)]
        trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());

        // Fetch the leader round.
        let latest_leader_round = leader_certificate.round();
        // Snapshot the last committed round *before* the loop below.
        // Note: Calling `self.dag.read()` inside the `for` head would keep that read guard alive
        // for the entire loop, while the loop body acquires further read locks on the DAG.
        // Recursive read locks can deadlock once a writer is queued, so the guard is released here.
        let last_committed_round = self.dag.read().last_committed_round();
        // Determine the list of all previous leader certificates since the last committed round.
        // The order of the leader certificates is from **newest** to **oldest**.
        let mut leader_certificates = vec![leader_certificate.clone()];
        {
            let mut current_certificate = leader_certificate;
            // Walk the even rounds strictly between the last committed round and the leader round, newest first.
            for round in (last_committed_round + 2..=latest_leader_round.saturating_sub(2)).rev().step_by(2) {
                // Retrieve the previous committee for the leader round.
                let previous_committee_lookback =
                    self.ledger().get_committee_lookback_for_round(round).with_context(|| {
                        format!("BFT failed to retrieve a previous committee lookback for the even round {round}")
                    })?;

                // Either retrieve the cached leader or compute it.
                let leader = match self.ledger().latest_leader() {
                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
                    _ => {
                        // Compute the leader for the commit round.
                        let computed_leader = previous_committee_lookback
                            .get_leader(round)
                            .with_context(|| format!("BFT failed to compute the leader for the even round {round}"))?;

                        // Cache the computed leader.
                        self.ledger().update_latest_leader(round, computed_leader);

                        computed_leader
                    }
                };
                // Retrieve the previous leader certificate.
                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
                else {
                    continue;
                };
                // Determine if there is a path between the previous certificate and the current certificate.
                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                    // Add the previous leader certificate to the list of certificates to commit.
                    leader_certificates.push(previous_certificate.clone());
                    // Update the current certificate to the previous leader certificate.
                    current_certificate = previous_certificate;
                } else {
                    #[cfg(debug_assertions)]
                    trace!(
                        "Skipping anchor for round {round} as it is not linked to the most recent committed leader certificate"
                    );
                }
            }
        }

        // Iterate over the leader certificates to commit (from oldest to newest).
        for leader_certificate in leader_certificates.into_iter().rev() {
            // Retrieve the leader certificate round.
            let leader_round = leader_certificate.round();
            // Compute the commit subdag.
            let commit_subdag = self
                .order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate)
                .with_context(|| "BFT failed to order the DAG with DFS")?;
            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
            if !IS_SYNCING {
                // Initialize a map for the deduped transmissions.
                let mut transmissions = IndexMap::new();
                // Initialize a set for the deduped transaction ids.
                let mut seen_transaction_ids = IndexSet::new();
                // Initialize a set for the deduped solution ids.
                let mut seen_solution_ids = IndexSet::new();
                // Walk the certificates of the subdag in ascending round order.
                for certificate in commit_subdag.values().flatten() {
                    // Retrieve the transmissions.
                    for transmission_id in certificate.transmission_ids() {
                        // If the transaction ID or solution ID already exists in the map, skip it.
                        // Note: This additional check is done to ensure that we do not include duplicate
                        // transaction IDs or solution IDs that may have a different transmission ID.
                        match transmission_id {
                            TransmissionID::Solution(solution_id, _) => {
                                // If the solution already exists, skip it.
                                if seen_solution_ids.contains(&solution_id) {
                                    continue;
                                }
                            }
                            TransmissionID::Transaction(transaction_id, _) => {
                                // If the transaction already exists, skip it.
                                if seen_transaction_ids.contains(transaction_id) {
                                    continue;
                                }
                            }
                            TransmissionID::Ratification => {
                                bail!("Ratifications are currently not supported in the BFT.")
                            }
                        }
                        // If the transmission already exists in the map, skip it.
                        if transmissions.contains_key(transmission_id) {
                            continue;
                        }
                        // If the transmission already exists in the ledger, skip it.
                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
                            continue;
                        }
                        // Retrieve the transmission.
                        let transmission = self.storage().get_transmission(*transmission_id).with_context(|| {
                            format!(
                                "BFT failed to retrieve transmission '{}.{}' from round {}",
                                fmt_id(transmission_id),
                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
                                certificate.round()
                            )
                        })?;
                        // Insert the transaction ID or solution ID into the map.
                        match transmission_id {
                            TransmissionID::Solution(id, _) => {
                                seen_solution_ids.insert(id);
                            }
                            TransmissionID::Transaction(id, _) => {
                                seen_transaction_ids.insert(id);
                            }
                            TransmissionID::Ratification => {}
                        }
                        // Add the transmission to the set.
                        transmissions.insert(*transmission_id, transmission);
                    }
                }
                // Trigger consensus, as this will build a new block for the ledger.
                // Construct the subdag.
                let subdag = Subdag::from(commit_subdag.clone())?;
                // Retrieve the anchor round.
                let anchor_round = subdag.anchor_round();
                // Retrieve the number of transmissions.
                let num_transmissions = transmissions.len();
                // Retrieve metadata about the subdag.
                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();

                // Ensure the subdag anchor round matches the leader round.
                ensure!(
                    anchor_round == leader_round,
                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
                );

                // Trigger consensus.
                if let Some(consensus_sender) = self.consensus_sender.get() {
                    // Initialize a callback sender and receiver.
                    let (callback_sender, callback_receiver) = oneshot::channel();
                    // Send the subdag and transmissions to consensus.
                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
                    // Await the callback to continue.
                    match callback_receiver.await {
                        Ok(Ok(())) => (), // continue
                        Ok(Err(err)) => {
                            let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
                            error!("{}", &flatten_error(err));
                            return Ok(());
                        }
                        Err(err) => {
                            let err: anyhow::Error = err.into();
                            let err =
                                err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
                            error!("{}", flatten_error(err));
                            return Ok(());
                        }
                    }
                }

                info!(
                    "\n\nCommitting a subDAG with anchor round {anchor_round} and {num_transmissions} transmissions: {subdag_metadata:?} (syncing={IS_SYNCING})\n",
                );
            }

            // Update the DAG, as the subdag was successfully included into a block.
            {
                let mut dag_write = self.dag.write();
                let mut count = 0;
                for certificate in commit_subdag.values().flatten() {
                    dag_write.commit(certificate, self.storage().max_gc_rounds());
                    count += 1;
                }

                trace!("Committed {count} certificates to the DAG");
            }

            // Update the validator telemetry.
            #[cfg(feature = "telemetry")]
            self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
        }

        // Perform garbage collection based on the latest committed leader round.
        // The protocol guarantees that validators commit the same anchors in the same order,
        // but they may do so in different chunks of anchors,
        // where 'chunk' refers to the vector of certificates that the loop just above iterates over.
        // Doing garbage collection at the end of each chunk (as we do here),
        // as opposed to after each certificate in the chunk (if we moved this call inside the loop, at the end),
        // may give rise to a discrepancy between the DAGs of different validators who commit different chunks:
        // one validator may have more certificates than the other, not yet garbage collected.
        // However, when `order_dag_with_dfs()` collects the sub-DAG to commit from an anchor,
        // it excludes certificates that are below the GC round,
        // so the possible aforementioned discrepancy between DAGs should not affect the consensus.
        // That exclusion in `order_dag_with_dfs()` is critical to prevent forking,
        // so long as garbage collection is done after each chunk.
        // If garbage collection were done after each committed certificate,
        // that exclusion in `order_dag_with_dfs()` should be unnecessary.
        self.storage().garbage_collect_certificates(latest_leader_round);

        Ok(())
    }

    /// Returns the subdag of batch certificates to commit.
805 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>( 806 &self, 807 leader_certificate: BatchCertificate<N>, 808 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> { 809 // Initialize a map for the certificates to commit. 810 let mut commit = BTreeMap::<u64, IndexSet<_>>::new(); 811 // Initialize a set for the already ordered certificates. 812 let mut already_ordered = HashSet::new(); 813 // Initialize a buffer for the certificates to order. 814 let mut buffer = vec![leader_certificate]; 815 // Iterate over the certificates to order. 816 while let Some(certificate) = buffer.pop() { 817 // Insert the certificate into the map. 818 commit.entry(certificate.round()).or_default().insert(certificate.clone()); 819 820 // Check if the previous certificate is below the GC round. 821 // This is currently a critical check to prevent forking, 822 // as explained in the comment at the end of `commit_leader_certificate()`, 823 // just before the call to garbage collection. 824 let previous_round = certificate.round().saturating_sub(1); 825 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() { 826 continue; 827 } 828 // Iterate over the previous certificate IDs. 829 // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level), 830 // because this 'while' loop uses 'pop()' to retrieve the next certificate to order. 831 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() { 832 // If the previous certificate is already ordered, continue. 833 if already_ordered.contains(previous_certificate_id) { 834 continue; 835 } 836 // If the previous certificate was recently committed, continue. 837 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) { 838 continue; 839 } 840 // If the previous certificate already exists in the ledger, continue. 
841 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) { 842 continue; 843 } 844 845 // Retrieve the previous certificate. 846 let previous_certificate = { 847 // Start by retrieving the previous certificate from the DAG. 848 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) { 849 // If the previous certificate is found, return it. 850 Some(previous_certificate) => previous_certificate, 851 // If the previous certificate is not found, retrieve it from the storage. 852 None => match self.storage().get_certificate(*previous_certificate_id) { 853 // If the previous certificate is found, return it. 854 Some(previous_certificate) => previous_certificate, 855 // Otherwise, the previous certificate is missing, and throw an error. 856 None => bail!( 857 "Missing previous certificate {} for round {previous_round}", 858 fmt_id(previous_certificate_id) 859 ), 860 }, 861 } 862 }; 863 // Insert the previous certificate into the set of already ordered certificates. 864 already_ordered.insert(previous_certificate.id()); 865 // Insert the previous certificate into the buffer. 866 buffer.push(previous_certificate); 867 } 868 } 869 // Ensure we only retain certificates that are above the GC round. 870 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round()); 871 // Return the certificates to commit. 872 Ok(commit) 873 } 874 875 /// Returns `true` if there is a path from the previous certificate to the current certificate. 876 fn is_linked( 877 &self, 878 previous_certificate: BatchCertificate<N>, 879 current_certificate: BatchCertificate<N>, 880 ) -> Result<bool> { 881 // Initialize the list containing the traversal. 882 let mut traversal = vec![current_certificate.clone()]; 883 // Iterate over the rounds from the current certificate to the previous certificate. 
884 for round in (previous_certificate.round()..current_certificate.round()).rev() { 885 // Retrieve all of the certificates for this past round. 886 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else { 887 // This is a critical error, as the traversal should have these certificates. 888 // If this error is hit, it is likely that the maximum GC rounds should be increased. 889 bail!("BFT failed to retrieve the certificates for past round {round}"); 890 }; 891 // Filter the certificates to only include those that are in the traversal. 892 traversal = certificates 893 .into_values() 894 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id()))) 895 .collect(); 896 } 897 Ok(traversal.contains(&previous_certificate)) 898 } 899 } 900 901 impl<N: Network> BFT<N> { 902 /// Starts the BFT handlers. 903 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) { 904 let BFTReceiver { 905 mut rx_primary_round, 906 mut rx_primary_certificate, 907 mut rx_sync_bft_dag_at_bootup, 908 mut rx_sync_bft, 909 } = bft_receiver; 910 911 // Process the current round from the primary. 912 let self_ = self.clone(); 913 self.spawn(async move { 914 while let Some((current_round, callback)) = rx_primary_round.recv().await { 915 callback.send(self_.update_to_next_round(current_round)).ok(); 916 } 917 }); 918 919 // Process the certificate from the primary. 920 let self_ = self.clone(); 921 self.spawn(async move { 922 while let Some((certificate, callback)) = rx_primary_certificate.recv().await { 923 // Update the DAG with the certificate. 924 let result = self_.update_dag::<true, false>(certificate).await; 925 // Send the callback **after** updating the DAG. 926 // Note: We must await the DAG update before proceeding. 927 callback.send(result).ok(); 928 } 929 }); 930 931 // Process the request to sync the BFT DAG at bootup. 
932 let self_ = self.clone(); 933 self.spawn(async move { 934 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await { 935 self_.sync_bft_dag_at_bootup(certificates).await; 936 } 937 }); 938 939 // Handler for new certificates that were fetched by the sync module. 940 let self_ = self.clone(); 941 self.spawn(async move { 942 while let Some((certificate, callback)) = rx_sync_bft.recv().await { 943 // Update the DAG with the certificate. 944 let result = self_.update_dag::<true, true>(certificate).await; 945 // Send the callback **after** updating the DAG. 946 // Note: We must await the DAG update before proceeding. 947 callback.send(result).ok(); 948 } 949 }); 950 } 951 952 /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must** 953 /// already exist in the ledger. 954 /// 955 /// This method commits all the certificates into the DAG. 956 /// Note that there is no need to insert the certificates into the DAG, because these certificates 957 /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags. 958 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) { 959 // Acquire the BFT write lock. 960 let mut dag = self.dag.write(); 961 962 // Commit all the certificates. 963 for certificate in certificates { 964 dag.commit(&certificate, self.storage().max_gc_rounds()); 965 } 966 } 967 968 /// Spawns a task with the given future; it should only be used for long-running tasks. 969 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) { 970 self.handles.lock().push(tokio::spawn(future)); 971 } 972 973 /// Shuts down the BFT. 974 pub async fn shut_down(&self) { 975 info!("Shutting down the BFT..."); 976 // Acquire the lock. 977 let _lock = self.lock.lock().await; 978 // Shut down the primary. 979 self.primary.shut_down().await; 980 // Abort the tasks. 
981 self.handles.lock().iter().for_each(|handle| handle.abort()); 982 } 983 } 984 985 #[cfg(test)] 986 mod tests { 987 use crate::{ 988 BFT, 989 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, 990 helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round}, 991 }; 992 993 use alphaos_account::Account; 994 use alphaos_node_bft_ledger_service::{LedgerService, MockLedgerService}; 995 use alphaos_node_bft_storage_service::BFTMemoryService; 996 use alphaos_node_sync::BlockSync; 997 use alphavm::{ 998 console::account::{Address, PrivateKey}, 999 ledger::{ 1000 committee::{ 1001 Committee, 1002 test_helpers::{sample_committee, sample_committee_for_round, sample_committee_for_round_and_members}, 1003 }, 1004 narwhal::{ 1005 BatchCertificate, 1006 batch_certificate::test_helpers::{ 1007 sample_batch_certificate, 1008 sample_batch_certificate_for_round, 1009 sample_batch_certificate_for_round_with_committee, 1010 }, 1011 }, 1012 }, 1013 utilities::TestRng, 1014 }; 1015 1016 use alpha_std::StorageMode; 1017 use anyhow::Result; 1018 use indexmap::{IndexMap, IndexSet}; 1019 use std::sync::Arc; 1020 1021 type CurrentNetwork = alphavm::console::network::MainnetV0; 1022 1023 /// Samples a new test instance, with an optional committee round and the given maximum GC rounds. 
1024 fn sample_test_instance( 1025 committee_round: Option<u64>, 1026 max_gc_rounds: u64, 1027 rng: &mut TestRng, 1028 ) -> ( 1029 Committee<CurrentNetwork>, 1030 Account<CurrentNetwork>, 1031 Arc<MockLedgerService<CurrentNetwork>>, 1032 Storage<CurrentNetwork>, 1033 ) { 1034 let committee = match committee_round { 1035 Some(round) => sample_committee_for_round(round, rng), 1036 None => sample_committee(rng), 1037 }; 1038 let account = Account::new(rng).unwrap(); 1039 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1040 let transmissions = Arc::new(BFTMemoryService::new()); 1041 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds); 1042 1043 (committee, account, ledger, storage) 1044 } 1045 1046 // Helper function to set up BFT for testing. 1047 fn initialize_bft( 1048 account: Account<CurrentNetwork>, 1049 storage: Storage<CurrentNetwork>, 1050 ledger: Arc<MockLedgerService<CurrentNetwork>>, 1051 ) -> anyhow::Result<BFT<CurrentNetwork>> { 1052 // Create the block synchronization logic. 1053 let block_sync = Arc::new(BlockSync::new(ledger.clone())); 1054 // Initialize the BFT. 1055 BFT::new( 1056 account.clone(), 1057 storage.clone(), 1058 ledger.clone(), 1059 block_sync, 1060 None, 1061 &[], 1062 false, 1063 StorageMode::new_test(None), 1064 None, 1065 ) 1066 } 1067 1068 #[test] 1069 #[tracing_test::traced_test] 1070 fn test_is_leader_quorum_odd() -> Result<()> { 1071 let rng = &mut TestRng::default(); 1072 1073 // Sample batch certificates. 
1074 let mut certificates = IndexSet::new(); 1075 certificates.insert(alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng)); 1076 certificates.insert(alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng)); 1077 certificates.insert(alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng)); 1078 certificates.insert(alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng)); 1079 1080 // Initialize the committee. 1081 let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members( 1082 1, 1083 vec![ 1084 certificates[0].author(), 1085 certificates[1].author(), 1086 certificates[2].author(), 1087 certificates[3].author(), 1088 ], 1089 rng, 1090 ); 1091 1092 // Initialize the ledger. 1093 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1094 // Initialize the storage. 1095 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10); 1096 // Initialize the account. 1097 let account = Account::new(rng)?; 1098 // Initialize the BFT. 1099 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1100 assert!(bft.is_timer_expired()); 1101 // Ensure this call succeeds on an odd round. 1102 let result = bft.is_leader_quorum_or_nonleaders_available(1); 1103 // If timer has expired but quorum threshold is not reached, return 'false'. 1104 assert!(!result); 1105 // Insert certificates into storage. 1106 for certificate in certificates.iter() { 1107 storage.testing_only_insert_certificate_testing_only(certificate.clone()); 1108 } 1109 // Ensure this call succeeds on an odd round. 
1110 let result = bft.is_leader_quorum_or_nonleaders_available(1); 1111 assert!(result); // no previous leader certificate 1112 // Set the leader certificate. 1113 let leader_certificate = sample_batch_certificate(rng); 1114 *bft.leader_certificate.write() = Some(leader_certificate); 1115 // Ensure this call succeeds on an odd round. 1116 let result = bft.is_leader_quorum_or_nonleaders_available(1); 1117 assert!(result); // should now fall through to the end of function 1118 1119 Ok(()) 1120 } 1121 1122 #[test] 1123 #[tracing_test::traced_test] 1124 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> { 1125 let rng = &mut TestRng::default(); 1126 1127 // Sample the test instance. 1128 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng); 1129 assert_eq!(committee.starting_round(), 1); 1130 assert_eq!(storage.current_round(), 1); 1131 assert_eq!(storage.max_gc_rounds(), 10); 1132 1133 // Set up the BFT logic. 1134 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1135 assert!(bft.is_timer_expired()); 1136 1137 // Store is at round 1, and we are checking for round 2. 1138 // Ensure this call fails on an even round. 1139 let result = bft.is_leader_quorum_or_nonleaders_available(2); 1140 assert!(!result); 1141 Ok(()) 1142 } 1143 1144 #[test] 1145 #[tracing_test::traced_test] 1146 fn test_is_leader_quorum_even() -> Result<()> { 1147 let rng = &mut TestRng::default(); 1148 1149 // Sample the test instance. 1150 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng); 1151 assert_eq!(committee.starting_round(), 2); 1152 assert_eq!(storage.current_round(), 2); 1153 assert_eq!(storage.max_gc_rounds(), 10); 1154 1155 // Set up the BFT logic. 1156 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1157 assert!(bft.is_timer_expired()); 1158 1159 // Ensure this call fails on an even round. 
1160 let result = bft.is_leader_quorum_or_nonleaders_available(2); 1161 assert!(!result); 1162 Ok(()) 1163 } 1164 1165 #[test] 1166 #[tracing_test::traced_test] 1167 fn test_is_even_round_ready() -> Result<()> { 1168 let rng = &mut TestRng::default(); 1169 1170 // Sample batch certificates. 1171 let mut certificates = IndexSet::new(); 1172 certificates.insert(sample_batch_certificate_for_round(2, rng)); 1173 certificates.insert(sample_batch_certificate_for_round(2, rng)); 1174 certificates.insert(sample_batch_certificate_for_round(2, rng)); 1175 certificates.insert(sample_batch_certificate_for_round(2, rng)); 1176 1177 // Initialize the committee. 1178 let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members( 1179 2, 1180 vec![ 1181 certificates[0].author(), 1182 certificates[1].author(), 1183 certificates[2].author(), 1184 certificates[3].author(), 1185 ], 1186 rng, 1187 ); 1188 1189 // Initialize the ledger. 1190 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1191 // Initialize the storage. 1192 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10); 1193 // Initialize the account. 1194 let account = Account::new(rng)?; 1195 1196 // Set up the BFT logic. 1197 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1198 assert!(bft.is_timer_expired()); 1199 1200 // Set the leader certificate. 1201 let leader_certificate = sample_batch_certificate_for_round(2, rng); 1202 *bft.leader_certificate.write() = Some(leader_certificate); 1203 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2); 1204 // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round. 1205 assert!(!result); 1206 // Once quorum threshold is reached, we are ready for the next round. 
1207 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2); 1208 assert!(result); 1209 1210 // Initialize a new BFT. 1211 let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1212 // If the leader certificate is not set and the timer has not expired, we are not ready for the next round. 1213 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2); 1214 if !bft_timer.is_timer_expired() { 1215 assert!(!result); 1216 } 1217 // Wait for the timer to expire. 1218 let leader_certificate_timeout = 1219 std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000); 1220 std::thread::sleep(leader_certificate_timeout); 1221 // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round. 1222 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2); 1223 if bft_timer.is_timer_expired() { 1224 assert!(result); 1225 } else { 1226 assert!(!result); 1227 } 1228 1229 Ok(()) 1230 } 1231 1232 #[test] 1233 #[tracing_test::traced_test] 1234 fn test_update_leader_certificate_odd() -> Result<()> { 1235 let rng = &mut TestRng::default(); 1236 1237 // Sample the test instance. 1238 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng); 1239 assert_eq!(storage.max_gc_rounds(), 10); 1240 1241 // Initialize the BFT. 1242 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1243 assert!(bft.is_timer_expired()); 1244 1245 // Ensure this call fails on an odd round. 1246 let result = bft.update_leader_certificate_to_even_round(1); 1247 assert!(!result); 1248 Ok(()) 1249 } 1250 1251 #[test] 1252 #[tracing_test::traced_test] 1253 fn test_update_leader_certificate_bad_round() -> Result<()> { 1254 let rng = &mut TestRng::default(); 1255 1256 // Sample the test instance. 
1257 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng); 1258 assert_eq!(storage.max_gc_rounds(), 10); 1259 1260 // Initialize the BFT. 1261 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1262 1263 // Ensure this call succeeds on an even round. 1264 let result = bft.update_leader_certificate_to_even_round(6); 1265 assert!(!result); 1266 Ok(()) 1267 } 1268 1269 #[test] 1270 #[tracing_test::traced_test] 1271 fn test_update_leader_certificate_even() -> Result<()> { 1272 let rng = &mut TestRng::default(); 1273 1274 // Set the current round. 1275 let current_round = 3; 1276 1277 // Sample the certificates. 1278 let (_, certificates) = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates( 1279 current_round, 1280 rng, 1281 ); 1282 1283 // Initialize the committee. 1284 let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members( 1285 2, 1286 vec![ 1287 certificates[0].author(), 1288 certificates[1].author(), 1289 certificates[2].author(), 1290 certificates[3].author(), 1291 ], 1292 rng, 1293 ); 1294 1295 // Initialize the ledger. 1296 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1297 1298 // Initialize the storage. 1299 let transmissions = Arc::new(BFTMemoryService::new()); 1300 let storage = Storage::new(ledger.clone(), transmissions, 10); 1301 storage.testing_only_insert_certificate_testing_only(certificates[0].clone()); 1302 storage.testing_only_insert_certificate_testing_only(certificates[1].clone()); 1303 storage.testing_only_insert_certificate_testing_only(certificates[2].clone()); 1304 storage.testing_only_insert_certificate_testing_only(certificates[3].clone()); 1305 assert_eq!(storage.current_round(), 2); 1306 1307 // Retrieve the leader certificate. 
1308 let leader = committee.get_leader(2).unwrap(); 1309 let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap(); 1310 1311 // Initialize the BFT. 1312 let account = Account::new(rng)?; 1313 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1314 1315 // Set the leader certificate. 1316 *bft.leader_certificate.write() = Some(leader_certificate); 1317 1318 // Update the leader certificate. 1319 // Ensure this call succeeds on an even round. 1320 let result = bft.update_leader_certificate_to_even_round(2); 1321 assert!(result); 1322 1323 Ok(()) 1324 } 1325 1326 #[tokio::test] 1327 #[tracing_test::traced_test] 1328 async fn test_order_dag_with_dfs() -> Result<()> { 1329 let rng = &mut TestRng::default(); 1330 1331 // Sample the test instance. 1332 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng); 1333 1334 // Initialize the round parameters. 1335 let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below. 1336 let current_round = previous_round + 1; 1337 1338 // Sample the current certificate and previous certificates. 1339 let (certificate, previous_certificates) = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates( 1340 current_round, 1341 rng, 1342 ); 1343 1344 /* Test GC */ 1345 1346 // Ensure the function succeeds in returning only certificates above GC. 1347 { 1348 // Initialize the storage. 1349 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); 1350 // Initialize the BFT. 1351 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1352 1353 // Insert a mock DAG in the BFT. 1354 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3); 1355 1356 // Insert the previous certificates into the BFT. 
1357 for certificate in previous_certificates.clone() { 1358 assert!(bft.update_dag::<false, false>(certificate).await.is_ok()); 1359 } 1360 1361 // Ensure this call succeeds and returns all given certificates. 1362 let result = bft.order_dag_with_dfs::<false>(certificate.clone()); 1363 assert!(result.is_ok()); 1364 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>(); 1365 assert_eq!(candidate_certificates.len(), 1); 1366 let expected_certificates = vec![certificate.clone()]; 1367 assert_eq!( 1368 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(), 1369 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>() 1370 ); 1371 assert_eq!(candidate_certificates, expected_certificates); 1372 } 1373 1374 /* Test normal case */ 1375 1376 // Ensure the function succeeds in returning all given certificates. 1377 { 1378 // Initialize the storage. 1379 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); 1380 // Initialize the BFT. 1381 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1382 1383 // Insert a mock DAG in the BFT. 1384 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2); 1385 1386 // Insert the previous certificates into the BFT. 1387 for certificate in previous_certificates.clone() { 1388 assert!(bft.update_dag::<false, false>(certificate).await.is_ok()); 1389 } 1390 1391 // Ensure this call succeeds and returns all given certificates. 
1392 let result = bft.order_dag_with_dfs::<false>(certificate.clone()); 1393 assert!(result.is_ok()); 1394 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>(); 1395 assert_eq!(candidate_certificates.len(), 5); 1396 let expected_certificates = vec![ 1397 previous_certificates[0].clone(), 1398 previous_certificates[1].clone(), 1399 previous_certificates[2].clone(), 1400 previous_certificates[3].clone(), 1401 certificate, 1402 ]; 1403 assert_eq!( 1404 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(), 1405 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>() 1406 ); 1407 assert_eq!(candidate_certificates, expected_certificates); 1408 } 1409 1410 Ok(()) 1411 } 1412 1413 #[test] 1414 #[tracing_test::traced_test] 1415 fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> { 1416 let rng = &mut TestRng::default(); 1417 1418 // Sample the test instance. 1419 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng); 1420 assert_eq!(committee.starting_round(), 1); 1421 assert_eq!(storage.current_round(), 1); 1422 assert_eq!(storage.max_gc_rounds(), 1); 1423 1424 // Initialize the round parameters. 1425 let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below. 1426 let current_round = previous_round + 1; 1427 1428 // Sample the current certificate and previous certificates. 1429 let (certificate, previous_certificates) = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates( 1430 current_round, 1431 rng, 1432 ); 1433 // Construct the previous certificate IDs. 1434 let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect(); 1435 1436 /* Test missing previous certificate. */ 1437 1438 // Initialize the BFT. 
1439 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1440 1441 // The expected error message. 1442 let error_msg = format!( 1443 "Missing previous certificate {} for round {previous_round}", 1444 crate::helpers::fmt_id(previous_certificate_ids[3]), 1445 ); 1446 1447 // Ensure this call fails on a missing previous certificate. 1448 let result = bft.order_dag_with_dfs::<false>(certificate); 1449 assert!(result.is_err()); 1450 assert_eq!(result.unwrap_err().to_string(), error_msg); 1451 Ok(()) 1452 } 1453 1454 #[tokio::test] 1455 async fn test_bft_gc_on_commit() -> Result<()> { 1456 let rng = &mut TestRng::default(); 1457 1458 // Initialize the round parameters. 1459 let max_gc_rounds = 1; 1460 let committee_round = 0; 1461 let commit_round = 2; 1462 let current_round = commit_round + 1; 1463 1464 // Sample the certificates. 1465 let (_, certificates) = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates( 1466 current_round, 1467 rng, 1468 ); 1469 1470 // Initialize the committee. 1471 let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members( 1472 committee_round, 1473 vec![ 1474 certificates[0].author(), 1475 certificates[1].author(), 1476 certificates[2].author(), 1477 certificates[3].author(), 1478 ], 1479 rng, 1480 ); 1481 1482 // Initialize the ledger. 1483 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1484 1485 // Initialize the storage. 1486 let transmissions = Arc::new(BFTMemoryService::new()); 1487 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds); 1488 // Insert the certificates into the storage. 1489 for certificate in certificates.iter() { 1490 storage.testing_only_insert_certificate_testing_only(certificate.clone()); 1491 } 1492 1493 // Get the leader certificate. 
1494 let leader = committee.get_leader(commit_round).unwrap(); 1495 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap(); 1496 1497 // Initialize the BFT. 1498 let account = Account::new(rng)?; 1499 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1500 1501 // Create an empty mock DAG with last committed round set to `commit_round`. 1502 *bft.dag.write() = mock_dag_with_modified_last_committed_round(commit_round); 1503 1504 // Ensure that the `gc_round` has not been updated yet. 1505 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds)); 1506 1507 // Insert the certificates into the BFT. 1508 for certificate in certificates { 1509 assert!(bft.update_dag::<false, false>(certificate).await.is_ok()); 1510 } 1511 1512 // Commit the leader certificate. 1513 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap(); 1514 1515 // Ensure that the `gc_round` has been updated. 1516 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds); 1517 1518 Ok(()) 1519 } 1520 1521 #[tokio::test] 1522 #[tracing_test::traced_test] 1523 async fn test_sync_bft_dag_at_bootup() -> Result<()> { 1524 let rng = &mut TestRng::default(); 1525 1526 // Initialize the round parameters. 1527 let max_gc_rounds = 1; 1528 let committee_round = 0; 1529 let commit_round = 2; 1530 let current_round = commit_round + 1; 1531 1532 // Sample the current certificate and previous certificates. 1533 let (_, certificates) = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates( 1534 current_round, 1535 rng, 1536 ); 1537 1538 // Initialize the committee. 
1539 let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members( 1540 committee_round, 1541 vec![ 1542 certificates[0].author(), 1543 certificates[1].author(), 1544 certificates[2].author(), 1545 certificates[3].author(), 1546 ], 1547 rng, 1548 ); 1549 1550 // Initialize the ledger. 1551 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1552 1553 // Initialize the storage. 1554 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); 1555 // Insert the certificates into the storage. 1556 for certificate in certificates.iter() { 1557 storage.testing_only_insert_certificate_testing_only(certificate.clone()); 1558 } 1559 1560 // Get the leader certificate. 1561 let leader = committee.get_leader(commit_round).unwrap(); 1562 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap(); 1563 1564 // Initialize the BFT. 1565 let account = Account::new(rng)?; 1566 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1567 1568 // Insert a mock DAG in the BFT. 1569 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round); 1570 1571 // Insert the previous certificates into the BFT. 1572 for certificate in certificates.clone() { 1573 assert!(bft.update_dag::<false, false>(certificate).await.is_ok()); 1574 } 1575 1576 // Commit the leader certificate. 1577 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap(); 1578 1579 // Simulate a bootup of the BFT. 1580 1581 // Initialize a new instance of storage. 1582 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); 1583 // Initialize a new instance of BFT. 1584 let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?; 1585 1586 // Sync the BFT DAG at bootup. 
1587 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await; 1588 1589 // Check that the BFT starts from the same last committed round. 1590 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round()); 1591 1592 // Ensure that both BFTs have committed the leader certificate. 1593 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id())); 1594 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id())); 1595 1596 // Check the state of the bootup BFT. 1597 for certificate in certificates { 1598 let certificate_round = certificate.round(); 1599 let certificate_id = certificate.id(); 1600 // Check that the bootup BFT has committed the certificates. 1601 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id)); 1602 // Check that the bootup BFT does not contain the certificates in its graph, because 1603 // it should not need to order them again in subsequent subdags. 1604 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id)); 1605 } 1606 1607 Ok(()) 1608 } 1609 1610 #[tokio::test] 1611 #[tracing_test::traced_test] 1612 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> { 1613 /* 1614 1. Run one uninterrupted BFT on a set of certificates for 2 leader commits. 1615 2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates. 1616 3. Observe that the uninterrupted BFT and the bootup BFT end in the same state. 1617 */ 1618 1619 let rng = &mut TestRng::default(); 1620 1621 // Initialize the round parameters. 
1622 let max_gc_rounds = alphavm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64; 1623 let committee_round = 0; 1624 let commit_round = 2; 1625 let current_round = commit_round + 1; 1626 let next_round = current_round + 1; 1627 1628 // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors. 1629 let (round_to_certificates_map, committee) = { 1630 let private_keys = vec![ 1631 PrivateKey::new(rng).unwrap(), 1632 PrivateKey::new(rng).unwrap(), 1633 PrivateKey::new(rng).unwrap(), 1634 PrivateKey::new(rng).unwrap(), 1635 ]; 1636 let addresses = vec![ 1637 Address::try_from(private_keys[0])?, 1638 Address::try_from(private_keys[1])?, 1639 Address::try_from(private_keys[2])?, 1640 Address::try_from(private_keys[3])?, 1641 ]; 1642 let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members( 1643 committee_round, 1644 addresses, 1645 rng, 1646 ); 1647 // Initialize a mapping from the round number to the set of batch certificates in the round. 1648 let mut round_to_certificates_map: IndexMap< 1649 u64, 1650 IndexSet<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>>, 1651 > = IndexMap::new(); 1652 let mut previous_certificates = IndexSet::with_capacity(4); 1653 // Initialize the genesis batch certificates. 
1654 for _ in 0..4 { 1655 previous_certificates.insert(sample_batch_certificate(rng)); 1656 } 1657 for round in 0..commit_round + 3 { 1658 let mut current_certificates = IndexSet::new(); 1659 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 { 1660 IndexSet::new() 1661 } else { 1662 previous_certificates.iter().map(|c| c.id()).collect() 1663 }; 1664 let transmission_ids = 1665 alphavm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng) 1666 .into_iter() 1667 .collect::<IndexSet<_>>(); 1668 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp(); 1669 let committee_id = committee.id(); 1670 for (i, private_key_1) in private_keys.iter().enumerate() { 1671 let batch_header = alphavm::ledger::narwhal::BatchHeader::new( 1672 private_key_1, 1673 round, 1674 timestamp, 1675 committee_id, 1676 transmission_ids.clone(), 1677 previous_certificate_ids.clone(), 1678 rng, 1679 ) 1680 .unwrap(); 1681 let mut signatures = IndexSet::with_capacity(4); 1682 for (j, private_key_2) in private_keys.iter().enumerate() { 1683 if i != j { 1684 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap()); 1685 } 1686 } 1687 let certificate = 1688 alphavm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap(); 1689 current_certificates.insert(certificate); 1690 } 1691 // Update the mapping. 1692 round_to_certificates_map.insert(round, current_certificates.clone()); 1693 previous_certificates = current_certificates.clone(); 1694 } 1695 (round_to_certificates_map, committee) 1696 }; 1697 1698 // Initialize the ledger. 1699 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1700 // Initialize the storage. 1701 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); 1702 // Get the leaders for the next 2 commit rounds. 
1703 let leader = committee.get_leader(commit_round).unwrap(); 1704 let next_leader = committee.get_leader(next_round).unwrap(); 1705 // Insert the pre shutdown certificates into the storage. 1706 let mut pre_shutdown_certificates: Vec<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new(); 1707 for i in 1..=commit_round { 1708 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone(); 1709 if i == commit_round { 1710 // Only insert the leader certificate for the commit round. 1711 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader); 1712 if let Some(c) = leader_certificate { 1713 pre_shutdown_certificates.push(c.clone()); 1714 } 1715 continue; 1716 } 1717 pre_shutdown_certificates.extend(certificates); 1718 } 1719 for certificate in pre_shutdown_certificates.iter() { 1720 storage.testing_only_insert_certificate_testing_only(certificate.clone()); 1721 } 1722 // Insert the post shutdown certificates into the storage. 1723 let mut post_shutdown_certificates: Vec<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = 1724 Vec::new(); 1725 for j in commit_round..=commit_round + 2 { 1726 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone(); 1727 post_shutdown_certificates.extend(certificate); 1728 } 1729 for certificate in post_shutdown_certificates.iter() { 1730 storage.testing_only_insert_certificate_testing_only(certificate.clone()); 1731 } 1732 // Get the leader certificates. 1733 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap(); 1734 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap(); 1735 1736 // Initialize the BFT without bootup. 1737 let account = Account::new(rng)?; 1738 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1739 1740 // Insert a mock DAG in the BFT without bootup. 
1741 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0); 1742 1743 // Insert the certificates into the BFT without bootup. 1744 for certificate in pre_shutdown_certificates.clone() { 1745 assert!(bft.update_dag::<false, false>(certificate).await.is_ok()); 1746 } 1747 1748 // Insert the post shutdown certificates into the BFT without bootup. 1749 for certificate in post_shutdown_certificates.clone() { 1750 assert!(bft.update_dag::<false, false>(certificate).await.is_ok()); 1751 } 1752 // Commit the second leader certificate. 1753 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap(); 1754 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>(); 1755 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap(); 1756 1757 // Simulate a bootup of the BFT. 1758 1759 // Initialize a new instance of storage. 1760 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); 1761 1762 // Initialize a new instance of BFT with bootup. 1763 let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?; 1764 1765 // Sync the BFT DAG at bootup. 1766 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await; 1767 1768 // Insert the post shutdown certificates to the storage and BFT with bootup. 1769 for certificate in post_shutdown_certificates.iter() { 1770 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone()); 1771 } 1772 for certificate in post_shutdown_certificates.clone() { 1773 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok()); 1774 } 1775 // Commit the second leader certificate. 
1776 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap(); 1777 let commit_subdag_metadata_bootup = 1778 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>(); 1779 let committed_certificates_bootup = commit_subdag_bootup.values().flatten(); 1780 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap(); 1781 1782 // Check that the final state of both BFTs is the same. 1783 1784 // Check that both BFTs start from the same last committed round. 1785 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round()); 1786 1787 // Ensure that both BFTs have committed the leader certificates. 1788 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id())); 1789 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())); 1790 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id())); 1791 assert!( 1792 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()) 1793 ); 1794 1795 // Check that the bootup BFT has committed the pre shutdown certificates. 1796 for certificate in pre_shutdown_certificates.clone() { 1797 let certificate_round = certificate.round(); 1798 let certificate_id = certificate.id(); 1799 // Check that both BFTs have committed the certificates. 1800 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id)); 1801 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id)); 1802 // Check that the bootup BFT does not contain the certificates in its graph, because 1803 // it should not need to order them again in subsequent subdags. 
1804 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id)); 1805 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id)); 1806 } 1807 1808 // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus. 1809 for certificate in committed_certificates_bootup.clone() { 1810 let certificate_round = certificate.round(); 1811 let certificate_id = certificate.id(); 1812 // Check that the both BFTs have committed the certificates. 1813 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id)); 1814 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id)); 1815 // Check that the bootup BFT does not contain the certificates in its graph, because 1816 // it should not need to order them again in subsequent subdags. 1817 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id)); 1818 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id)); 1819 } 1820 1821 // Check that the commit subdag metadata for the second leader is the same for both BFTs. 1822 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata); 1823 1824 Ok(()) 1825 } 1826 1827 #[tokio::test] 1828 #[tracing_test::traced_test] 1829 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> { 1830 /* 1831 1. Run a bootup BFT that syncs with a set of pre shutdown certificates. 1832 2. Add post shutdown certificates to the bootup BFT. 1833 2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates. 1834 */ 1835 1836 let rng = &mut TestRng::default(); 1837 1838 // Initialize the round parameters. 
1839 let max_gc_rounds = alphavm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64; 1840 let committee_round = 0; 1841 let commit_round = 2; 1842 let current_round = commit_round + 1; 1843 let next_round = current_round + 1; 1844 1845 // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors. 1846 let (round_to_certificates_map, committee) = { 1847 let private_keys = vec![ 1848 PrivateKey::new(rng).unwrap(), 1849 PrivateKey::new(rng).unwrap(), 1850 PrivateKey::new(rng).unwrap(), 1851 PrivateKey::new(rng).unwrap(), 1852 ]; 1853 let addresses = vec![ 1854 Address::try_from(private_keys[0])?, 1855 Address::try_from(private_keys[1])?, 1856 Address::try_from(private_keys[2])?, 1857 Address::try_from(private_keys[3])?, 1858 ]; 1859 let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_members( 1860 committee_round, 1861 addresses, 1862 rng, 1863 ); 1864 // Initialize a mapping from the round number to the set of batch certificates in the round. 1865 let mut round_to_certificates_map: IndexMap< 1866 u64, 1867 IndexSet<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>>, 1868 > = IndexMap::new(); 1869 let mut previous_certificates = IndexSet::with_capacity(4); 1870 // Initialize the genesis batch certificates. 
1871 for _ in 0..4 { 1872 previous_certificates.insert(sample_batch_certificate(rng)); 1873 } 1874 for round in 0..=commit_round + 2 { 1875 let mut current_certificates = IndexSet::new(); 1876 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 { 1877 IndexSet::new() 1878 } else { 1879 previous_certificates.iter().map(|c| c.id()).collect() 1880 }; 1881 let transmission_ids = 1882 alphavm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng) 1883 .into_iter() 1884 .collect::<IndexSet<_>>(); 1885 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp(); 1886 let committee_id = committee.id(); 1887 for (i, private_key_1) in private_keys.iter().enumerate() { 1888 let batch_header = alphavm::ledger::narwhal::BatchHeader::new( 1889 private_key_1, 1890 round, 1891 timestamp, 1892 committee_id, 1893 transmission_ids.clone(), 1894 previous_certificate_ids.clone(), 1895 rng, 1896 ) 1897 .unwrap(); 1898 let mut signatures = IndexSet::with_capacity(4); 1899 for (j, private_key_2) in private_keys.iter().enumerate() { 1900 if i != j { 1901 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap()); 1902 } 1903 } 1904 let certificate = 1905 alphavm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap(); 1906 current_certificates.insert(certificate); 1907 } 1908 // Update the mapping. 1909 round_to_certificates_map.insert(round, current_certificates.clone()); 1910 previous_certificates = current_certificates.clone(); 1911 } 1912 (round_to_certificates_map, committee) 1913 }; 1914 1915 // Initialize the ledger. 1916 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1917 // Initialize the storage. 1918 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); 1919 // Get the leaders for the next 2 commit rounds. 
1920 let leader = committee.get_leader(commit_round).unwrap(); 1921 let next_leader = committee.get_leader(next_round).unwrap(); 1922 // Insert the pre shutdown certificates into the storage. 1923 let mut pre_shutdown_certificates: Vec<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new(); 1924 for i in 1..=commit_round { 1925 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone(); 1926 if i == commit_round { 1927 // Only insert the leader certificate for the commit round. 1928 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader); 1929 if let Some(c) = leader_certificate { 1930 pre_shutdown_certificates.push(c.clone()); 1931 } 1932 continue; 1933 } 1934 pre_shutdown_certificates.extend(certificates); 1935 } 1936 for certificate in pre_shutdown_certificates.iter() { 1937 storage.testing_only_insert_certificate_testing_only(certificate.clone()); 1938 } 1939 // Initialize the bootup BFT. 1940 let account = Account::new(rng)?; 1941 let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; 1942 1943 // Insert a mock DAG in the BFT without bootup. 1944 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0); 1945 // Sync the BFT DAG at bootup. 1946 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await; 1947 1948 // Insert the post shutdown certificates into the storage. 1949 let mut post_shutdown_certificates: Vec<alphavm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = 1950 Vec::new(); 1951 for j in commit_round..=commit_round + 2 { 1952 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone(); 1953 post_shutdown_certificates.extend(certificate); 1954 } 1955 for certificate in post_shutdown_certificates.iter() { 1956 storage.testing_only_insert_certificate_testing_only(certificate.clone()); 1957 } 1958 1959 // Insert the post shutdown certificates into the DAG. 
1960 for certificate in post_shutdown_certificates.clone() { 1961 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok()); 1962 } 1963 1964 // Get the next leader certificate to commit. 1965 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap(); 1966 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap(); 1967 let committed_certificates = commit_subdag.values().flatten(); 1968 1969 // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round. 1970 for pre_shutdown_certificate in pre_shutdown_certificates.clone() { 1971 for committed_certificate in committed_certificates.clone() { 1972 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id()); 1973 } 1974 } 1975 Ok(()) 1976 } 1977 1978 /// Tests that a leader certificate can be committed by sufficient endorsements in a succeeding leader certificate. 1979 #[test_log::test(tokio::test)] 1980 async fn test_commit_via_is_linked() { 1981 let rng = &mut TestRng::default(); 1982 1983 let committee_round = 0; 1984 let leader_round_1 = 2; 1985 let leader_round_2 = 4; // subsequent even round 1986 let max_gc_rounds = 50; 1987 1988 // Create a committee with four members. 
1989 let num_authors = 4; 1990 let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect(); 1991 let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect(); 1992 1993 let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng); 1994 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1995 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); 1996 let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap(); 1997 1998 let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new(); 1999 2000 // Round 1 2001 let round1_certs: IndexSet<_> = (0..num_authors) 2002 .map(|idx| { 2003 let author = &private_keys[idx]; 2004 let endorsements: Vec<_> = private_keys 2005 .iter() 2006 .enumerate() 2007 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) }) 2008 .collect(); 2009 2010 sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng) 2011 }) 2012 .collect(); 2013 certificates_by_round.insert(1, round1_certs.clone()); 2014 2015 let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap(); 2016 let mut leader1_certificate = None; 2017 2018 let round2_certs: IndexSet<_> = (0..num_authors) 2019 .map(|idx| { 2020 let author = &private_keys[idx]; 2021 let endorsements: Vec<_> = private_keys 2022 .iter() 2023 .enumerate() 2024 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) }) 2025 .collect(); 2026 let cert = sample_batch_certificate_for_round_with_committee( 2027 leader_round_1, 2028 round1_certs.iter().map(|c| c.id()).collect(), 2029 author, 2030 &endorsements[..], 2031 rng, 2032 ); 2033 2034 if cert.author() == leader1 { 2035 leader1_certificate = Some(cert.clone()); 2036 } 2037 cert 2038 }) 
2039 .collect(); 2040 certificates_by_round.insert(leader_round_1, round2_certs.clone()); 2041 2042 let round3_certs: IndexSet<_> = (0..num_authors) 2043 .map(|idx| { 2044 let author = &private_keys[idx]; 2045 let endorsements: Vec<_> = private_keys 2046 .iter() 2047 .enumerate() 2048 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) }) 2049 .collect(); 2050 2051 let previous_certificate_ids: IndexSet<_> = round2_certs 2052 .iter() 2053 .filter_map(|cert| { 2054 // Only have the leader endorse the previous round's leader certificate. 2055 if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) } 2056 }) 2057 .collect(); 2058 2059 sample_batch_certificate_for_round_with_committee( 2060 leader_round_1 + 1, 2061 previous_certificate_ids, 2062 author, 2063 &endorsements[..], 2064 rng, 2065 ) 2066 }) 2067 .collect(); 2068 certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone()); 2069 2070 // Ensure the first leader's certificate is not committed yet. 
2071 let leader_certificate_1 = leader1_certificate.unwrap(); 2072 assert!( 2073 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()), 2074 "Leader certificate 1 should not be committed yet" 2075 ); 2076 assert_eq!(bft.dag.read().last_committed_round(), 0); 2077 2078 let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap(); 2079 let round4_certs: IndexSet<_> = (0..num_authors) 2080 .map(|idx| { 2081 let endorsements: Vec<_> = private_keys 2082 .iter() 2083 .enumerate() 2084 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) }) 2085 .collect(); 2086 2087 sample_batch_certificate_for_round_with_committee( 2088 leader_round_2, 2089 round3_certs.iter().map(|c| c.id()).collect(), 2090 &private_keys[idx], 2091 &endorsements[..], 2092 rng, 2093 ) 2094 }) 2095 .collect(); 2096 certificates_by_round.insert(leader_round_2, round4_certs.clone()); 2097 2098 // Insert all certificates into the storage and DAG. 2099 for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) { 2100 storage.testing_only_insert_certificate_testing_only(certificate.clone()); 2101 bft.update_dag::<false, false>(certificate).await.unwrap(); 2102 } 2103 2104 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap(); 2105 2106 assert!( 2107 bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(), 2108 "Leader certificate 1 should be linked to leader certificate 2" 2109 ); 2110 2111 // Explicitely commit leader certificate 2. 2112 bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap(); 2113 2114 // Leader certificate 1 should be committed transitively when committing the leader certificate 2. 
2115 assert!( 2116 bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()), 2117 "Leader certificate for round 2 should be committed when committing at round 4" 2118 ); 2119 2120 // Leader certificate 2 should be committed as the above call was successful. 2121 assert!( 2122 bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()), 2123 "Leader certificate for round 4 should be committed" 2124 ); 2125 2126 assert_eq!(bft.dag.read().last_committed_round(), 4); 2127 } 2128 2129 #[test_log::test(tokio::test)] 2130 async fn test_commit_via_is_linked_with_skipped_anchor() { 2131 let rng = &mut TestRng::default(); 2132 2133 let committee_round = 0; 2134 let leader_round_1 = 2; 2135 let leader_round_2 = 4; 2136 let max_gc_rounds = 50; 2137 2138 let num_authors = 4; 2139 let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect(); 2140 let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect(); 2141 2142 let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng); 2143 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 2144 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); 2145 let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap(); 2146 2147 let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new(); 2148 2149 // Round 1 2150 let round1_certs: IndexSet<_> = (0..num_authors) 2151 .map(|idx| { 2152 let author = &private_keys[idx]; 2153 let endorsements: Vec<_> = private_keys 2154 .iter() 2155 .enumerate() 2156 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) }) 2157 .collect(); 2158 2159 sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng) 2160 }) 2161 .collect(); 2162 
certificates_by_round.insert(1, round1_certs.clone()); 2163 2164 let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap(); 2165 let mut leader1_certificate = None; 2166 2167 let round2_certs: IndexSet<_> = (0..num_authors) 2168 .map(|idx| { 2169 let author = &private_keys[idx]; 2170 let endorsements: Vec<_> = private_keys 2171 .iter() 2172 .enumerate() 2173 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) }) 2174 .collect(); 2175 let cert = sample_batch_certificate_for_round_with_committee( 2176 leader_round_1, 2177 round1_certs.iter().map(|c| c.id()).collect(), 2178 author, 2179 &endorsements[..], 2180 rng, 2181 ); 2182 2183 if cert.author() == leader1 { 2184 leader1_certificate = Some(cert.clone()); 2185 } 2186 cert 2187 }) 2188 .collect(); 2189 certificates_by_round.insert(leader_round_1, round2_certs.clone()); 2190 2191 let round3_certs: IndexSet<_> = (0..num_authors) 2192 .map(|idx| { 2193 let author = &private_keys[idx]; 2194 let endorsements: Vec<_> = private_keys 2195 .iter() 2196 .enumerate() 2197 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) }) 2198 .collect(); 2199 2200 let previous_certificate_ids: IndexSet<_> = round2_certs 2201 .iter() 2202 .filter_map(|cert| { 2203 // Only have the leader endorse the previous round's leader certificate. 2204 if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) } 2205 }) 2206 .collect(); 2207 2208 sample_batch_certificate_for_round_with_committee( 2209 leader_round_1 + 1, 2210 previous_certificate_ids, 2211 author, 2212 &endorsements[..], 2213 rng, 2214 ) 2215 }) 2216 .collect(); 2217 certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone()); 2218 2219 // Ensure the first leader's certificate is not committed yet. 
2220 let leader_certificate_1 = leader1_certificate.unwrap(); 2221 assert!( 2222 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()), 2223 "Leader certificate 1 should not be committed yet" 2224 ); 2225 2226 let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap(); 2227 let round4_certs: IndexSet<_> = (0..num_authors) 2228 .map(|idx| { 2229 let endorsements: Vec<_> = private_keys 2230 .iter() 2231 .enumerate() 2232 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) }) 2233 .collect(); 2234 2235 // Do not create a path to the previous leader certificate. 2236 let previous_certificate_ids: IndexSet<_> = round3_certs 2237 .iter() 2238 .filter_map(|cert| if cert.author() == leader1 { None } else { Some(cert.id()) }) 2239 .collect(); 2240 2241 sample_batch_certificate_for_round_with_committee( 2242 leader_round_2, 2243 previous_certificate_ids, 2244 &private_keys[idx], 2245 &endorsements[..], 2246 rng, 2247 ) 2248 }) 2249 .collect(); 2250 certificates_by_round.insert(leader_round_2, round4_certs.clone()); 2251 2252 // Insert all certificates into the storage and DAG. 2253 for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) { 2254 storage.testing_only_insert_certificate_testing_only(certificate.clone()); 2255 bft.update_dag::<false, false>(certificate).await.unwrap(); 2256 } 2257 2258 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap(); 2259 2260 assert!( 2261 !bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(), 2262 "Leader certificate 1 should not be linked to leader certificate 2" 2263 ); 2264 assert_eq!(bft.dag.read().last_committed_round(), 0); 2265 2266 // Explicitely commit leader certificate 2. 
2267 bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap(); 2268 2269 // Leader certificate 1 should be committed transitively when committing the leader certificate 2. 2270 assert!( 2271 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()), 2272 "Leader certificate for round 2 should not be committed when committing at round 4" 2273 ); 2274 2275 // Leader certificate 2 should be committed as the above call was successful. 2276 assert!( 2277 bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()), 2278 "Leader certificate for round 4 should be committed" 2279 ); 2280 assert_eq!(bft.dag.read().last_committed_round(), 4); 2281 } 2282 }