primary.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use crate::{ 20 events::{BatchPropose, BatchSignature, Event}, 21 helpers::{ 22 assign_to_worker, 23 assign_to_workers, 24 fmt_id, 25 init_sync_channels, 26 init_worker_channels, 27 now, 28 BFTSender, 29 PrimaryReceiver, 30 PrimarySender, 31 Proposal, 32 ProposalCache, 33 SignedProposals, 34 Storage, 35 }, 36 spawn_blocking, 37 Gateway, 38 Sync, 39 Transport, 40 Worker, 41 MAX_BATCH_DELAY_IN_MS, 42 MAX_WORKERS, 43 MIN_BATCH_DELAY_IN_SECS, 44 PRIMARY_PING_IN_MS, 45 WORKER_PING_IN_MS, 46 }; 47 use alphaos_account::Account; 48 use alphaos_node_bft_events::PrimaryPing; 49 use alphaos_node_bft_ledger_service::LedgerService; 50 use alphaos_node_network::PeerPoolHandling; 51 use alphaos_node_sync::{BlockSync, Ping, DUMMY_SELF_IP}; 52 use alphavm::{ 53 console::{ 54 prelude::*, 55 types::{Address, Field}, 56 }, 57 ledger::{ 58 block::Transaction, 59 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID}, 60 puzzle::{Solution, SolutionID}, 61 }, 62 prelude::{committee::Committee, ConsensusVersion}, 63 utilities::flatten_error, 64 }; 65 66 use alphastd::StorageMode; 67 use anyhow::Context; 68 use 
colored::Colorize;
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};

/// A helper type for an optional proposed batch.
/// `None` means the primary is not currently proposing a batch.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

/// The primary logic of a node.
/// AlphaBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module enables fetching data from other validators.
    sync: Sync<N>,
    /// The gateway allows talking to other nodes in the validator set.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender. Set at most once (in `run`), via the `OnceCell`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The handles for all background tasks spawned by this primary.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock for propose_batch. The guarded `u64` tracks the latest proposal round.
    propose_lock: Arc<TMutex<u64>>,
    /// The storage mode of the node.
    storage_mode: StorageMode,
}

impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to send to the primary.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    ///
    /// Constructs the gateway and the sync module; workers are created later, in `run`.
    /// Returns an error if the gateway fails to initialize.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        storage_mode: StorageMode,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Initialize the gateway.
        let gateway = Gateway::new(
            account,
            storage.clone(),
            ledger.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            storage_mode.clone(),
            dev,
        )?;
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

        // Initialize the primary instance.
        // Note: `workers` starts empty; the worker set is populated in `run`.
        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            storage_mode,
        })
    }

    /// Load the proposal cache file and update the Primary state with the stored data.
    ///
    /// Restores the proposed batch, the signed proposals, the propose lock's round, and any
    /// pending certificates from the on-disk proposal cache (crash recovery).
    async fn load_proposal_cache(&self) -> Result<()> {
        // IMPORTANT: Skip proposal cache in dev mode to allow fresh starts.
        // Dev mode is for testing and should allow easy resets without manual cleanup.
        // Production mode still benefits from proposal cache for crash recovery.
        if self.storage_mode.dev().is_some() {
            info!("Skipping proposal cache in dev mode (allows fresh restarts)");
            return Ok(());
        }

        // Fetch the signed proposals from the file system if it exists.
        match ProposalCache::<N>::exists(&self.storage_mode) {
            // If the proposal cache exists, then process the proposal cache.
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
                Ok(proposal_cache) => {
                    // Extract the proposal and signed proposals.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Write the proposed batch.
                    *self.proposed_batch.write() = proposed_batch;
                    // Write the signed proposals.
                    *self.signed_proposals.write() = signed_proposals;
                    // Write the propose lock.
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Update the storage with the pending certificates.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        // We use a dummy IP because the node should not need to request from any peers.
                        // The storage should have stored all the transmissions. If not, we simply
                        // skip the certificate.
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            let err = err.context(format!(
                                "Failed to load stored certificate {} from proposal cache",
                                fmt_id(batch_id)
                            ));
                            warn!("{}", &flatten_error(err));
                        }
                    }
                    Ok(())
                }
                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
            },
            // If the proposal cache does not exist, then return early.
            false => Ok(()),
        }
    }

    /// Run the primary instance.
    ///
    /// Startup order matters here: workers are created first, then the sync module is
    /// initialized, the proposal cache is restored, the sync module is started, the gateway
    /// is started, and only then are the primary handlers started (see the final note below).
    pub async fn run(
        &mut self,
        ping: Option<Arc<Ping<N>>>,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender.
        if let Some(bft_sender) = &bft_sender {
            // Set the BFT sender in the primary.
            // Panics if `run` is called twice with a BFT sender — an invariant violation.
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct a map of the worker senders.
        let mut worker_senders = IndexMap::new();
        // Construct a map for the workers.
        let mut workers = Vec::new();
        // Initialize the workers.
        for id in 0..MAX_WORKERS {
            // Construct the worker channels.
            let (tx_worker, rx_worker) = init_worker_channels();
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            // Run the worker instance.
            worker.run(rx_worker);
            // Add the worker to the list of workers.
            workers.push(worker);
            // Add the worker sender to the map.
            worker_senders.insert(id, tx_worker);
        }
        // Set the workers.
        self.workers = Arc::from(workers);

        // First, initialize the sync channels.
        let (sync_sender, sync_receiver) = init_sync_channels();
        // Next, initialize the sync module and sync the storage from ledger.
        self.sync.initialize(bft_sender).await?;
        // Next, load and process the proposal cache before running the sync module.
        self.load_proposal_cache().await?;
        // Next, run the sync module.
        self.sync.run(ping, sync_receiver).await?;
        // Next, initialize the gateway.
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        // Lastly, start the primary handlers.
        // Note: This ensures the primary does not start communicating before syncing is complete.
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    ///
    /// Panics if the worker count exceeds `u8::MAX` — an invariant violation,
    /// since workers are created from `0..MAX_WORKERS`.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the batch proposal of our primary, if one currently exists.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}

impl<N: Network> Primary<N> {
    /// Returns the number of unconfirmed transmissions, summed across all workers.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
    }

    /// Returns the number of unconfirmed ratifications, summed across all workers.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
    }

    /// Returns the number of unconfirmed solutions, summed across all workers.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_solutions()).sum()
    }

    /// Returns the number of unconfirmed transactions, summed across all workers.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transactions()).sum()
    }
}

impl<N: Network> Primary<N> {
    /// Returns the worker transmission IDs.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    /// Returns the worker transmissions.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    /// Returns the worker solutions.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    /// Returns the worker transactions.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}

impl<N: Network> Primary<N> {
    /// Clears the worker solutions.
    pub fn clear_worker_solutions(&self) {
        self.workers.iter().for_each(Worker::clear_solutions);
    }
}

impl<N: Network> Primary<N> {
    /// Proposes the batch for the current round.
    ///
    /// This method performs the following steps:
    /// 1. Drain the workers.
    /// 2. Sign the batch.
    /// 3. Set the batch proposal in the primary.
    /// 4. Broadcast the batch header to all validators for signing.
    pub async fn propose_batch(&self) -> Result<()> {
        // This function isn't re-entrant: the async mutex is held for the whole call.
        // The guarded value is the latest proposal round (see the propose-lock field).
        let mut lock_guard = self.propose_lock.lock().await;

        // Check if the proposed batch has expired, and clear it if it has expired.
        // On failure, log a warning and return Ok — this path is best-effort, not fatal.
        if let Err(err) = self
            .check_proposed_batch_for_expiration()
            .await
            .with_context(|| "Failed to check the proposed batch for expiration")
        {
            warn!("{}", flatten_error(&err));
            return Ok(());
        }

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // If the current round is 0, return early.
        // This can actually never happen, because of the invariant that the current round is never 0
        // (see [`StorageInner::current_round`]).
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current storage round is below the latest proposal round, then return early.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If there is a batch being proposed already,
        // rebroadcast the batch header to the non-signers, and return early.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            // Construct the event.
            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            // Iterate through the non-signers.
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                // Resolve the address to the peer IP.
                match self.gateway.resolver().read().get_peer_ip_for_address(address) {
                    // Resend the batch proposal to the validator for signing.
                    Some(peer_ip) => {
                        // Clone what the spawned task needs; it must be 'static.
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            // Resend the batch proposal to the peer.
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    // Skip addresses with no known peer IP.
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure that the primary does not create a new proposal too quickly.
        if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!(
                "{}",
                flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}")))
            );
            return Ok(());
        }

        // Ensure the primary has not proposed a batch for this round before.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            // If a BFT sender was provided, attempt to advance the current round.
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
                    Ok(true) => (), // continue,
                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
                    Ok(false) => return Ok(()),
                    // An error occurred while attempting to advance the current round.
                    Err(err) => {
                        let err = err.context("Failed to update the BFT to the next round");
                        warn!("{}", &flatten_error(&err));
                        return Err(err);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        // Determine if the current round has been proposed.
        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
        // good for network reliability and should not be prevented for the already existing proposed_batch.
        // If a certificate already exists for the current round, an attempt should be made to advance the
        // round as early as possible.
        if round == *lock_guard {
            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        // Retrieve the committee to check against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check if the primary is connected to enough validators to reach quorum threshold.
        {
            // Retrieve the connected validator addresses.
            let mut connected_validators = self.gateway.connected_addresses();
            // Append the primary to the set.
            connected_validators.insert(self.gateway.account().address());
            // If quorum threshold is not reached, return early.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                // Subtract 1 to exclude the primary's own address appended above.
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed.
        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
        let mut is_ready = previous_round == 0;
        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
        if previous_round > 0 {
            // Retrieve the committee lookback for the round.
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // Construct a set over the authors.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            // Check if the previous certificates have reached the quorum threshold.
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        // If the batch is not ready to be proposed, return early.
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Initialize the map of transmissions.
        let mut transmissions: IndexMap<_, _> = Default::default();
        // Track the total execution costs of the batch proposal as it is being constructed.
        let mut proposal_cost = 0u64;
        // Note: worker draining and transaction inclusion needs to be thought
        // through carefully when there is more than one worker. The fairness
        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // Check the selected transmissions are below the batch limit.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    // Reinsert the transmission into the worker.
                    worker.insert_front(id, transmission);
                    break 'outer;
                }

                // Check the max transmissions per worker is not exceeded.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    // Reinsert the transmission into the worker.
                    worker.insert_front(id, transmission);
                    continue 'outer;
                }

                // Check if the ledger already contains the transmission.
                // Note: an error from `contains_transmission` is treated as "contains" (skip).
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // Check if the storage already contains the transmission.
                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
                // the primary does not propose a batch with no transmissions.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Check the transmission is still valid.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // Ensure the checksum matches. If not, skip the solution.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        // Check if the solution is still valid.
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // Ensure the checksum matches. If not, skip the transaction.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    // `take` caps the read at the maximum transaction size.
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
                        // ConsensusVersion V8 Migration logic -
                        // Do not include deployments in a batch proposal.
                        let current_block_height = self.ledger.latest_block_height();
                        let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                        let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
                        if current_block_height > consensus_version_v7_height
                            && current_block_height <= consensus_version_v8_height
                            && transaction.is_deploy()
                        {
                            trace!(
                                "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
                                fmt_id(transaction_id)
                            );
                            continue;
                        }

                        // Compute the transaction spent cost (in microcredits).
                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Check if the transaction is still valid.
                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Compute the next proposal cost.
                        // Note: We purposefully discard this transaction if the proposal cost overflows.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Check if the next proposal cost exceeds the batch proposal spend limit.
                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
                        if next_proposal_cost > batch_spend_limit {
                            debug!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                batch_spend_limit
                            );

                            // Reinsert the transmission into the worker.
                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        // Update the proposal cost.
                        proposal_cost = next_proposal_cost;
                    }

                    // Note: We explicitly forbid including ratifications,
                    // as the protocol currently does not support ratifications.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    // All other combinations are clearly invalid.
                    _ => continue,
                }

                // If the transmission is valid, insert it into the proposal's transmission list.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        // Record this round as the latest proposal round (guarded by the propose lock).
        *lock_guard = round;

        /* Proceeding to sign & propose the batch. */
        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Retrieve the private key.
        let private_key = *self.gateway.account().private_key();
        // Retrieve the committee ID.
        let committee_id = committee_lookback.id();
        // Prepare the transmission IDs.
        let transmission_ids = transmissions.keys().copied().collect();
        // Prepare the previous batch certificate IDs.
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header and construct the proposal.
        // Signing is CPU-bound, so it runs on a blocking task.
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On error, reinsert the transmissions and then propagate the error.
            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
            }
        })?;
        // Broadcast the batch to all validators for signing.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        // Set the timestamp of the latest proposed batch.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        // Set the proposed batch.
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }

    /// Processes a batch propose from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Verify the batch.
    /// 2. Sign the batch.
    /// 3. Broadcast the signature back to the validator.
    ///
    /// If our primary is ahead of the peer, we will not sign the batch.
    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
747 async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> { 748 let BatchPropose { round: batch_round, batch_header } = batch_propose; 749 750 // Deserialize the batch header. 751 let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?; 752 // Ensure the round matches in the batch header. 753 if batch_round != batch_header.round() { 754 // Proceed to disconnect the validator. 755 self.gateway.disconnect(peer_ip); 756 bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round()); 757 } 758 759 // Retrieve the batch author. 760 let batch_author = batch_header.author(); 761 762 // Ensure the batch proposal is from the validator. 763 match self.gateway.resolve_to_alpha_addr(peer_ip) { 764 // If the peer is a validator, then ensure the batch proposal is from the validator. 765 Some(address) => { 766 if address != batch_author { 767 // Proceed to disconnect the validator. 768 self.gateway.disconnect(peer_ip); 769 bail!("Malicious peer - proposed batch from a different validator ({batch_author})"); 770 } 771 } 772 None => bail!("Batch proposal from a disconnected validator"), 773 } 774 // Ensure the batch author is a current committee member. 775 if !self.gateway.is_authorized_validator_address(batch_author) { 776 // Proceed to disconnect the validator. 777 self.gateway.disconnect(peer_ip); 778 bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})"); 779 } 780 // Ensure the batch proposal is not from the current primary. 781 if self.gateway.account().address() == batch_author { 782 bail!("Invalid peer - proposed batch from myself ({batch_author})"); 783 } 784 785 // Ensure that the batch proposal's committee ID matches the expected committee ID. 
786 let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id(); 787 if expected_committee_id != batch_header.committee_id() { 788 // Proceed to disconnect the validator. 789 self.gateway.disconnect(peer_ip); 790 bail!( 791 "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})", 792 batch_header.committee_id() 793 ); 794 } 795 796 // Retrieve the cached round and batch ID for this validator. 797 if let Some((signed_round, signed_batch_id, signature)) = 798 self.signed_proposals.read().get(&batch_author).copied() 799 { 800 // If the signed round is ahead of the peer's batch round, do not sign the proposal. 801 // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it. 802 if signed_round > batch_header.round() { 803 bail!( 804 "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}", 805 batch_header.round() 806 ); 807 } 808 809 // If the round matches and the batch ID differs, then the validator is malicious. 810 if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() { 811 bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})"); 812 } 813 // If the round and batch ID matches, then skip signing the batch a second time. 814 // Instead, rebroadcast the cached signature to the peer. 815 if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() { 816 let gateway = self.gateway.clone(); 817 tokio::spawn(async move { 818 debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'"); 819 let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature)); 820 // Resend the batch signature to the peer. 
821 if gateway.send(peer_ip, event).await.is_none() { 822 warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'"); 823 } 824 }); 825 // Return early. 826 return Ok(()); 827 } 828 } 829 830 // Ensure that the batch header doesn't already exist in storage. 831 // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task. 832 if self.storage.contains_batch(batch_header.batch_id()) { 833 debug!( 834 "Primary is safely skipping a batch proposal from '{peer_ip}' - {}", 835 format!("batch for round {batch_round} already exists in storage").dimmed() 836 ); 837 return Ok(()); 838 } 839 840 // Compute the previous round. 841 let previous_round = batch_round.saturating_sub(1); 842 // Ensure that the peer did not propose a batch too quickly. 843 if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) { 844 // Proceed to disconnect the validator. 845 self.gateway.disconnect(peer_ip); 846 return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'"))); 847 } 848 849 // Ensure the batch header does not contain any ratifications. 850 if batch_header.contains(TransmissionID::Ratification) { 851 // Proceed to disconnect the validator. 852 self.gateway.disconnect(peer_ip); 853 bail!( 854 "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'", 855 ); 856 } 857 858 // If the peer is ahead, use the batch header to sync up to the peer. 859 let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?; 860 861 // Check that the transmission ids match and are not fee transactions. 862 if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| { 863 // If the transmission is not well-formed, then return early. 
864 self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission) 865 }) { 866 let err = err.context(format!( 867 "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission" 868 )); 869 debug!("{}", flatten_error(err)); 870 return Ok(()); 871 } 872 873 // Ensure the batch is for the current round. 874 // This method must be called after fetching previous certificates (above), 875 // and prior to checking the batch header (below). 876 if let Err(e) = self.ensure_is_signing_round(batch_round) { 877 // If the primary is not signing for the peer's round, then return early. 878 debug!("{e} from '{peer_ip}'"); 879 return Ok(()); 880 } 881 882 // Ensure the batch header from the peer is valid. 883 let (storage, header) = (self.storage.clone(), batch_header.clone()); 884 885 // Check the batch header, and return early if it already exists in storage. 886 let Some(missing_transmissions) = 887 spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))? 888 else { 889 return Ok(()); 890 }; 891 892 // Inserts the missing transmissions into the workers. 893 self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?; 894 895 // Ensure the transaction doesn't bring the proposal above the spend limit. 896 let block_height = self.ledger.latest_block_height() + 1; 897 if N::CONSENSUS_VERSION(block_height)? 
>= ConsensusVersion::V5 { 898 let mut proposal_cost = 0u64; 899 for transmission_id in batch_header.transmission_ids() { 900 let worker_id = assign_to_worker(*transmission_id, self.num_workers())?; 901 let Some(worker) = self.workers.get(worker_id as usize) else { 902 debug!("Unable to find worker {worker_id}"); 903 return Ok(()); 904 }; 905 906 let Some(transmission) = worker.get_transmission(*transmission_id) else { 907 debug!("Unable to find transmission '{}' in worker '{worker_id}", fmt_id(transmission_id)); 908 return Ok(()); 909 }; 910 911 // If the transmission is a transaction, compute its execution cost. 912 if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) = 913 (transmission_id, transmission) 914 { 915 // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error. 916 let transaction = spawn_blocking!({ 917 match transaction { 918 Data::Object(transaction) => Ok(transaction), 919 Data::Buffer(bytes) => { 920 Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?) 921 } 922 } 923 })?; 924 925 // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached. 926 // ConsensusVersion V8 Migration logic - 927 // Do not include deployments in a batch proposal. 928 let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?; 929 let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?; 930 let consensus_version = N::CONSENSUS_VERSION(block_height)?; 931 if block_height > consensus_version_v7_height 932 && block_height <= consensus_version_v8_height 933 && transaction.is_deploy() 934 { 935 bail!( 936 "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})", 937 ) 938 } 939 940 // Compute the transaction spent cost (in microcredits). 
941 // Note: We purposefully discard this transaction if we are unable to compute the spent cost. 942 let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version) 943 else { 944 bail!( 945 "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'", 946 fmt_id(transaction_id) 947 ) 948 }; 949 950 // Compute the next proposal cost. 951 // Note: We purposefully discard this transaction if the proposal cost overflows. 952 let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else { 953 bail!( 954 "Invalid batch proposal - Batch proposal overflowed on transaction '{}'", 955 fmt_id(transaction_id) 956 ) 957 }; 958 959 // Check if the next proposal cost exceeds the batch proposal spend limit. 960 let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height); 961 if next_proposal_cost > batch_spend_limit { 962 bail!( 963 "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})", 964 fmt_id(transaction_id), 965 batch_spend_limit 966 ); 967 } 968 969 // Update the proposal cost. 970 proposal_cost = next_proposal_cost; 971 } 972 } 973 } 974 975 /* Proceeding to sign the batch. */ 976 977 // Retrieve the batch ID. 978 let batch_id = batch_header.batch_id(); 979 // Sign the batch ID. 980 let account = self.gateway.account().clone(); 981 let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?; 982 983 // Ensure the proposal has not already been signed. 984 // 985 // Note: Due to the need to sync the batch header with the peer, it is possible 986 // for the primary to receive the same 'BatchPropose' event again, whereby only 987 // one instance of this handler should sign the batch. This check guarantees this. 
988 match self.signed_proposals.write().0.entry(batch_author) { 989 std::collections::hash_map::Entry::Occupied(mut entry) => { 990 // If the validator has already signed a batch for this round, then return early, 991 // since, if the peer still has not received the signature, they will request it again, 992 // and the logic at the start of this function will resend the (now cached) signature 993 // to the peer if asked to sign this batch proposal again. 994 if entry.get().0 == batch_round { 995 return Ok(()); 996 } 997 // Otherwise, cache the round, batch ID, and signature for this validator. 998 entry.insert((batch_round, batch_id, signature)); 999 } 1000 // If the validator has not signed a batch before, then continue. 1001 std::collections::hash_map::Entry::Vacant(entry) => { 1002 // Cache the round, batch ID, and signature for this validator. 1003 entry.insert((batch_round, batch_id, signature)); 1004 } 1005 }; 1006 1007 // Broadcast the signature back to the validator. 1008 let self_ = self.clone(); 1009 tokio::spawn(async move { 1010 let event = Event::BatchSignature(BatchSignature::new(batch_id, signature)); 1011 // Send the batch signature to the peer. 1012 if self_.gateway.send(peer_ip, event).await.is_some() { 1013 debug!("Signed a batch for round {batch_round} from '{peer_ip}'"); 1014 } 1015 }); 1016 1017 Ok(()) 1018 } 1019 1020 /// Processes a batch signature from a peer. 1021 /// 1022 /// This method performs the following steps: 1023 /// 1. Ensure the proposed batch has not expired. 1024 /// 2. Verify the signature, ensuring it corresponds to the proposed batch. 1025 /// 3. Store the signature. 1026 /// 4. Certify the batch if enough signatures have been received. 1027 /// 5. Broadcast the batch certificate to all validators. 
1028 async fn process_batch_signature_from_peer( 1029 &self, 1030 peer_ip: SocketAddr, 1031 batch_signature: BatchSignature<N>, 1032 ) -> Result<()> { 1033 // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired. 1034 self.check_proposed_batch_for_expiration().await?; 1035 1036 // Retrieve the signature and timestamp. 1037 let BatchSignature { batch_id, signature } = batch_signature; 1038 1039 // Retrieve the signer. 1040 let signer = signature.to_address(); 1041 1042 // Ensure the batch signature is signed by the validator. 1043 if self.gateway.resolve_to_alpha_addr(peer_ip) != Some(signer) { 1044 // Proceed to disconnect the validator. 1045 self.gateway.disconnect(peer_ip); 1046 bail!("Malicious peer - batch signature is from a different validator ({signer})"); 1047 } 1048 // Ensure the batch signature is not from the current primary. 1049 if self.gateway.account().address() == signer { 1050 bail!("Invalid peer - received a batch signature from myself ({signer})"); 1051 } 1052 1053 let self_ = self.clone(); 1054 let Some(proposal) = spawn_blocking!({ 1055 // Acquire the write lock. 1056 let mut proposed_batch = self_.proposed_batch.write(); 1057 // Add the signature to the batch, and determine if the batch is ready to be certified. 1058 match proposed_batch.as_mut() { 1059 Some(proposal) => { 1060 // Ensure the batch ID matches the currently proposed batch ID. 1061 if proposal.batch_id() != batch_id { 1062 match self_.storage.contains_batch(batch_id) { 1063 // If this batch was already certified, return early. 1064 true => { 1065 debug!( 1066 "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified", 1067 proposal.round() 1068 ); 1069 return Ok(None); 1070 } 1071 // If the batch ID is unknown, return an error. 
1072 false => bail!( 1073 "Unknown batch ID '{batch_id}', expected '{}' for round {}", 1074 proposal.batch_id(), 1075 proposal.round() 1076 ), 1077 } 1078 } 1079 // Retrieve the committee lookback for the round. 1080 let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?; 1081 // Retrieve the address of the validator. 1082 let Some(signer) = self_.gateway.resolve_to_alpha_addr(peer_ip) else { 1083 bail!("Signature is from a disconnected validator"); 1084 }; 1085 // Add the signature to the batch. 1086 let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?; 1087 1088 if new_signature { 1089 info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round()); 1090 // Check if the batch is ready to be certified. 1091 if !proposal.is_quorum_threshold_reached(&committee_lookback) { 1092 // If the batch is not ready to be certified, return early. 1093 return Ok(None); 1094 } 1095 } else { 1096 debug!( 1097 "Received duplicated signature from '{peer_ip}' for batch {batch_id} in round {round}", 1098 round = proposal.round() 1099 ); 1100 return Ok(None); 1101 } 1102 } 1103 // There is no proposed batch, so return early. 1104 None => return Ok(None), 1105 }; 1106 // Retrieve the batch proposal, clearing the proposed batch. 1107 match proposed_batch.take() { 1108 Some(proposal) => Ok(Some(proposal)), 1109 None => Ok(None), 1110 } 1111 })? 1112 else { 1113 return Ok(()); 1114 }; 1115 1116 /* Proceeding to certify the batch. */ 1117 1118 info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round()); 1119 1120 // Retrieve the committee lookback for the round. 1121 let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?; 1122 // Store the certified batch and broadcast it to all validators. 1123 // If there was an error storing the certificate, reinsert the transmissions back into the ready queue. 
1124 if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await { 1125 // Reinsert the transmissions back into the ready queue for the next proposal. 1126 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?; 1127 return Err(e); 1128 } 1129 1130 #[cfg(feature = "metrics")] 1131 metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0); 1132 Ok(()) 1133 } 1134 1135 /// Processes a batch certificate from a peer. 1136 /// 1137 /// This method performs the following steps: 1138 /// 1. Stores the given batch certificate, after ensuring it is valid. 1139 /// 2. If there are enough certificates to reach quorum threshold for the current round, 1140 /// then proceed to advance to the next round. 1141 async fn process_batch_certificate_from_peer( 1142 &self, 1143 peer_ip: SocketAddr, 1144 certificate: BatchCertificate<N>, 1145 ) -> Result<()> { 1146 // Ensure the batch certificate is from an authorized validator. 1147 if !self.gateway.is_authorized_validator_ip(peer_ip) { 1148 // Proceed to disconnect the validator. 1149 self.gateway.disconnect(peer_ip); 1150 bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})"); 1151 } 1152 // Ensure storage does not already contain the certificate. 1153 if self.storage.contains_certificate(certificate.id()) { 1154 return Ok(()); 1155 // Otherwise, ensure ephemeral storage contains the certificate. 1156 } else if !self.storage.contains_unprocessed_certificate(certificate.id()) { 1157 self.storage.insert_unprocessed_certificate(certificate.clone())?; 1158 } 1159 1160 // Retrieve the batch certificate author. 1161 let author = certificate.author(); 1162 // Retrieve the batch certificate round. 1163 let certificate_round = certificate.round(); 1164 // Retrieve the batch certificate committee ID. 1165 let committee_id = certificate.committee_id(); 1166 1167 // Ensure the batch certificate is not from the current primary. 
1168 if self.gateway.account().address() == author { 1169 bail!("Received a batch certificate for myself ({author})"); 1170 } 1171 1172 // Ensure that the incoming certificate is valid. 1173 self.storage.check_incoming_certificate(&certificate)?; 1174 1175 // Store the certificate, after ensuring it is valid above. 1176 // The following call recursively fetches and stores 1177 // the previous certificates referenced from this certificate. 1178 // It is critical to make the following call this after validating the certificate above. 1179 // The reason is that a sequence of malformed certificates, 1180 // with references to previous certificates with non-decreasing rounds, 1181 // cause the recursive fetching of certificates to crash the validator due to resource exhaustion. 1182 // Note that if the following call, if not returning an error, guarantees the backward closure of the DAG 1183 // (i.e. that all the referenced previous certificates are in the DAG before storing this one), 1184 // then all the validity checks in [`Storage::check_certificate`] should be redundant. 1185 // TODO: eliminate those redundant checks 1186 self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?; 1187 1188 // If there are enough certificates to reach quorum threshold for the certificate round, 1189 // then proceed to advance to the next round. 1190 1191 // Retrieve the committee lookback. 1192 let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?; 1193 1194 // Retrieve the certificate authors. 1195 let authors = self.storage.get_certificate_authors_for_round(certificate_round); 1196 // Check if the certificates have reached the quorum threshold. 1197 let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors); 1198 1199 // Ensure that the batch certificate's committee ID matches the expected committee ID. 
1200 let expected_committee_id = committee_lookback.id(); 1201 if expected_committee_id != committee_id { 1202 // Proceed to disconnect the validator. 1203 self.gateway.disconnect(peer_ip); 1204 bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})"); 1205 } 1206 1207 // Determine if we are currently proposing a round that is relevant. 1208 // Note: This is important, because while our peers have advanced, 1209 // they may not be proposing yet, and thus still able to sign our proposed batch. 1210 let should_advance = match &*self.proposed_batch.read() { 1211 // We advance if the proposal round is less than the current round that was just certified. 1212 Some(proposal) => proposal.round() < certificate_round, 1213 // If there's no proposal, we consider advancing. 1214 None => true, 1215 }; 1216 1217 // Retrieve the current round. 1218 let current_round = self.current_round(); 1219 1220 // Determine whether to advance to the next round. 1221 if is_quorum && should_advance && certificate_round >= current_round { 1222 // If we have reached the quorum threshold and the round should advance, then proceed to the next round. 1223 self.try_increment_to_the_next_round(current_round + 1).await?; 1224 } 1225 Ok(()) 1226 } 1227 } 1228 1229 impl<N: Network> Primary<N> { 1230 /// Starts the primary handlers. 1231 /// 1232 /// For each receiver in the `primary_receiver` struct, there will be a dedicated task 1233 /// that awaits new data and handles it accordingly. 1234 /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically 1235 /// tries to move the the next round of batches. 1236 /// 1237 /// This function is called exactly once, in `Self::run()`. 
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        // Destructure the receiver bundle; each channel gets its own dedicated handler task below.
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Start the primary ping sender.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                // Retrieve the block locators.
                // Note: offloaded via `spawn_blocking!` — presumably this touches storage; TODO confirm.
                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                // Retrieve the latest certificate of the primary.
                let primary_certificate = {
                    // Retrieve the primary address.
                    let primary_address = self_.gateway.account().address();

                    // Iterate backwards from the latest round to find the primary certificate.
                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        // If the current round is 0, then break the while loop.
                        if current_round == 0 {
                            break;
                        }
                        // Retrieve the primary certificates.
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        // If the primary certificate was not found, decrement the round.
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    // Determine if the primary certificate was found.
                    match certificate {
                        Some(certificate) => certificate,
                        // Skip this iteration of the loop (do not send a primary ping).
                        None => continue,
                    }
                };

                // Construct the primary ping.
                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                // Broadcast the event.
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Start the primary ping handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                // If the primary is not synced, then do not process the primary ping.
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Spawn a task to process the primary certificate.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        // Deserialize the primary certificate in the primary ping.
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        // Process the primary certificate.
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Start the worker ping(s).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                // If the primary is not synced, then do not broadcast the worker ping(s).
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Broadcast the worker ping(s).
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        // Start the batch proposer.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly, but longer than if there were no batch.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // If the primary is not synced, then do not propose a batch.
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // A best-effort attempt to skip the scheduled batch proposal if
                // round progression already triggered one.
                // Note: the guard from `try_lock` is dropped immediately; this only probes
                // whether a proposal is in flight — it does not hold the lock across `propose_batch`.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                // If there is no proposed batch, attempt to propose a batch.
                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
                // and only one batch needs to be proposed at a time.
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Start the proposed batch handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                // If the primary is not synced, then do not sign the batch.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the proposed batch.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Process the batch proposal.
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Start the batch signature handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                // If the primary is not synced, then do not store the signature.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process the batch signature.
                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
                // is a critical path, and we should only store the minimum required number of signatures.
                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
                let id = fmt_id(batch_signature.batch_id);
                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
                    warn!("{}", flatten_error(err));
                }
            }
        });

        // Start the certified batch handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                // If the primary is not synced, then do not store the certificate.
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the batch certificate.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Deserialize the batch certificate.
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    // Process the batch certificate.
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!(
                            "{}",
                            flatten_error(err.context(format!(
                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
                            )))
                        );
                    }
                });
            }
        });

        // This task periodically tries to move to the next round.
        //
        // Note: This is necessary to ensure that the primary is not stuck on a previous round
        // despite having received enough certificates to advance to the next round.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not increment to the next round.
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Attempt to increment to the next round.
                let current_round = self_.current_round();
                let next_round = current_round.saturating_add(1);
                // Determine if the quorum threshold is reached for the current round.
                let is_quorum_threshold_reached = {
                    // Retrieve the certificate authors for the current round.
                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
                    // If there are no certificates, then skip this check.
                    if authors.is_empty() {
                        continue;
                    }
                    // Retrieve the committee lookback for the current round.
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
                        warn!("Failed to retrieve the committee lookback for round {current_round}");
                        continue;
                    };
                    // Check if the quorum threshold is reached for the current round.
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if the quorum threshold is reached.
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {current_round}");
                    if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
                    }
                }
            }
        });

        // Start a handler to process new unconfirmed solutions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                // The (id, checksum) pair deterministically routes the solution to a worker.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed solution.
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });

        // Start a handler to process new unconfirmed transactions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Compute the worker ID.
                // The (id, checksum) pair deterministically routes the transaction to a worker.
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed transaction.
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });
    }

    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
    /// A proposal is considered expired (stale) when its round is behind the current round;
    /// its transmissions are reinserted into the workers so they can be re-proposed.
    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
        // Check if the proposed batch is timed out or stale.
        let is_expired = match self.proposed_batch.read().as_ref() {
            Some(proposal) => proposal.round() < self.current_round(),
            None => false,
        };
        // If the batch is expired, clear the proposed batch.
        if is_expired {
            // Reset the proposed batch.
            // Note: the read lock above is released before this write lock is taken.
            let proposal = self.proposed_batch.write().take();
            if let Some(proposal) = proposal {
                debug!("Cleared expired proposal for round {}", proposal.round());
                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            }
        }
        Ok(())
    }

    /// Increments to the next round.
1576 async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> { 1577 // If the next round is within GC range, then iterate to the penultimate round. 1578 if self.current_round() + self.storage.max_gc_rounds() >= next_round { 1579 let mut fast_forward_round = self.current_round(); 1580 // Iterate until the penultimate round is reached. 1581 while fast_forward_round < next_round.saturating_sub(1) { 1582 // Update to the next round in storage. 1583 fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?; 1584 // Clear the proposed batch. 1585 *self.proposed_batch.write() = None; 1586 } 1587 } 1588 1589 // Retrieve the current round. 1590 let current_round = self.current_round(); 1591 // Attempt to advance to the next round. 1592 if current_round < next_round { 1593 // If a BFT sender was provided, send the current round to the BFT. 1594 let is_ready = if let Some(bft_sender) = self.bft_sender.get() { 1595 match bft_sender.send_primary_round_to_bft(current_round).await { 1596 Ok(is_ready) => is_ready, 1597 Err(err) => { 1598 let err = err.context("Failed to update the BFT to the next round"); 1599 warn!("{}", flatten_error(&err)); 1600 return Err(err); 1601 } 1602 } 1603 } 1604 // Otherwise, handle the Narwhal case. 1605 else { 1606 // Update to the next round in storage. 1607 self.storage.increment_to_next_round(current_round)?; 1608 // Set 'is_ready' to 'true'. 1609 true 1610 }; 1611 1612 // Log whether the next round is ready. 1613 match is_ready { 1614 true => debug!("Primary is ready to propose the next round"), 1615 false => debug!("Primary is not ready to propose the next round"), 1616 } 1617 1618 // If the node is ready, propose a batch for the next round. 1619 if is_ready { 1620 self.propose_batch().await?; 1621 } 1622 } 1623 Ok(()) 1624 } 1625 1626 /// Ensures the primary is signing for the specified batch round. 
    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
        // Retrieve the current round.
        let current_round = self.current_round();
        // Ensure the batch round is within GC range of the current round.
        if current_round + self.storage.max_gc_rounds() <= batch_round {
            bail!("Round {batch_round} is too far in the future")
        }
        // Ensure the batch round is at or one before the current round.
        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
        if current_round > batch_round + 1 {
            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
        }
        // Check if the primary is still signing for the batch round.
        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
            if signing_round > batch_round {
                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
            }
        }
        Ok(())
    }

    /// Ensure the primary is not creating batch proposals too frequently.
    /// This checks that the certificate timestamp for the previous round is within the expected range.
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Retrieve the timestamp of the previous certificate (or our own previous proposal) to check against.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
            Some(certificate) => certificate.timestamp(),
            None => match self.gateway.account().address() == author {
                // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
                true => *self.latest_proposed_batch_timestamp.read(),
                // If we do not see a previous certificate for the author, then proceed optimistically.
                false => return Ok(()),
            },
        };

        // Determine the elapsed time since the previous timestamp.
        // `checked_sub` guards against a proposal timestamped before the previous certificate.
        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Create the batch certificate and transmissions.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                let err = err.context("Failed to update the BFT DAG from primary");
                warn!("{}", flatten_error(&err));
                return Err(err);
            };
        }
        // Broadcast the certified batch to all validators.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        // Log the certified batch.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Increment to the next round.
        self.try_increment_to_the_next_round(round + 1).await
    }

    /// Inserts the missing transmissions from the proposal into the workers.
    ///
    /// Each transmission is routed to its assigned worker and processed as if received from `peer_ip`.
    fn insert_missing_transmissions_into_workers(
        &self,
        peer_ip: SocketAddr,
        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
    ) -> Result<()> {
        // Insert the transmissions into the workers.
        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
        })
    }

    /// Re-inserts the transmissions from the proposal into the workers.
    ///
    /// Used when a proposal is abandoned (e.g. expired), so its transmissions are not lost.
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Re-insert the transmissions into the workers.
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

    /// Recursively stores a given batch certificate, after ensuring:
    /// - Ensure the round matches the committee round.
    /// - Ensure the address is a member of the committee.
    /// - Ensure the timestamp is within range.
    /// - Ensure we have all of the transmissions.
    /// - Ensure we have all of the previous certificates.
    /// - Ensure the previous certificates are for the previous round (i.e. round - 1).
    /// - Ensure the previous certificates have reached the quorum threshold.
    /// - Ensure we have not already signed the batch ID.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Retrieve the batch header.
        let batch_header = certificate.batch_header();
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated, do not store it.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // If the certificate already exists in storage, return early.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // If the node is not in sync mode and the node is not synced, then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Check if the certificate needs to be stored.
        // (The header sync above may have already stored it via recursion.)
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the batch certificate.
1773 let (storage, certificate_) = (self.storage.clone(), certificate.clone()); 1774 spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?; 1775 debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'"); 1776 // If a BFT sender was provided, send the round and certificate to the BFT. 1777 if let Some(bft_sender) = self.bft_sender.get() { 1778 // Send the certificate to the BFT. 1779 if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate).await { 1780 let err = err.context("Failed to update the BFT DAG from sync"); 1781 warn!("{}", &flatten_error(&err)); 1782 return Err(err); 1783 }; 1784 } 1785 } 1786 Ok(()) 1787 } 1788 1789 /// Recursively syncs using the given batch header. 1790 async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>( 1791 &self, 1792 peer_ip: SocketAddr, 1793 batch_header: &BatchHeader<N>, 1794 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> { 1795 // Retrieve the batch round. 1796 let batch_round = batch_header.round(); 1797 1798 // If the certificate round is outdated, do not store it. 1799 if batch_round <= self.storage.gc_round() { 1800 bail!("Round {batch_round} is too far in the past") 1801 } 1802 1803 // If node is not in sync mode and the node is not synced, then return an error. 1804 if !IS_SYNCING && !self.is_synced() { 1805 bail!( 1806 "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)", 1807 fmt_id(batch_header.batch_id()) 1808 ); 1809 } 1810 1811 // Determine if quorum threshold is reached on the batch round. 1812 let is_quorum_threshold_reached = { 1813 let authors = self.storage.get_certificate_authors_for_round(batch_round); 1814 let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?; 1815 committee_lookback.is_quorum_threshold_reached(&authors) 1816 }; 1817 1818 // Check if our primary should move to the next round. 
1819 // Note: Checking that quorum threshold is reached is important for mitigating a race condition, 1820 // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary 1821 // will advance to the next round assuming f+1, not N-f, which can lead to a network stall. 1822 let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round(); 1823 // Check if our primary is far behind the peer. 1824 let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds(); 1825 // If our primary is far behind the peer, update our committee to the batch round. 1826 if is_behind_schedule || is_peer_far_in_future { 1827 // If the batch round is greater than the current committee round, update the committee. 1828 self.try_increment_to_the_next_round(batch_round).await?; 1829 } 1830 1831 // Ensure the primary has all of the transmissions. 1832 let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header); 1833 1834 // Ensure the primary has all of the previous certificates. 1835 let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header); 1836 1837 // Wait for the missing transmissions and previous certificates to be fetched. 1838 let (missing_transmissions, missing_previous_certificates) = tokio::try_join!( 1839 missing_transmissions_handle, 1840 missing_previous_certificates_handle, 1841 ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?; 1842 1843 // Iterate through the missing previous certificates. 1844 for batch_certificate in missing_previous_certificates { 1845 // Store the batch certificate (recursively fetching any missing previous certificates). 
1846 self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?; 1847 } 1848 Ok(missing_transmissions) 1849 } 1850 1851 /// Fetches any missing transmissions for the specified batch header. 1852 /// If a transmission does not exist, it will be fetched from the specified peer IP. 1853 async fn fetch_missing_transmissions( 1854 &self, 1855 peer_ip: SocketAddr, 1856 batch_header: &BatchHeader<N>, 1857 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> { 1858 // If the round is <= the GC round, return early. 1859 if batch_header.round() <= self.storage.gc_round() { 1860 return Ok(Default::default()); 1861 } 1862 1863 // Ensure this batch ID is new, otherwise return early. 1864 if self.storage.contains_batch(batch_header.batch_id()) { 1865 trace!("Batch for round {} from peer has already been processed", batch_header.round()); 1866 return Ok(Default::default()); 1867 } 1868 1869 // Retrieve the workers. 1870 let workers = self.workers.clone(); 1871 1872 // Initialize a list for the transmissions. 1873 let mut fetch_transmissions = FuturesUnordered::new(); 1874 1875 // Retrieve the number of workers. 1876 let num_workers = self.num_workers(); 1877 // Iterate through the transmission IDs. 1878 for transmission_id in batch_header.transmission_ids() { 1879 // If the transmission does not exist in storage, proceed to fetch the transmission. 1880 if !self.storage.contains_transmission(*transmission_id) { 1881 // Determine the worker ID. 1882 let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else { 1883 bail!("Unable to assign transmission ID '{transmission_id}' to a worker") 1884 }; 1885 // Retrieve the worker. 1886 let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") }; 1887 // Push the callback onto the list. 1888 fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id)); 1889 } 1890 } 1891 1892 // Initialize a set for the transmissions. 
        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        // Wait for all of the transmissions to be fetched.
        while let Some(result) = fetch_transmissions.next().await {
            // Retrieve the transmission.
            let (transmission_id, transmission) = result?;
            // Insert the transmission into the set.
            transmissions.insert(transmission_id, transmission);
        }
        // Return the transmissions.
        Ok(transmissions)
    }

    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
    async fn fetch_missing_previous_certificates(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        // Retrieve the round.
        let round = batch_header.round();
        // If the previous round is 0, or is <= the GC round, return early.
        if round == 1 || round <= self.storage.gc_round() + 1 {
            return Ok(Default::default());
        }

        // Fetch the missing previous certificates.
        let missing_previous_certificates =
            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
        if !missing_previous_certificates.is_empty() {
            debug!(
                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
                missing_previous_certificates.len(),
            );
        }
        // Return the missing previous certificates.
        Ok(missing_previous_certificates)
    }

    /// Fetches any missing certificates for the specified batch header from the specified peer.
    ///
    /// Certificates already present in the ledger or storage are skipped; unprocessed ones
    /// are returned directly, and the remainder are requested from the peer.
    async fn fetch_missing_certificates(
        &self,
        peer_ip: SocketAddr,
        round: u64,
        certificate_ids: &IndexSet<Field<N>>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        // Initialize a list for the missing certificates.
        let mut fetch_certificates = FuturesUnordered::new();
        // Initialize a set for the missing certificates.
        let mut missing_certificates = HashSet::default();
        // Iterate through the certificate IDs.
        for certificate_id in certificate_ids {
            // Check if the certificate already exists in the ledger.
            if self.ledger.contains_certificate(certificate_id)? {
                continue;
            }
            // Check if the certificate already exists in storage.
            if self.storage.contains_certificate(*certificate_id) {
                continue;
            }
            // If we have not fully processed the certificate yet, store it.
            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
                missing_certificates.insert(certificate);
            } else {
                // If we do not have the certificate, request it.
                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
                // TODO (howardwu): Limit the number of open requests we send to a peer.
                // Send a certificate request to the peer.
                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
            }
        }

        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
        match fetch_certificates.is_empty() {
            true => return Ok(missing_certificates),
            false => trace!(
                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
                fetch_certificates.len(),
            ),
        }

        // Wait for all of the missing certificates to be fetched.
        while let Some(result) = fetch_certificates.next().await {
            // Insert the missing certificate into the set.
            missing_certificates.insert(result?);
        }
        // Return the missing certificates.
        Ok(missing_certificates)
    }
}

impl<N: Network> Primary<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the primary.
    ///
    /// Order: workers first, then spawned tasks, then the proposal cache is persisted,
    /// and finally the gateway is closed.
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        // Shut down the workers.
        self.workers.iter().for_each(|worker| worker.shut_down());
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
        // Save the current proposal cache to disk.
        let proposal_cache = {
            let proposal = self.proposed_batch.write().take();
            let signed_proposals = self.signed_proposals.read().clone();
            // Fall back to the propose lock's round if there is no pending proposal.
            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
            let pending_certificates = self.storage.get_pending_certificates();
            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
        };
        if let Err(err) = proposal_cache.store(&self.storage_mode) {
            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
        }
        // Close the gateway.
        self.gateway.shut_down().await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alphaos_node_bft_ledger_service::MockLedgerService;
    use alphaos_node_bft_storage_service::BFTMemoryService;
    use alphaos_node_sync::{locators::test_helpers::sample_block_locators, BlockSync};
    use alphavm::{
        ledger::{
            committee::{Committee, MIN_VALIDATOR_STAKE},
            test_helpers::sample_execution_transaction_with_fee,
        },
        prelude::{Address, Signature},
    };

    use bytes::Bytes;
    use indexmap::IndexSet;
    use rand::RngCore;

    type CurrentNetwork = alphavm::prelude::MainnetV0;

    // Samples a 4-member committee; the returned accounts line up with the committee members.
    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
        // Create a committee containing the primary's account.
        const COMMITTEE_SIZE: usize = 4;
        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
        let mut members = IndexMap::new();

        for i in 0..COMMITTEE_SIZE {
            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
            let account = Account::new(rng).unwrap();

            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
            accounts.push((socket_addr, account));
        }

        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
    }

    // Returns a primary and a list of accounts in the configured committee.
    fn primary_with_committee(
        account_index: usize,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        committee: Committee<CurrentNetwork>,
        height: u32,
    ) -> Primary<CurrentNetwork> {
        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);

        // Initialize the primary.
        let account = accounts[account_index].1.clone();
        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
        let mut primary =
            Primary::new(account, storage, ledger, block_sync, None, &[], false, StorageMode::new_test(None), None)
                .unwrap();

        // Construct a worker instance.
        primary.workers = Arc::from([Worker::new(
            0, // id
            Arc::new(primary.gateway.clone()),
            primary.storage.clone(),
            primary.ledger.clone(),
            primary.proposed_batch.clone(),
        )
        .unwrap()]);
        // NOTE(review): `skip(account_index)` also yields the primary's own entry (e.g. when
        // `account_index == 0`), so the primary inserts itself as a connected peer — confirm
        // whether the primary was meant to be excluded here.
        for a in accounts.iter().skip(account_index) {
            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
        }

        primary
    }

    // Convenience constructor: a primary at index 0 of a fresh sample committee, at the V1 consensus height.
    fn primary_without_handlers(
        rng: &mut TestRng,
    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
        let (accounts, committee) = sample_committee(rng);
        let primary = primary_with_committee(
            0, // index of primary's account
            &accounts,
            committee,
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
        );

        (primary, accounts)
    }

    // Creates a mock solution.
    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
        // Sample a random fake solution ID.
        let solution_id = rng.r#gen::<u64>().into();
        // Vary the size of the solutions.
        let size = rng.gen_range(1024..10 * 1024);
        // Sample random fake solution bytes.
        let mut vec = vec![0u8; size];
        rng.fill_bytes(&mut vec);
        let solution = Data::Buffer(Bytes::from(vec));
        // Return the solution ID and solution.
        (solution_id, solution)
    }

    // Samples a test transaction.
    fn sample_unconfirmed_transaction(
        rng: &mut TestRng,
    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
        let id = transaction.id();

        (id, Data::Object(transaction))
    }

    // Creates a batch proposal with one solution and one transaction.
2121 fn create_test_proposal( 2122 author: &Account<CurrentNetwork>, 2123 committee: Committee<CurrentNetwork>, 2124 round: u64, 2125 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>, 2126 timestamp: i64, 2127 num_transactions: u64, 2128 rng: &mut TestRng, 2129 ) -> Proposal<CurrentNetwork> { 2130 let mut transmission_ids = IndexSet::new(); 2131 let mut transmissions = IndexMap::new(); 2132 2133 // Prepare the solution and insert into the sets. 2134 let (solution_id, solution) = sample_unconfirmed_solution(rng); 2135 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap(); 2136 let solution_transmission_id = (solution_id, solution_checksum).into(); 2137 transmission_ids.insert(solution_transmission_id); 2138 transmissions.insert(solution_transmission_id, Transmission::Solution(solution)); 2139 2140 // Prepare the transactions and insert into the sets. 2141 for _ in 0..num_transactions { 2142 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng); 2143 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap(); 2144 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into(); 2145 transmission_ids.insert(transaction_transmission_id); 2146 transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction)); 2147 } 2148 2149 // Retrieve the private key. 2150 let private_key = author.private_key(); 2151 // Sign the batch header. 2152 let batch_header = BatchHeader::new( 2153 private_key, 2154 round, 2155 timestamp, 2156 committee.id(), 2157 transmission_ids, 2158 previous_certificate_ids, 2159 rng, 2160 ) 2161 .unwrap(); 2162 // Construct the proposal. 2163 Proposal::new(committee, batch_header, transmissions).unwrap() 2164 } 2165 2166 // Creates a signature of the primary's current proposal for each committee member (excluding 2167 // the primary). 
2168 fn peer_signatures_for_proposal( 2169 primary: &Primary<CurrentNetwork>, 2170 accounts: &[(SocketAddr, Account<CurrentNetwork>)], 2171 rng: &mut TestRng, 2172 ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> { 2173 // Each committee member signs the batch. 2174 let mut signatures = Vec::with_capacity(accounts.len() - 1); 2175 for (socket_addr, account) in accounts { 2176 if account.address() == primary.gateway.account().address() { 2177 continue; 2178 } 2179 let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id(); 2180 let signature = account.sign(&[batch_id], rng).unwrap(); 2181 signatures.push((*socket_addr, BatchSignature::new(batch_id, signature))); 2182 } 2183 2184 signatures 2185 } 2186 2187 /// Creates a signature of the batch ID for each committee member (excluding the primary). 2188 fn peer_signatures_for_batch( 2189 primary_address: Address<CurrentNetwork>, 2190 accounts: &[(SocketAddr, Account<CurrentNetwork>)], 2191 batch_id: Field<CurrentNetwork>, 2192 rng: &mut TestRng, 2193 ) -> IndexSet<Signature<CurrentNetwork>> { 2194 let mut signatures = IndexSet::new(); 2195 for (_, account) in accounts { 2196 if account.address() == primary_address { 2197 continue; 2198 } 2199 let signature = account.sign(&[batch_id], rng).unwrap(); 2200 signatures.insert(signature); 2201 } 2202 signatures 2203 } 2204 2205 // Creates a batch certificate. 
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        // Timestamp the batch with the current time.
        let timestamp = now();

        // Locate the author's account; panics if `primary_address` is not in `accounts`.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // Sample a random committee ID, plus one solution and one transaction as the batch contents.
        let committee_id = Field::rand(rng);
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        // Sign the batch header with the author's key, then certify it with the other members' signatures.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }

    // Create a certificate chain up to, but not including, the specified round in the primary storage.
    fn store_certificate_chain(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        rng: &mut TestRng,
    ) -> IndexSet<Field<CurrentNetwork>> {
        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        // For each round, insert one certificate per committee member, then advance storage.
        for cur_round in 1..round {
            for (_, account) in accounts.iter() {
                let (certificate, transmissions) = create_batch_certificate(
                    account.address(),
                    accounts,
                    cur_round,
                    previous_certificates.clone(),
                    rng,
                );
                next_certificates.insert(certificate.id());
                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
            }

            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
            // This round's certificates become the previous certificates of the next round.
            previous_certificates = next_certificates;
            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        }

        // Return the certificate IDs of the last stored round (round - 1).
        previous_certificates
    }

    // Insert the account socket addresses into the resolver so that
    // they are recognized as "connected".
    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
        // First account is primary, which doesn't need to resolve.
        for (addr, acct) in accounts.iter().skip(1) {
            primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
        }
    }

    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Try to propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Try to propose a batch with no transmissions.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get transmissions from previous certificates.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Track the number of transmissions in the previous round.
        let mut num_transmissions_in_previous_round = 0;

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            // Add the transmissions to the worker.
            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            // Insert the certificate to storage.
            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Advance to the next round.
        assert!(primary.storage.increment_to_next_round(round).is_ok());

        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        // Propose the batch.
        assert!(primary.propose_batch().await.is_ok());

        // Check that the proposal only contains the new transmissions that were not in previous certificates.
        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum)));
    }

    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create a primary to test spend limit backwards compatibility with V4.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());
        // Check the workers are empty.
        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        // Generate a solution and transactions.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _i in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            // Store it on one of the workers.
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        // Try to propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        // Expect 2/5 transactions to be included in the proposal in addition to the solution.
        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
        // Check the transmissions were correctly drained from the workers.
        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
2460 let round = 1; 2461 let peer_account = &accounts[1]; 2462 let peer_ip = peer_account.0; 2463 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; 2464 let proposal = create_test_proposal( 2465 &peer_account.1, 2466 primary.ledger.current_committee().unwrap(), 2467 round, 2468 Default::default(), 2469 timestamp, 2470 1, 2471 &mut rng, 2472 ); 2473 2474 // Make sure the primary is aware of the transmissions in the proposal. 2475 for (transmission_id, transmission) in proposal.transmissions() { 2476 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()) 2477 } 2478 2479 // The author must be known to resolver to pass propose checks. 2480 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); 2481 2482 // The primary will only consider itself synced if we received 2483 // block locators from a peer. 2484 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap(); 2485 primary.sync.testing_only_try_block_sync_testing_only().await; 2486 2487 // Try to process the batch proposal from the peer, should succeed. 2488 assert!(primary 2489 .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()) 2490 .await 2491 .is_ok()); 2492 } 2493 2494 #[tokio::test] 2495 async fn test_batch_propose_from_peer_when_not_synced() { 2496 let mut rng = TestRng::default(); 2497 let (primary, accounts) = primary_without_handlers(&mut rng); 2498 2499 // Create a valid proposal with an author that isn't the primary. 
2500 let round = 1; 2501 let peer_account = &accounts[1]; 2502 let peer_ip = peer_account.0; 2503 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; 2504 let proposal = create_test_proposal( 2505 &peer_account.1, 2506 primary.ledger.current_committee().unwrap(), 2507 round, 2508 Default::default(), 2509 timestamp, 2510 1, 2511 &mut rng, 2512 ); 2513 2514 // Make sure the primary is aware of the transmissions in the proposal. 2515 for (transmission_id, transmission) in proposal.transmissions() { 2516 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()) 2517 } 2518 2519 // The author must be known to resolver to pass propose checks. 2520 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); 2521 2522 // Add a high block locator to indicate we are not synced. 2523 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap(); 2524 2525 // Try to process the batch proposal from the peer, should fail 2526 assert!(primary 2527 .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()) 2528 .await 2529 .is_err()); 2530 } 2531 2532 #[tokio::test] 2533 async fn test_batch_propose_from_peer_in_round() { 2534 let round = 2; 2535 let mut rng = TestRng::default(); 2536 let (primary, accounts) = primary_without_handlers(&mut rng); 2537 2538 // Generate certificates. 2539 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); 2540 2541 // Create a valid proposal with an author that isn't the primary. 
2542 let peer_account = &accounts[1]; 2543 let peer_ip = peer_account.0; 2544 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; 2545 let proposal = create_test_proposal( 2546 &peer_account.1, 2547 primary.ledger.current_committee().unwrap(), 2548 round, 2549 previous_certificates, 2550 timestamp, 2551 1, 2552 &mut rng, 2553 ); 2554 2555 // Make sure the primary is aware of the transmissions in the proposal. 2556 for (transmission_id, transmission) in proposal.transmissions() { 2557 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()) 2558 } 2559 2560 // The author must be known to resolver to pass propose checks. 2561 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); 2562 2563 // The primary will only consider itself synced if we received 2564 // block locators from a peer. 2565 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap(); 2566 primary.sync.testing_only_try_block_sync_testing_only().await; 2567 2568 // Try to process the batch proposal from the peer, should succeed. 2569 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap(); 2570 } 2571 2572 #[tokio::test] 2573 async fn test_batch_propose_from_peer_wrong_round() { 2574 let mut rng = TestRng::default(); 2575 let (primary, accounts) = primary_without_handlers(&mut rng); 2576 2577 // Create a valid proposal with an author that isn't the primary. 2578 let round = 1; 2579 let peer_account = &accounts[1]; 2580 let peer_ip = peer_account.0; 2581 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; 2582 let proposal = create_test_proposal( 2583 &peer_account.1, 2584 primary.ledger.current_committee().unwrap(), 2585 round, 2586 Default::default(), 2587 timestamp, 2588 1, 2589 &mut rng, 2590 ); 2591 2592 // Make sure the primary is aware of the transmissions in the proposal. 
2593 for (transmission_id, transmission) in proposal.transmissions() { 2594 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()) 2595 } 2596 2597 // The author must be known to resolver to pass propose checks. 2598 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); 2599 // The primary must be considered synced. 2600 primary.sync.testing_only_try_block_sync_testing_only().await; 2601 2602 // Try to process the batch proposal from the peer, should error. 2603 assert!(primary 2604 .process_batch_propose_from_peer(peer_ip, BatchPropose { 2605 round: round + 1, 2606 batch_header: Data::Object(proposal.batch_header().clone()) 2607 }) 2608 .await 2609 .is_err()); 2610 } 2611 2612 #[tokio::test] 2613 async fn test_batch_propose_from_peer_in_round_wrong_round() { 2614 let round = 4; 2615 let mut rng = TestRng::default(); 2616 let (primary, accounts) = primary_without_handlers(&mut rng); 2617 2618 // Generate certificates. 2619 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); 2620 2621 // Create a valid proposal with an author that isn't the primary. 2622 let peer_account = &accounts[1]; 2623 let peer_ip = peer_account.0; 2624 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; 2625 let proposal = create_test_proposal( 2626 &peer_account.1, 2627 primary.ledger.current_committee().unwrap(), 2628 round, 2629 previous_certificates, 2630 timestamp, 2631 1, 2632 &mut rng, 2633 ); 2634 2635 // Make sure the primary is aware of the transmissions in the proposal. 2636 for (transmission_id, transmission) in proposal.transmissions() { 2637 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()) 2638 } 2639 2640 // The author must be known to resolver to pass propose checks. 
2641 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); 2642 // The primary must be considered synced. 2643 primary.sync.testing_only_try_block_sync_testing_only().await; 2644 2645 // Try to process the batch proposal from the peer, should error. 2646 assert!(primary 2647 .process_batch_propose_from_peer(peer_ip, BatchPropose { 2648 round: round + 1, 2649 batch_header: Data::Object(proposal.batch_header().clone()) 2650 }) 2651 .await 2652 .is_err()); 2653 } 2654 2655 /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected. 2656 #[tokio::test] 2657 async fn test_batch_propose_from_peer_with_past_timestamp() { 2658 let round = 2; 2659 let mut rng = TestRng::default(); 2660 let (primary, accounts) = primary_without_handlers(&mut rng); 2661 2662 // Generate certificates. 2663 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); 2664 2665 // Create a valid proposal with an author that isn't the primary. 2666 let peer_account = &accounts[1]; 2667 let peer_ip = peer_account.0; 2668 2669 // Use a timestamp that is too early. 2670 // Set it to something that is less than the minimum batch delay 2671 // Note, that the minimum delay is currently 1, so this will be equal to the last timestamp 2672 let last_timestamp = primary 2673 .storage 2674 .get_certificate_for_round_with_author(round - 1, peer_account.1.address()) 2675 .expect("No previous proposal exists") 2676 .timestamp(); 2677 let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1; 2678 2679 let proposal = create_test_proposal( 2680 &peer_account.1, 2681 primary.ledger.current_committee().unwrap(), 2682 round, 2683 previous_certificates, 2684 invalid_timestamp, 2685 1, 2686 &mut rng, 2687 ); 2688 2689 // Make sure the primary is aware of the transmissions in the proposal. 
2690 for (transmission_id, transmission) in proposal.transmissions() { 2691 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()) 2692 } 2693 2694 // The author must be known to resolver to pass propose checks. 2695 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); 2696 // The primary must be considered synced. 2697 primary.sync.testing_only_try_block_sync_testing_only().await; 2698 2699 // Try to process the batch proposal from the peer, should error. 2700 assert!(primary 2701 .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()) 2702 .await 2703 .is_err()); 2704 } 2705 2706 /// Check that proposals rejected that have timestamps older than the previous proposal. 2707 #[tokio::test] 2708 async fn test_batch_propose_from_peer_over_spend_limit() { 2709 let mut rng = TestRng::default(); 2710 2711 // Create two primaries to test spend limit activation on V5. 2712 let (accounts, committee) = sample_committee(&mut rng); 2713 let primary_v4 = primary_with_committee( 2714 0, 2715 &accounts, 2716 committee.clone(), 2717 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(), 2718 ); 2719 let primary_v5 = primary_with_committee( 2720 1, 2721 &accounts, 2722 committee.clone(), 2723 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(), 2724 ); 2725 2726 // Create a valid proposal with an author that isn't the primary. 2727 let round = 1; 2728 let peer_account = &accounts[2]; 2729 let peer_ip = peer_account.0; 2730 2731 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; 2732 2733 let proposal = 2734 create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng); 2735 2736 // Make sure the primary is aware of the transmissions in the proposal. 
2737 for (transmission_id, transmission) in proposal.transmissions() { 2738 primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()); 2739 primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()); 2740 } 2741 2742 // The author must be known to resolver to pass propose checks. 2743 primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); 2744 primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); 2745 2746 // primary v4 must be considered synced. 2747 primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap(); 2748 primary_v4.sync.testing_only_try_block_sync_testing_only().await; 2749 2750 // primary v5 must be ocnsidered synced. 2751 primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap(); 2752 primary_v5.sync.testing_only_try_block_sync_testing_only().await; 2753 2754 // Check the spend limit is enforced from V5 onwards. 2755 assert!(primary_v4 2756 .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()) 2757 .await 2758 .is_ok()); 2759 2760 assert!(primary_v5 2761 .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()) 2762 .await 2763 .is_err()); 2764 } 2765 2766 #[tokio::test] 2767 async fn test_propose_batch_with_storage_round_behind_proposal_lock() { 2768 let round = 3; 2769 let mut rng = TestRng::default(); 2770 let (primary, _) = primary_without_handlers(&mut rng); 2771 2772 // Check there is no batch currently proposed. 2773 assert!(primary.proposed_batch.read().is_none()); 2774 2775 // Generate a solution and a transaction. 2776 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng); 2777 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng); 2778 2779 // Store it on one of the workers. 
2780 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap(); 2781 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap(); 2782 2783 // Set the proposal lock to a round ahead of the storage. 2784 let old_proposal_lock_round = *primary.propose_lock.lock().await; 2785 *primary.propose_lock.lock().await = round + 1; 2786 2787 // Propose a batch and enforce that it fails. 2788 assert!(primary.propose_batch().await.is_ok()); 2789 assert!(primary.proposed_batch.read().is_none()); 2790 2791 // Set the proposal lock back to the old round. 2792 *primary.propose_lock.lock().await = old_proposal_lock_round; 2793 2794 // Try to propose a batch again. This time, it should succeed. 2795 assert!(primary.propose_batch().await.is_ok()); 2796 assert!(primary.proposed_batch.read().is_some()); 2797 } 2798 2799 #[tokio::test] 2800 async fn test_propose_batch_with_storage_round_behind_proposal() { 2801 let round = 5; 2802 let mut rng = TestRng::default(); 2803 let (primary, accounts) = primary_without_handlers(&mut rng); 2804 2805 // Generate previous certificates. 2806 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); 2807 2808 // Create a valid proposal. 2809 let timestamp = now(); 2810 let proposal = create_test_proposal( 2811 primary.gateway.account(), 2812 primary.ledger.current_committee().unwrap(), 2813 round + 1, 2814 previous_certificates, 2815 timestamp, 2816 1, 2817 &mut rng, 2818 ); 2819 2820 // Store the proposal on the primary. 2821 *primary.proposed_batch.write() = Some(proposal); 2822 2823 // Try to propose a batch will terminate early because the storage is behind the proposal. 
2824 assert!(primary.propose_batch().await.is_ok()); 2825 assert!(primary.proposed_batch.read().is_some()); 2826 assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round()); 2827 } 2828 2829 #[tokio::test(flavor = "multi_thread")] 2830 async fn test_batch_signature_from_peer() { 2831 let mut rng = TestRng::default(); 2832 let (primary, accounts) = primary_without_handlers(&mut rng); 2833 map_account_addresses(&primary, &accounts); 2834 2835 // Create a valid proposal. 2836 let round = 1; 2837 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; 2838 let proposal = create_test_proposal( 2839 primary.gateway.account(), 2840 primary.ledger.current_committee().unwrap(), 2841 round, 2842 Default::default(), 2843 timestamp, 2844 1, 2845 &mut rng, 2846 ); 2847 2848 // Store the proposal on the primary. 2849 *primary.proposed_batch.write() = Some(proposal); 2850 2851 // Each committee member signs the batch. 2852 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng); 2853 2854 // Have the primary process the signatures. 2855 for (socket_addr, signature) in signatures { 2856 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap(); 2857 } 2858 2859 // Check the certificate was created and stored by the primary. 2860 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address())); 2861 // Check the round was incremented. 2862 assert_eq!(primary.current_round(), round + 1); 2863 } 2864 2865 #[tokio::test(flavor = "multi_thread")] 2866 async fn test_batch_signature_from_peer_in_round() { 2867 let round = 5; 2868 let mut rng = TestRng::default(); 2869 let (primary, accounts) = primary_without_handlers(&mut rng); 2870 map_account_addresses(&primary, &accounts); 2871 2872 // Generate certificates. 2873 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); 2874 2875 // Create a valid proposal. 
2876 let timestamp = now(); 2877 let proposal = create_test_proposal( 2878 primary.gateway.account(), 2879 primary.ledger.current_committee().unwrap(), 2880 round, 2881 previous_certificates, 2882 timestamp, 2883 1, 2884 &mut rng, 2885 ); 2886 2887 // Store the proposal on the primary. 2888 *primary.proposed_batch.write() = Some(proposal); 2889 2890 // Each committee member signs the batch. 2891 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng); 2892 2893 // Have the primary process the signatures. 2894 for (socket_addr, signature) in signatures { 2895 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap(); 2896 } 2897 2898 // Check the certificate was created and stored by the primary. 2899 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address())); 2900 // Check the round was incremented. 2901 assert_eq!(primary.current_round(), round + 1); 2902 } 2903 2904 #[tokio::test] 2905 async fn test_batch_signature_from_peer_no_quorum() { 2906 let mut rng = TestRng::default(); 2907 let (primary, accounts) = primary_without_handlers(&mut rng); 2908 map_account_addresses(&primary, &accounts); 2909 2910 // Create a valid proposal. 2911 let round = 1; 2912 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; 2913 let proposal = create_test_proposal( 2914 primary.gateway.account(), 2915 primary.ledger.current_committee().unwrap(), 2916 round, 2917 Default::default(), 2918 timestamp, 2919 1, 2920 &mut rng, 2921 ); 2922 2923 // Store the proposal on the primary. 2924 *primary.proposed_batch.write() = Some(proposal); 2925 2926 // Each committee member signs the batch. 2927 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng); 2928 2929 // Have the primary process only one signature, mimicking a lack of quorum. 
2930 let (socket_addr, signature) = signatures.first().unwrap(); 2931 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap(); 2932 2933 // Check the certificate was not created and stored by the primary. 2934 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address())); 2935 // Check the round was incremented. 2936 assert_eq!(primary.current_round(), round); 2937 } 2938 2939 #[tokio::test] 2940 async fn test_batch_signature_from_peer_in_round_no_quorum() { 2941 let round = 7; 2942 let mut rng = TestRng::default(); 2943 let (primary, accounts) = primary_without_handlers(&mut rng); 2944 map_account_addresses(&primary, &accounts); 2945 2946 // Generate certificates. 2947 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); 2948 2949 // Create a valid proposal. 2950 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; 2951 let proposal = create_test_proposal( 2952 primary.gateway.account(), 2953 primary.ledger.current_committee().unwrap(), 2954 round, 2955 previous_certificates, 2956 timestamp, 2957 1, 2958 &mut rng, 2959 ); 2960 2961 // Store the proposal on the primary. 2962 *primary.proposed_batch.write() = Some(proposal); 2963 2964 // Each committee member signs the batch. 2965 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng); 2966 2967 // Have the primary process only one signature, mimicking a lack of quorum. 2968 let (socket_addr, signature) = signatures.first().unwrap(); 2969 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap(); 2970 2971 // Check the certificate was not created and stored by the primary. 2972 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address())); 2973 // Check the round was incremented. 
2974 assert_eq!(primary.current_round(), round); 2975 } 2976 2977 #[tokio::test] 2978 async fn test_insert_certificate_with_aborted_transmissions() { 2979 let round = 3; 2980 let prev_round = round - 1; 2981 let mut rng = TestRng::default(); 2982 let (primary, accounts) = primary_without_handlers(&mut rng); 2983 let peer_account = &accounts[1]; 2984 let peer_ip = peer_account.0; 2985 2986 // Fill primary storage. 2987 store_certificate_chain(&primary, &accounts, round, &mut rng); 2988 2989 // Get transmissions from previous certificates. 2990 let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round); 2991 2992 // Generate a solution and a transaction. 2993 let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng); 2994 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng); 2995 2996 // Store it on one of the workers. 2997 primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap(); 2998 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap(); 2999 3000 // Check that the worker has 2 transmissions. 3001 assert_eq!(primary.workers[0].num_transmissions(), 2); 3002 3003 // Create certificates for the current round. 3004 let account = accounts[0].1.clone(); 3005 let (certificate, transmissions) = 3006 create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng); 3007 let certificate_id = certificate.id(); 3008 3009 // Randomly abort some of the transmissions. 3010 let mut aborted_transmissions = HashSet::new(); 3011 let mut transmissions_without_aborted = HashMap::new(); 3012 for (transmission_id, transmission) in transmissions.clone() { 3013 match rng.r#gen::<bool>() || aborted_transmissions.is_empty() { 3014 true => { 3015 // Insert the aborted transmission. 
3016 aborted_transmissions.insert(transmission_id); 3017 } 3018 false => { 3019 // Insert the transmission without the aborted transmission. 3020 transmissions_without_aborted.insert(transmission_id, transmission); 3021 } 3022 }; 3023 } 3024 3025 // Add the non-aborted transmissions to the worker. 3026 for (transmission_id, transmission) in transmissions_without_aborted.iter() { 3027 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()); 3028 } 3029 3030 // Check that inserting the transmission with missing transmissions fails. 3031 assert!(primary 3032 .storage 3033 .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default()) 3034 .is_err()); 3035 assert!(primary 3036 .storage 3037 .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default()) 3038 .is_err()); 3039 3040 // Insert the certificate to storage. 3041 primary 3042 .storage 3043 .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone()) 3044 .unwrap(); 3045 3046 // Ensure the certificate exists in storage. 3047 assert!(primary.storage.contains_certificate(certificate_id)); 3048 // Ensure that the aborted transmission IDs exist in storage. 3049 for aborted_transmission_id in aborted_transmissions { 3050 assert!(primary.storage.contains_transmission(aborted_transmission_id)); 3051 assert!(primary.storage.get_transmission(aborted_transmission_id).is_none()); 3052 } 3053 } 3054 }