primary.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use crate::{ 17 Gateway, 18 MAX_BATCH_DELAY_IN_MS, 19 MAX_WORKERS, 20 MIN_BATCH_DELAY_IN_SECS, 21 PRIMARY_PING_IN_MS, 22 Sync, 23 Transport, 24 WORKER_PING_IN_MS, 25 Worker, 26 events::{BatchPropose, BatchSignature, Event}, 27 helpers::{ 28 BFTSender, 29 PrimaryReceiver, 30 PrimarySender, 31 Proposal, 32 ProposalCache, 33 SignedProposals, 34 Storage, 35 assign_to_worker, 36 assign_to_workers, 37 fmt_id, 38 init_sync_channels, 39 init_worker_channels, 40 now, 41 }, 42 spawn_blocking, 43 }; 44 use alphaos_account::Account; 45 use alphaos_node_bft_events::PrimaryPing; 46 use alphaos_node_bft_ledger_service::LedgerService; 47 use alphaos_node_network::PeerPoolHandling; 48 use alphaos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping}; 49 use alphavm::{ 50 console::{ 51 prelude::*, 52 types::{Address, Field}, 53 }, 54 ledger::{ 55 block::Transaction, 56 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID}, 57 puzzle::{Solution, SolutionID}, 58 }, 59 prelude::{ConsensusVersion, committee::Committee}, 60 utilities::flatten_error, 61 }; 62 63 use alpha_std::StorageMode; 64 use anyhow::Context; 65 use colored::Colorize; 66 use futures::stream::{FuturesUnordered, StreamExt}; 67 use indexmap::{IndexMap, IndexSet}; 68 #[cfg(feature = "locktick")] 69 use locktick::{ 70 parking_lot::{Mutex, RwLock}, 71 
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};

/// A helper type for an optional proposed batch.
/// `None` means the primary has no batch proposal in flight for the current round.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

/// The primary logic of a node.
/// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
///
/// `Primary` is `Clone`: every field is either itself cheaply clonable or wrapped in an `Arc`,
/// so clones share the same underlying state.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module enables fetching data from other validators.
    sync: Sync<N>,
    /// The gateway allows talking to other nodes in the validator set.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers. Empty until `run` is called; `Arc<[_]>` makes clones O(1).
    workers: Arc<[Worker<N>]>,
    /// The BFT sender. Set at most once (in `run`) via the `OnceCell`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The handles for all background tasks spawned by this primary.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock for propose_batch. This is an async (tokio) mutex because it is held across
    /// await points; the inner `u64` records the latest round a proposal was made (or loaded) for.
    propose_lock: Arc<TMutex<u64>>,
    /// The storage mode of the node.
    storage_mode: StorageMode,
}

impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to send to the primary.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    ///
    /// This constructs the gateway and sync module, but does not start any background tasks
    /// and does not create any workers — that happens in [`Self::run`].
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        storage_mode: StorageMode,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Initialize the gateway.
        let gateway = Gateway::new(
            account,
            storage.clone(),
            ledger.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            storage_mode.clone(),
            dev,
        )?;
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

        // Initialize the primary instance.
        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            // The worker set is populated later, in `run`.
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            storage_mode,
        })
    }

    /// Load the proposal cache file and update the Primary state with the stored data.
    ///
    /// Restores the in-flight proposal, the signed-proposals map, and the propose-lock round
    /// from disk, then replays any pending certificates into storage. Returns `Ok(())` if no
    /// cache file exists.
    async fn load_proposal_cache(&self) -> Result<()> {
        // Fetch the signed proposals from the file system if it exists.
        match ProposalCache::<N>::exists(&self.storage_mode) {
            // If the proposal cache exists, then process the proposal cache.
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
                Ok(proposal_cache) => {
                    // Extract the proposal and signed proposals.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Write the proposed batch.
                    *self.proposed_batch.write() = proposed_batch;
                    // Write the signed proposals.
                    *self.signed_proposals.write() = signed_proposals;
                    // Write the propose lock.
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Update the storage with the pending certificates.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        // We use a dummy IP because the node should not need to request from any peers.
                        // The storage should have stored all the transmissions. If not, we simply
                        // skip the certificate.
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            let err = err.context(format!(
                                "Failed to load stored certificate {} from proposal cache",
                                fmt_id(batch_id)
                            ));
                            warn!("{}", &flatten_error(err));
                        }
                    }
                    Ok(())
                }
                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
            },
            // If the proposal cache does not exist, then return early.
            false => Ok(()),
        }
    }

    /// Run the primary instance.
    ///
    /// Spawns the workers, initializes the sync module, restores the proposal cache, starts
    /// the gateway, and finally starts the primary's own event handlers. The ordering of these
    /// steps is deliberate — see the inline comments below.
    pub async fn run(
        &mut self,
        ping: Option<Arc<Ping<N>>>,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender.
        if let Some(bft_sender) = &bft_sender {
            // Set the BFT sender in the primary.
            // Note: `set` fails (and we panic) if `run` is invoked twice with a BFT sender.
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct a map of the worker senders.
        let mut worker_senders = IndexMap::new();
        // Construct a map for the workers.
        let mut workers = Vec::new();
        // Initialize the workers.
        for id in 0..MAX_WORKERS {
            // Construct the worker channels.
            let (tx_worker, rx_worker) = init_worker_channels();
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            // Run the worker instance.
            worker.run(rx_worker);
            // Add the worker to the list of workers.
            workers.push(worker);
            // Add the worker sender to the map.
            worker_senders.insert(id, tx_worker);
        }
        // Set the workers.
        self.workers = Arc::from(workers);

        // First, initialize the sync channels.
        let (sync_sender, sync_receiver) = init_sync_channels();
        // Next, initialize the sync module and sync the storage from ledger.
        self.sync.initialize(bft_sender).await?;
        // Next, load and process the proposal cache before running the sync module.
        self.load_proposal_cache().await?;
        // Next, run the sync module.
        self.sync.run(ping, sync_receiver).await?;
        // Next, initialize the gateway.
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        // Lastly, start the primary handlers.
        // Note: This ensures the primary does not start communicating before syncing is complete.
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    ///
    /// Panics if the worker count exceeds `u8::MAX`; in practice the count is bounded by
    /// `MAX_WORKERS` (see `run`).
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the batch proposal of our primary, if one currently exists.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}

impl<N: Network> Primary<N> {
    /// Returns the number of unconfirmed transmissions.
    /// This is the sum across all workers.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
    }

    /// Returns the number of unconfirmed ratifications.
    /// This is the sum across all workers.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
    }

    /// Returns the number of unconfirmed solutions.
    /// This is the sum across all workers.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_solutions()).sum()
    }

    /// Returns the number of unconfirmed transactions.
    /// This is the sum across all workers.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transactions()).sum()
    }
}

impl<N: Network> Primary<N> {
    /// Returns the worker transmission IDs, chained across all workers.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    /// Returns the worker transmissions, chained across all workers.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    /// Returns the worker solutions, chained across all workers.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    /// Returns the worker transactions.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}

impl<N: Network> Primary<N> {
    /// Clears the worker solutions.
    pub fn clear_worker_solutions(&self) {
        self.workers.iter().for_each(Worker::clear_solutions);
    }
}

impl<N: Network> Primary<N> {
    /// Proposes the batch for the current round.
    ///
    /// This method performs the following steps:
    /// 1. Drain the workers.
    /// 2. Sign the batch.
    /// 3. Set the batch proposal in the primary.
    /// 4. Broadcast the batch header to all validators for signing.
    pub async fn propose_batch(&self) -> Result<()> {
        // This function isn't re-entrant.
        // The async lock is held for the entire call; its inner value is the latest proposed round.
        let mut lock_guard = self.propose_lock.lock().await;

        // Check if the proposed batch has expired, and clear it if it has expired.
        if let Err(err) = self
            .check_proposed_batch_for_expiration()
            .await
            .with_context(|| "Failed to check the proposed batch for expiration")
        {
            warn!("{}", flatten_error(&err));
            return Ok(());
        }

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // If the current round is 0, return early.
        // This can actually never happen, because of the invariant that the current round is never 0
        // (see [`StorageInner::current_round`]).
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current storage round is below the latest proposal round, then return early.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If there is a batch being proposed already,
        // rebroadcast the batch header to the non-signers, and return early.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            // Construct the event.
            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            // Iterate through the non-signers.
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                // Resolve the address to the peer IP.
                match self.gateway.resolver().read().get_peer_ip_for_address(address) {
                    // Resend the batch proposal to the validator for signing.
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        // Fire-and-forget: the resend happens on a spawned task so this loop is not blocked.
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            // Resend the batch proposal to the peer.
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure that the primary does not create a new proposal too quickly.
        if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!(
                "{}",
                flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}")))
            );
            return Ok(());
        }

        // Ensure the primary has not proposed a batch for this round before.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            // If a BFT sender was provided, attempt to advance the current round.
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
                    Ok(true) => (), // continue,
                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
                    Ok(false) => return Ok(()),
                    // An error occurred while attempting to advance the current round.
                    Err(err) => {
                        let err = err.context("Failed to update the BFT to the next round");
                        warn!("{}", &flatten_error(&err));
                        return Err(err);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        // Determine if the current round has been proposed.
        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
        // good for network reliability and should not be prevented for the already existing proposed_batch.
        // If a certificate already exists for the current round, an attempt should be made to advance the
        // round as early as possible.
        if round == *lock_guard {
            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        // Retrieve the committee to check against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check if the primary is connected to enough validators to reach quorum threshold.
        {
            // Retrieve the connected validator addresses.
            let mut connected_validators = self.gateway.connected_addresses();
            // Append the primary to the set.
            connected_validators.insert(self.gateway.account().address());
            // If quorum threshold is not reached, return early.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed.
        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
        let mut is_ready = previous_round == 0;
        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
        if previous_round > 0 {
            // Retrieve the committee lookback for the round.
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // Construct a set over the authors.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            // Check if the previous certificates have reached the quorum threshold.
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        // If the batch is not ready to be proposed, return early.
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Initialize the map of transmissions.
        let mut transmissions: IndexMap<_, _> = Default::default();
        // Track the total execution costs of the batch proposal as it is being constructed.
        let mut proposal_cost = 0u64;
        // Note: worker draining and transaction inclusion needs to be thought
        // through carefully when there is more than one worker. The fairness
        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            // Drain the worker front-to-back. Transmissions that are skipped with `continue`
            // are discarded; transmissions that no longer fit are reinserted at the front.
            while let Some((id, transmission)) = worker.remove_front() {
                // Check the selected transmissions are below the batch limit.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    // Reinsert the transmission into the worker.
                    worker.insert_front(id, transmission);
                    break 'outer;
                }

                // Check the max transmissions per worker is not exceeded.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    // Reinsert the transmission into the worker.
                    worker.insert_front(id, transmission);
                    continue 'outer;
                }

                // Check if the ledger already contains the transmission.
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // Check if the storage already contains the transmission.
                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
                // the primary does not propose a batch with no transmissions.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Check the transmission is still valid.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // Ensure the checksum matches. If not, skip the solution.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        // Check if the solution is still valid.
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // Ensure the checksum matches. If not, skip the transaction.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
                        // ConsensusVersion V8 Migration logic -
                        // Do not include deployments in a batch proposal.
                        let current_block_height = self.ledger.latest_block_height();
                        let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                        let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
                        if current_block_height > consensus_version_v7_height
                            && current_block_height <= consensus_version_v8_height
                            && transaction.is_deploy()
                        {
                            trace!(
                                "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
                                fmt_id(transaction_id)
                            );
                            continue;
                        }

                        // Compute the transaction spent cost (in microcredits).
                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Check if the transaction is still valid.
                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Compute the next proposal cost.
                        // Note: We purposefully discard this transaction if the proposal cost overflows.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Check if the next proposal cost exceeds the batch proposal spend limit.
                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
                        if next_proposal_cost > batch_spend_limit {
                            debug!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                batch_spend_limit
                            );

                            // Reinsert the transmission into the worker.
                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        // Update the proposal cost.
                        proposal_cost = next_proposal_cost;
                    }

                    // Note: We explicitly forbid including ratifications,
                    // as the protocol currently does not support ratifications.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    // All other combinations are clearly invalid.
                    _ => continue,
                }

                // If the transmission is valid, insert it into the proposal's transmission list.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        // Record that a proposal is being made for this round (checked by `round == *lock_guard` above).
        *lock_guard = round;

        /* Proceeding to sign & propose the batch. */
        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Retrieve the private key.
        let private_key = *self.gateway.account().private_key();
        // Retrieve the committee ID.
        let committee_id = committee_lookback.id();
        // Prepare the transmission IDs.
        let transmission_ids = transmissions.keys().copied().collect();
        // Prepare the previous batch certificate IDs.
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header and construct the proposal.
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On error, reinsert the transmissions and then propagate the error.
            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
            }
        })?;
        // Broadcast the batch to all validators for signing.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        // Set the timestamp of the latest proposed batch.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        // Set the proposed batch.
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }

    /// Processes a batch propose from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Verify the batch.
    /// 2. Sign the batch.
    /// 3. Broadcast the signature back to the validator.
    ///
    /// If our primary is ahead of the peer, we will not sign the batch.
    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        // Deserialize the batch header.
        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        // Ensure the round matches in the batch header.
        if batch_round != batch_header.round() {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        // Retrieve the batch author.
        let batch_author = batch_header.author();

        // Ensure the batch proposal is from the validator.
        match self.gateway.resolve_to_aleo_addr(peer_ip) {
            // If the peer is a validator, then ensure the batch proposal is from the validator.
            Some(address) => {
                if address != batch_author {
                    // Proceed to disconnect the validator.
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        // Ensure the batch author is a current committee member.
        if !self.gateway.is_authorized_validator_address(batch_author) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        // Ensure the batch proposal is not from the current primary.
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        // Ensure that the batch proposal's committee ID matches the expected committee ID.
        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        // Retrieve the cached round and batch ID for this validator.
        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
            // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            // If the round matches and the batch ID differs, then the validator is malicious.
            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            // If the round and batch ID matches, then skip signing the batch a second time.
            // Instead, rebroadcast the cached signature to the peer.
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    // Resend the batch signature to the peer.
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                // Return early.
                return Ok(());
            }
        }

        // Ensure that the batch header doesn't already exist in storage.
        // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        // Compute the previous round.
        let previous_round = batch_round.saturating_sub(1);
        // Ensure that the peer did not propose a batch too quickly.
        if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
        }

        // Ensure the batch header does not contain any ratifications.
        if batch_header.contains(TransmissionID::Ratification) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;

        // Check that the transmission ids match and are not fee transactions.
        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            // If the transmission is not well-formed, then return early.
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            let err = err.context(format!(
                "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
            ));
            debug!("{}", flatten_error(err));
            return Ok(());
        }

        // Ensure the batch is for the current round.
        // This method must be called after fetching previous certificates (above),
        // and prior to checking the batch header (below).
        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            // If the primary is not signing for the peer's round, then return early.
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        // Ensure the batch header from the peer is valid.
        let (storage, header) = (self.storage.clone(), batch_header.clone());

        // Check the batch header, and return early if it already exists in storage.
        let Some(missing_transmissions) =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
        else {
            return Ok(());
        };

        // Inserts the missing transmissions into the workers.
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        // Ensure the transaction doesn't bring the proposal above the spend limit.
        let block_height = self.ledger.latest_block_height() + 1;
        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
            let mut proposal_cost = 0u64;
            for transmission_id in batch_header.transmission_ids() {
                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
                let Some(worker) = self.workers.get(worker_id as usize) else {
                    debug!("Unable to find worker {worker_id}");
                    return Ok(());
                };

                let Some(transmission) = worker.get_transmission(*transmission_id) else {
                    // NOTE(review): this log message is missing a closing single quote after
                    // {worker_id} — consider fixing it alongside the rest of this function.
                    debug!("Unable to find transmission '{}' in worker '{worker_id}", fmt_id(transmission_id));
                    return Ok(());
                };

                // If the transmission is a transaction, compute its execution cost.
                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
                    (transmission_id, transmission)
                {
                    // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
                    let transaction = spawn_blocking!({
                        match transaction {
                            Data::Object(transaction) => Ok(transaction),
                            Data::Buffer(bytes) => {
                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                            }
                        }
                    })?;

                    // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
                    // ConsensusVersion V8 Migration logic -
                    // Do not include deployments in a batch proposal.
                    let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                    let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                    let consensus_version = N::CONSENSUS_VERSION(block_height)?;
                    if block_height > consensus_version_v7_height
                        && block_height <= consensus_version_v8_height
                        && transaction.is_deploy()
                    {
                        bail!(
                            "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})",
                        )
                    }

                    // Compute the transaction spent cost (in microcredits).
                    // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
                    let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                    else {
                        bail!(
                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Compute the next proposal cost.
                    // Note: We purposefully discard this transaction if the proposal cost overflows.
                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                        bail!(
                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Check if the next proposal cost exceeds the batch proposal spend limit.
949 let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height); 950 if next_proposal_cost > batch_spend_limit { 951 bail!( 952 "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})", 953 fmt_id(transaction_id), 954 batch_spend_limit 955 ); 956 } 957 958 // Update the proposal cost. 959 proposal_cost = next_proposal_cost; 960 } 961 } 962 } 963 964 /* Proceeding to sign the batch. */ 965 966 // Retrieve the batch ID. 967 let batch_id = batch_header.batch_id(); 968 // Sign the batch ID. 969 let account = self.gateway.account().clone(); 970 let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?; 971 972 // Ensure the proposal has not already been signed. 973 // 974 // Note: Due to the need to sync the batch header with the peer, it is possible 975 // for the primary to receive the same 'BatchPropose' event again, whereby only 976 // one instance of this handler should sign the batch. This check guarantees this. 977 match self.signed_proposals.write().0.entry(batch_author) { 978 std::collections::hash_map::Entry::Occupied(mut entry) => { 979 // If the validator has already signed a batch for this round, then return early, 980 // since, if the peer still has not received the signature, they will request it again, 981 // and the logic at the start of this function will resend the (now cached) signature 982 // to the peer if asked to sign this batch proposal again. 983 if entry.get().0 == batch_round { 984 return Ok(()); 985 } 986 // Otherwise, cache the round, batch ID, and signature for this validator. 987 entry.insert((batch_round, batch_id, signature)); 988 } 989 // If the validator has not signed a batch before, then continue. 990 std::collections::hash_map::Entry::Vacant(entry) => { 991 // Cache the round, batch ID, and signature for this validator. 
992 entry.insert((batch_round, batch_id, signature)); 993 } 994 }; 995 996 // Broadcast the signature back to the validator. 997 let self_ = self.clone(); 998 tokio::spawn(async move { 999 let event = Event::BatchSignature(BatchSignature::new(batch_id, signature)); 1000 // Send the batch signature to the peer. 1001 if self_.gateway.send(peer_ip, event).await.is_some() { 1002 debug!("Signed a batch for round {batch_round} from '{peer_ip}'"); 1003 } 1004 }); 1005 1006 Ok(()) 1007 } 1008 1009 /// Processes a batch signature from a peer. 1010 /// 1011 /// This method performs the following steps: 1012 /// 1. Ensure the proposed batch has not expired. 1013 /// 2. Verify the signature, ensuring it corresponds to the proposed batch. 1014 /// 3. Store the signature. 1015 /// 4. Certify the batch if enough signatures have been received. 1016 /// 5. Broadcast the batch certificate to all validators. 1017 async fn process_batch_signature_from_peer( 1018 &self, 1019 peer_ip: SocketAddr, 1020 batch_signature: BatchSignature<N>, 1021 ) -> Result<()> { 1022 // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired. 1023 self.check_proposed_batch_for_expiration().await?; 1024 1025 // Retrieve the signature and timestamp. 1026 let BatchSignature { batch_id, signature } = batch_signature; 1027 1028 // Retrieve the signer. 1029 let signer = signature.to_address(); 1030 1031 // Ensure the batch signature is signed by the validator. 1032 if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) { 1033 // Proceed to disconnect the validator. 1034 self.gateway.disconnect(peer_ip); 1035 bail!("Malicious peer - batch signature is from a different validator ({signer})"); 1036 } 1037 // Ensure the batch signature is not from the current primary. 
1038 if self.gateway.account().address() == signer { 1039 bail!("Invalid peer - received a batch signature from myself ({signer})"); 1040 } 1041 1042 let self_ = self.clone(); 1043 let Some(proposal) = spawn_blocking!({ 1044 // Acquire the write lock. 1045 let mut proposed_batch = self_.proposed_batch.write(); 1046 // Add the signature to the batch, and determine if the batch is ready to be certified. 1047 match proposed_batch.as_mut() { 1048 Some(proposal) => { 1049 // Ensure the batch ID matches the currently proposed batch ID. 1050 if proposal.batch_id() != batch_id { 1051 match self_.storage.contains_batch(batch_id) { 1052 // If this batch was already certified, return early. 1053 true => { 1054 debug!( 1055 "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified", 1056 proposal.round() 1057 ); 1058 return Ok(None); 1059 } 1060 // If the batch ID is unknown, return an error. 1061 false => bail!( 1062 "Unknown batch ID '{batch_id}', expected '{}' for round {}", 1063 proposal.batch_id(), 1064 proposal.round() 1065 ), 1066 } 1067 } 1068 // Retrieve the committee lookback for the round. 1069 let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?; 1070 // Retrieve the address of the validator. 1071 let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else { 1072 bail!("Signature is from a disconnected validator"); 1073 }; 1074 // Add the signature to the batch. 1075 let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?; 1076 1077 if new_signature { 1078 info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round()); 1079 // Check if the batch is ready to be certified. 1080 if !proposal.is_quorum_threshold_reached(&committee_lookback) { 1081 // If the batch is not ready to be certified, return early. 
1082 return Ok(None); 1083 } 1084 } else { 1085 debug!( 1086 "Received duplicated signature from '{peer_ip}' for batch {batch_id} in round {round}", 1087 round = proposal.round() 1088 ); 1089 return Ok(None); 1090 } 1091 } 1092 // There is no proposed batch, so return early. 1093 None => return Ok(None), 1094 }; 1095 // Retrieve the batch proposal, clearing the proposed batch. 1096 match proposed_batch.take() { 1097 Some(proposal) => Ok(Some(proposal)), 1098 None => Ok(None), 1099 } 1100 })? 1101 else { 1102 return Ok(()); 1103 }; 1104 1105 /* Proceeding to certify the batch. */ 1106 1107 info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round()); 1108 1109 // Retrieve the committee lookback for the round. 1110 let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?; 1111 // Store the certified batch and broadcast it to all validators. 1112 // If there was an error storing the certificate, reinsert the transmissions back into the ready queue. 1113 if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await { 1114 // Reinsert the transmissions back into the ready queue for the next proposal. 1115 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?; 1116 return Err(e); 1117 } 1118 1119 #[cfg(feature = "metrics")] 1120 metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0); 1121 Ok(()) 1122 } 1123 1124 /// Processes a batch certificate from a peer. 1125 /// 1126 /// This method performs the following steps: 1127 /// 1. Stores the given batch certificate, after ensuring it is valid. 1128 /// 2. If there are enough certificates to reach quorum threshold for the current round, 1129 /// then proceed to advance to the next round. 
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Ensure the batch certificate is from an authorized validator.
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        // Ensure storage does not already contain the certificate.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        // Otherwise, ensure ephemeral storage contains the certificate.
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        // Retrieve the batch certificate author.
        let author = certificate.author();
        // Retrieve the batch certificate round.
        let certificate_round = certificate.round();
        // Retrieve the batch certificate committee ID.
        let committee_id = certificate.committee_id();

        // Ensure the batch certificate is not from the current primary.
        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        // Ensure that the incoming certificate is valid.
        self.storage.check_incoming_certificate(&certificate)?;

        // Store the certificate, after ensuring it is valid above.
        // The following call recursively fetches and stores
        // the previous certificates referenced from this certificate.
        // It is critical to make the following call only after validating the certificate above.
        // The reason is that a sequence of malformed certificates,
        // with references to previous certificates with non-decreasing rounds,
        // could cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
        // Note that the following call, if not returning an error, guarantees the backward closure of the DAG
        // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
        // then all the validity checks in [`Storage::check_certificate`] should be redundant.
        // TODO: eliminate those redundant checks
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        // If there are enough certificates to reach quorum threshold for the certificate round,
        // then proceed to advance to the next round.

        // Retrieve the committee lookback.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        // Retrieve the certificate authors.
        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        // Check if the certificates have reached the quorum threshold.
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        // Ensure that the batch certificate's committee ID matches the expected committee ID.
        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Determine if we are currently proposing a round that is relevant.
        // Note: This is important, because while our peers have advanced,
        // they may not be proposing yet, and thus still able to sign our proposed batch.
        let should_advance = match &*self.proposed_batch.read() {
            // We advance if the proposal round is less than the current round that was just certified.
            Some(proposal) => proposal.round() < certificate_round,
            // If there's no proposal, we consider advancing.
            None => true,
        };

        // Retrieve the current round.
        let current_round = self.current_round();

        // Determine whether to advance to the next round.
        if is_quorum && should_advance && certificate_round >= current_round {
            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
}

impl<N: Network> Primary<N> {
    /// Starts the primary handlers.
    ///
    /// For each receiver in the `primary_receiver` struct, there will be a dedicated task
    /// that awaits new data and handles it accordingly.
    /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically
    /// tries to move to the next round of batches.
    ///
    /// This function is called exactly once, in `Self::run()`.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Start the primary ping sender.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                // Retrieve the block locators.
                // Clone the handle again, since the `spawn_blocking!` closure takes ownership of it.
                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                // Retrieve the latest certificate of the primary.
                let primary_certificate = {
                    // Retrieve the primary address.
                    let primary_address = self_.gateway.account().address();

                    // Iterate backwards from the latest round to find the primary certificate.
                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        // If the current round is 0, then break the while loop.
                        if current_round == 0 {
                            break;
                        }
                        // Retrieve the primary certificates.
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        // If the primary certificate was not found, decrement the round.
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    // Determine if the primary certificate was found.
                    match certificate {
                        Some(certificate) => certificate,
                        // Skip this iteration of the loop (do not send a primary ping).
                        None => continue,
                    }
                };

                // Construct the primary ping.
                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                // Broadcast the event.
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Start the primary ping handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                // If the primary is not synced, then do not process the primary ping.
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Spawn a task to process the primary certificate.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        // Deserialize the primary certificate in the primary ping.
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        // Process the primary certificate.
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Start the worker ping(s).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                // If the primary is not synced, then do not broadcast the worker ping(s).
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Broadcast the worker ping(s).
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        // Start the batch proposer.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly, but longer than if there were no batch.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // If the primary is not synced, then do not propose a batch.
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // A best-effort attempt to skip the scheduled batch proposal if
                // round progression already triggered one.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                // If there is no proposed batch, attempt to propose a batch.
                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
                // and only one batch needs to be proposed at a time.
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Start the proposed batch handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                // If the primary is not synced, then do not sign the batch.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the proposed batch.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Process the batch proposal.
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Start the batch signature handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                // If the primary is not synced, then do not store the signature.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process the batch signature.
                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
                // is a critical path, and we should only store the minimum required number of signatures.
                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
                let id = fmt_id(batch_signature.batch_id);
                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
                    warn!("{}", flatten_error(err));
                }
            }
        });

        // Start the certified batch handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                // If the primary is not synced, then do not store the certificate.
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the batch certificate.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Deserialize the batch certificate.
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    // Process the batch certificate.
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!(
                            "{}",
                            flatten_error(err.context(format!(
                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
                            )))
                        );
                    }
                });
            }
        });

        // This task periodically tries to move to the next round.
        //
        // Note: This is necessary to ensure that the primary is not stuck on a previous round
        // despite having received enough certificates to advance to the next round.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not increment to the next round.
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Attempt to increment to the next round.
                let current_round = self_.current_round();
                let next_round = current_round.saturating_add(1);
                // Determine if the quorum threshold is reached for the current round.
                let is_quorum_threshold_reached = {
                    // Retrieve the certificate authors for the current round.
                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
                    // If there are no certificates, then skip this check.
                    if authors.is_empty() {
                        continue;
                    }
                    // Retrieve the committee lookback for the current round.
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
                        warn!("Failed to retrieve the committee lookback for round {current_round}");
                        continue;
                    };
                    // Check if the quorum threshold is reached for the current round.
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if the quorum threshold is reached.
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {current_round}");
                    if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
                    }
                }
            }
        });

        // Start a handler to process new unconfirmed solutions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed solution.
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });

        // Start a handler to process new unconfirmed transactions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
1523 let Ok(checksum) = transaction.to_checksum::<N>() else { 1524 error!("Failed to compute the checksum for the unconfirmed transaction"); 1525 continue; 1526 }; 1527 // Compute the worker ID. 1528 let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else { 1529 error!("Unable to determine the worker ID for the unconfirmed transaction"); 1530 continue; 1531 }; 1532 let self_ = self_.clone(); 1533 tokio::spawn(async move { 1534 // Retrieve the worker. 1535 let worker = &self_.workers[worker_id as usize]; 1536 // Process the unconfirmed transaction. 1537 let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await; 1538 // Send the result to the callback. 1539 callback.send(result).ok(); 1540 }); 1541 } 1542 }); 1543 } 1544 1545 /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired. 1546 async fn check_proposed_batch_for_expiration(&self) -> Result<()> { 1547 // Check if the proposed batch is timed out or stale. 1548 let is_expired = match self.proposed_batch.read().as_ref() { 1549 Some(proposal) => proposal.round() < self.current_round(), 1550 None => false, 1551 }; 1552 // If the batch is expired, clear the proposed batch. 1553 if is_expired { 1554 // Reset the proposed batch. 1555 let proposal = self.proposed_batch.write().take(); 1556 if let Some(proposal) = proposal { 1557 debug!("Cleared expired proposal for round {}", proposal.round()); 1558 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?; 1559 } 1560 } 1561 Ok(()) 1562 } 1563 1564 /// Increments to the next round. 1565 async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> { 1566 // If the next round is within GC range, then iterate to the penultimate round. 1567 if self.current_round() + self.storage.max_gc_rounds() >= next_round { 1568 let mut fast_forward_round = self.current_round(); 1569 // Iterate until the penultimate round is reached. 
1570 while fast_forward_round < next_round.saturating_sub(1) { 1571 // Update to the next round in storage. 1572 fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?; 1573 // Clear the proposed batch. 1574 *self.proposed_batch.write() = None; 1575 } 1576 } 1577 1578 // Retrieve the current round. 1579 let current_round = self.current_round(); 1580 // Attempt to advance to the next round. 1581 if current_round < next_round { 1582 // If a BFT sender was provided, send the current round to the BFT. 1583 let is_ready = if let Some(bft_sender) = self.bft_sender.get() { 1584 match bft_sender.send_primary_round_to_bft(current_round).await { 1585 Ok(is_ready) => is_ready, 1586 Err(err) => { 1587 let err = err.context("Failed to update the BFT to the next round"); 1588 warn!("{}", flatten_error(&err)); 1589 return Err(err); 1590 } 1591 } 1592 } 1593 // Otherwise, handle the Narwhal case. 1594 else { 1595 // Update to the next round in storage. 1596 self.storage.increment_to_next_round(current_round)?; 1597 // Set 'is_ready' to 'true'. 1598 true 1599 }; 1600 1601 // Log whether the next round is ready. 1602 match is_ready { 1603 true => debug!("Primary is ready to propose the next round"), 1604 false => debug!("Primary is not ready to propose the next round"), 1605 } 1606 1607 // If the node is ready, propose a batch for the next round. 1608 if is_ready { 1609 self.propose_batch().await?; 1610 } 1611 } 1612 Ok(()) 1613 } 1614 1615 /// Ensures the primary is signing for the specified batch round. 1616 /// This method is used to ensure: for a given round, as soon as the primary starts proposing, 1617 /// it will no longer sign for the previous round (as it has enough previous certificates to proceed). 1618 fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> { 1619 // Retrieve the current round. 1620 let current_round = self.current_round(); 1621 // Ensure the batch round is within GC range of the current round. 
        if current_round + self.storage.max_gc_rounds() <= batch_round {
            bail!("Round {batch_round} is too far in the future")
        }
        // Ensure the batch round is at or one before the current round.
        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
        if current_round > batch_round + 1 {
            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
        }
        // Check if the primary is still signing for the batch round.
        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
            if signing_round > batch_round {
                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
            }
        }
        Ok(())
    }

    /// Ensure the primary is not creating batch proposals too frequently.
    /// This checks that the certificate timestamp for the previous round is within the expected range.
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Retrieve the previous timestamp to check `timestamp` against.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            // Use the author's certificate for the previous round, if one exists.
            Some(certificate) => certificate.timestamp(),
            None => match self.gateway.account().address() == author {
                // If we are the author, fall back to the timestamp of our latest proposed batch.
                true => *self.latest_proposed_batch_timestamp.read(),
                // If we do not see a previous certificate for the author, then proceed optimistically.
                false => return Ok(()),
            },
        };

        // Determine the elapsed time since the previous timestamp.
        // `checked_sub` rejects a timestamp that precedes the previous one (underflow -> error).
        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Create the batch certificate and transmissions.
        // `block_in_place` keeps this CPU-bound certification work off the async reactor.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                let err = err.context("Failed to update the BFT DAG from primary");
                warn!("{}", flatten_error(&err));
                return Err(err);
            };
        }
        // Broadcast the certified batch to all validators.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        // Log the certified batch.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Increment to the next round.
        self.try_increment_to_the_next_round(round + 1).await
    }

    /// Inserts the missing transmissions from the proposal into the workers.
    fn insert_missing_transmissions_into_workers(
        &self,
        peer_ip: SocketAddr,
        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
    ) -> Result<()> {
        // Insert the transmissions into the workers, sharded by transmission ID.
        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
        })
    }

    /// Re-inserts the transmissions from the proposal into the workers.
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Re-insert the transmissions into the workers, sharded by transmission ID.
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

    /// Recursively stores a given batch certificate, after ensuring:
    /// - Ensure the round matches the committee round.
    /// - Ensure the address is a member of the committee.
    /// - Ensure the timestamp is within range.
    /// - Ensure we have all of the transmissions.
    /// - Ensure we have all of the previous certificates.
    /// - Ensure the previous certificates are for the previous round (i.e. round - 1).
    /// - Ensure the previous certificates have reached the quorum threshold.
    /// - Ensure we have not already signed the batch ID.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Retrieve the batch header.
        let batch_header = certificate.batch_header();
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated (at or below the GC round), do not store it.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // If the certificate already exists in storage, return early.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // If the node is not in sync mode and the node is not synced, then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Check if the certificate needs to be stored.
        // (Re-checked here because the recursive sync above may have inserted it already.)
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the batch certificate.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // If a BFT sender was provided, send the round and certificate to the BFT.
            if let Some(bft_sender) = self.bft_sender.get() {
                // Send the certificate to the BFT.
1768 if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate).await { 1769 let err = err.context("Failed to update the BFT DAG from sync"); 1770 warn!("{}", &flatten_error(&err)); 1771 return Err(err); 1772 }; 1773 } 1774 } 1775 Ok(()) 1776 } 1777 1778 /// Recursively syncs using the given batch header. 1779 async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>( 1780 &self, 1781 peer_ip: SocketAddr, 1782 batch_header: &BatchHeader<N>, 1783 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> { 1784 // Retrieve the batch round. 1785 let batch_round = batch_header.round(); 1786 1787 // If the certificate round is outdated, do not store it. 1788 if batch_round <= self.storage.gc_round() { 1789 bail!("Round {batch_round} is too far in the past") 1790 } 1791 1792 // If node is not in sync mode and the node is not synced, then return an error. 1793 if !IS_SYNCING && !self.is_synced() { 1794 bail!( 1795 "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)", 1796 fmt_id(batch_header.batch_id()) 1797 ); 1798 } 1799 1800 // Determine if quorum threshold is reached on the batch round. 1801 let is_quorum_threshold_reached = { 1802 let authors = self.storage.get_certificate_authors_for_round(batch_round); 1803 let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?; 1804 committee_lookback.is_quorum_threshold_reached(&authors) 1805 }; 1806 1807 // Check if our primary should move to the next round. 1808 // Note: Checking that quorum threshold is reached is important for mitigating a race condition, 1809 // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary 1810 // will advance to the next round assuming f+1, not N-f, which can lead to a network stall. 1811 let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round(); 1812 // Check if our primary is far behind the peer. 
1813 let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds(); 1814 // If our primary is far behind the peer, update our committee to the batch round. 1815 if is_behind_schedule || is_peer_far_in_future { 1816 // If the batch round is greater than the current committee round, update the committee. 1817 self.try_increment_to_the_next_round(batch_round).await?; 1818 } 1819 1820 // Ensure the primary has all of the transmissions. 1821 let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header); 1822 1823 // Ensure the primary has all of the previous certificates. 1824 let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header); 1825 1826 // Wait for the missing transmissions and previous certificates to be fetched. 1827 let (missing_transmissions, missing_previous_certificates) = tokio::try_join!( 1828 missing_transmissions_handle, 1829 missing_previous_certificates_handle, 1830 ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?; 1831 1832 // Iterate through the missing previous certificates. 1833 for batch_certificate in missing_previous_certificates { 1834 // Store the batch certificate (recursively fetching any missing previous certificates). 1835 self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?; 1836 } 1837 Ok(missing_transmissions) 1838 } 1839 1840 /// Fetches any missing transmissions for the specified batch header. 1841 /// If a transmission does not exist, it will be fetched from the specified peer IP. 1842 async fn fetch_missing_transmissions( 1843 &self, 1844 peer_ip: SocketAddr, 1845 batch_header: &BatchHeader<N>, 1846 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> { 1847 // If the round is <= the GC round, return early. 
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        // Ensure this batch ID is new, otherwise return early.
        if self.storage.contains_batch(batch_header.batch_id()) {
            trace!("Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        // Retrieve the workers.
        let workers = self.workers.clone();

        // Initialize a list of futures for the transmission fetches.
        let mut fetch_transmissions = FuturesUnordered::new();

        // Retrieve the number of workers.
        let num_workers = self.num_workers();
        // Iterate through the transmission IDs.
        for transmission_id in batch_header.transmission_ids() {
            // If the transmission does not exist in storage, proceed to fetch the transmission.
            if !self.storage.contains_transmission(*transmission_id) {
                // Determine the worker ID responsible for this transmission.
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                // Retrieve the worker.
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                // Push the fetch future onto the list.
                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
            }
        }

        // Initialize a map for the fetched transmissions.
        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        // Wait for all of the transmissions to be fetched (completion order, not request order).
        while let Some(result) = fetch_transmissions.next().await {
            // Retrieve the transmission.
            let (transmission_id, transmission) = result?;
            // Insert the transmission into the map.
            transmissions.insert(transmission_id, transmission);
        }
        // Return the transmissions.
        Ok(transmissions)
    }

    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
    async fn fetch_missing_previous_certificates(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        // Retrieve the round.
        let round = batch_header.round();
        // If the previous round is 0, or is <= the GC round, return early.
        if round == 1 || round <= self.storage.gc_round() + 1 {
            return Ok(Default::default());
        }

        // Fetch the missing previous certificates.
        let missing_previous_certificates =
            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
        if !missing_previous_certificates.is_empty() {
            debug!(
                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
                missing_previous_certificates.len(),
            );
        }
        // Return the missing previous certificates.
        Ok(missing_previous_certificates)
    }

    /// Fetches any missing certificates for the specified batch header from the specified peer.
    async fn fetch_missing_certificates(
        &self,
        peer_ip: SocketAddr,
        round: u64,
        certificate_ids: &IndexSet<Field<N>>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        // Initialize a list of futures for the certificate requests.
        let mut fetch_certificates = FuturesUnordered::new();
        // Initialize a set for the missing certificates.
        let mut missing_certificates = HashSet::default();
        // Iterate through the certificate IDs.
        for certificate_id in certificate_ids {
            // Check if the certificate already exists in the ledger.
            if self.ledger.contains_certificate(certificate_id)? {
                continue;
            }
            // Check if the certificate already exists in storage.
            if self.storage.contains_certificate(*certificate_id) {
                continue;
            }
            // If we have not fully processed the certificate yet, store it.
            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
                missing_certificates.insert(certificate);
            } else {
                // If we do not have the certificate, request it.
                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
                // TODO (howardwu): Limit the number of open requests we send to a peer.
                // Send a certificate request to the peer.
                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
            }
        }

        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
        match fetch_certificates.is_empty() {
            true => return Ok(missing_certificates),
            false => trace!(
                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
                fetch_certificates.len(),
            ),
        }

        // Wait for all of the missing certificates to be fetched.
        while let Some(result) = fetch_certificates.next().await {
            // Insert the missing certificate into the set.
            missing_certificates.insert(result?);
        }
        // Return the missing certificates.
        Ok(missing_certificates)
    }
}

impl<N: Network> Primary<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        // Track the handle so `shut_down` can abort it later.
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the primary.
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        // Shut down the workers.
        self.workers.iter().for_each(|worker| worker.shut_down());
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
        // Save the current proposal cache to disk.
        // Snapshot the proposal state (taking the in-flight proposal out of `proposed_batch`).
        let proposal_cache = {
            let proposal = self.proposed_batch.write().take();
            let signed_proposals = self.signed_proposals.read().clone();
            // If there is no in-flight proposal, fall back to the last locked propose round.
            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
            let pending_certificates = self.storage.get_pending_certificates();
            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
        };
        if let Err(err) = proposal_cache.store(&self.storage_mode) {
            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
        }
        // Close the gateway.
        self.gateway.shut_down().await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alphaos_node_bft_ledger_service::MockLedgerService;
    use alphaos_node_bft_storage_service::BFTMemoryService;
    use alphaos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
    use alphavm::{
        ledger::{
            committee::{Committee, MIN_VALIDATOR_STAKE},
            test_helpers::sample_execution_transaction_with_fee,
        },
        prelude::{Address, Signature},
    };

    use bytes::Bytes;
    use indexmap::IndexSet;
    use rand::RngCore;

    type CurrentNetwork = alphavm::prelude::MainnetV0;

    // Samples a committee of test accounts (the first entry is used as the primary's account).
    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
        // Create a committee containing the primary's account.
        const COMMITTEE_SIZE: usize = 4;
        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
        let mut members = IndexMap::new();

        for i in 0..COMMITTEE_SIZE {
            // Assign each member a distinct localhost socket address.
            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
            let account = Account::new(rng).unwrap();

            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
            accounts.push((socket_addr, account));
        }

        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
    }

    // Returns a primary and a list of accounts in the configured committee.
    fn primary_with_committee(
        account_index: usize,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        committee: Committee<CurrentNetwork>,
        height: u32,
    ) -> Primary<CurrentNetwork> {
        // Back the primary with a mock ledger at the requested height.
        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);

        // Initialize the primary.
        let account = accounts[account_index].1.clone();
        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
        let mut primary =
            Primary::new(account, storage, ledger, block_sync, None, &[], false, StorageMode::new_test(None), None)
                .unwrap();

        // Construct a worker instance.
        primary.workers = Arc::from([Worker::new(
            0, // id
            Arc::new(primary.gateway.clone()),
            primary.storage.clone(),
            primary.ledger.clone(),
            primary.proposed_batch.clone(),
        )
        .unwrap()]);
        // NOTE(review): `skip(account_index)` also registers the primary's own address
        // as a connected peer — confirm this is intended (vs. excluding only the primary).
        for a in accounts.iter().skip(account_index) {
            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
        }

        primary
    }

    // Returns a primary (using the first sampled account) and the committee accounts,
    // without spawning any of the primary's background handlers.
    fn primary_without_handlers(
        rng: &mut TestRng,
    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
        let (accounts, committee) = sample_committee(rng);
        let primary = primary_with_committee(
            0, // index of primary's account
            &accounts,
            committee,
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
        );

        (primary, accounts)
    }

    // Creates a mock solution.
    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
        // Sample a random fake solution ID.
        let solution_id = rng.r#gen::<u64>().into();
        // Vary the size of the solutions.
        let size = rng.gen_range(1024..10 * 1024);
        // Sample random fake solution bytes.
        let mut vec = vec![0u8; size];
        rng.fill_bytes(&mut vec);
        let solution = Data::Buffer(Bytes::from(vec));
        // Return the solution ID and solution.
        (solution_id, solution)
    }

    // Samples a test transaction.
    fn sample_unconfirmed_transaction(
        rng: &mut TestRng,
    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
        let id = transaction.id();

        (id, Data::Object(transaction))
    }

    // Creates a batch proposal with one solution and one transaction.
    fn create_test_proposal(
        author: &Account<CurrentNetwork>,
        committee: Committee<CurrentNetwork>,
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        timestamp: i64,
        num_transactions: u64,
        rng: &mut TestRng,
    ) -> Proposal<CurrentNetwork> {
        let mut transmission_ids = IndexSet::new();
        let mut transmissions = IndexMap::new();

        // Prepare the solution and insert into the sets.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let solution_transmission_id = (solution_id, solution_checksum).into();
        transmission_ids.insert(solution_transmission_id);
        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));

        // Prepare the transactions and insert into the sets.
        for _ in 0..num_transactions {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
            transmission_ids.insert(transaction_transmission_id);
            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
        }

        // Retrieve the private key.
        let private_key = author.private_key();
        // Sign the batch header.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee.id(),
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // Construct the proposal.
        Proposal::new(committee, batch_header, transmissions).unwrap()
    }

    // Creates a signature of the primary's current proposal for each committee member (excluding
    // the primary).
    fn peer_signatures_for_proposal(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        rng: &mut TestRng,
    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
        // Each committee member signs the batch.
        let mut signatures = Vec::with_capacity(accounts.len() - 1);
        for (socket_addr, account) in accounts {
            // Skip the primary's own account.
            if account.address() == primary.gateway.account().address() {
                continue;
            }
            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
            let signature = account.sign(&[batch_id], rng).unwrap();
            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
        }

        signatures
    }

    /// Creates a signature of the batch ID for each committee member (excluding the primary).
    fn peer_signatures_for_batch(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        batch_id: Field<CurrentNetwork>,
        rng: &mut TestRng,
    ) -> IndexSet<Signature<CurrentNetwork>> {
        let mut signatures = IndexSet::new();
        for (_, account) in accounts {
            // Skip the batch author.
            if account.address() == primary_address {
                continue;
            }
            let signature = account.sign(&[batch_id], rng).unwrap();
            signatures.insert(signature);
        }
        signatures
    }

    // Creates a batch certificate.
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Locate the author's account among the committee accounts.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // Sample one fake solution and one fake transaction for the batch.
        let committee_id = Field::rand(rng);
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // Collect the other members' signatures and certify the batch.
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }

    // Create a certificate chain up to, but not including, the specified round in the primary storage.
    fn store_certificate_chain(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        rng: &mut TestRng,
    ) -> IndexSet<Field<CurrentNetwork>> {
        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        for cur_round in 1..round {
            // Each committee member authors one certificate per round.
            for (_, account) in accounts.iter() {
                let (certificate, transmissions) = create_batch_certificate(
                    account.address(),
                    accounts,
                    cur_round,
                    previous_certificates.clone(),
                    rng,
                );
                next_certificates.insert(certificate.id());
                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
            }

            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
            previous_certificates = next_certificates;
            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        }

        // Return the certificate IDs of the last stored round (round - 1).
        previous_certificates
    }

    // Insert the account socket addresses into the resolver so that
    // they are recognized as "connected".
    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
        // First account is primary, which doesn't need to resolve.
        for (addr, acct) in accounts.iter().skip(1) {
            primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
        }
    }

    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; with transmissions in the worker, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Try to propose a batch with no transmissions.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get transmissions from previous certificates.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Track the number of transmissions in the previous round.
        let mut num_transmissions_in_previous_round = 0;

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
2369 for (_, account) in accounts.iter() { 2370 let (certificate, transmissions) = create_batch_certificate( 2371 account.address(), 2372 &accounts, 2373 round, 2374 previous_certificate_ids.clone(), 2375 &mut rng, 2376 ); 2377 2378 // Add the transmissions to the worker. 2379 for (transmission_id, transmission) in transmissions.iter() { 2380 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()); 2381 } 2382 2383 // Insert the certificate to storage. 2384 num_transmissions_in_previous_round += transmissions.len(); 2385 primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap(); 2386 } 2387 2388 // Sleep for a while to ensure the primary is ready to propose the next round. 2389 tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await; 2390 2391 // Advance to the next round. 2392 assert!(primary.storage.increment_to_next_round(round).is_ok()); 2393 2394 // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions. 2395 assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2); 2396 2397 // Propose the batch. 2398 assert!(primary.propose_batch().await.is_ok()); 2399 2400 // Check that the proposal only contains the new transmissions that were not in previous certificates. 2401 let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone(); 2402 assert_eq!(proposed_transmissions.len(), 2); 2403 assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum))); 2404 assert!( 2405 proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum)) 2406 ); 2407 } 2408 2409 #[tokio::test] 2410 async fn test_propose_batch_over_spend_limit() { 2411 let mut rng = TestRng::default(); 2412 2413 // Create a primary to test spend limit backwards compatibility with V4. 
        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());
        // Check the workers are empty.
        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        // Generate a solution and transactions.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _i in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            // Store it on one of the workers.
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        // Propose the batch. It should succeed.
        assert!(primary.propose_batch().await.is_ok());
        // Expect 2/5 transactions to be included in the proposal in addition to the solution.
        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
        // Check the transmissions were correctly drained from the workers.
        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }

    /// Verifies that a valid batch proposal from a peer is accepted when the primary is synced.
    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // The primary will only consider itself synced if we received
        // block locators from a peer.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer, should succeed.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }

    /// Verifies that a batch proposal from a peer is rejected while the primary is not synced.
    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Add a high block locator to indicate we are not synced.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();

        // Try to process the batch proposal from the peer, should fail.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    /// Verifies that a valid peer proposal for a later round, backed by stored certificates, is accepted.
    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // The primary will only consider itself synced if we received
        // block locators from a peer.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer, should succeed.
        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }

    /// Verifies that a `BatchPropose` whose round does not match its batch header is rejected.
    #[tokio::test]
    async fn test_batch_propose_from_peer_wrong_round() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        // The primary must be considered synced.
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer, should error.
        // The advertised round (round + 1) deliberately disagrees with the header's round.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    /// Verifies that a mismatched `BatchPropose` round is rejected even when certificate history exists.
    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round_wrong_round() {
        let round = 4;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        // The primary must be considered synced.
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer, should error.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected.
    #[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Use a timestamp that is too early.
        // Set it to something that is less than the minimum batch delay.
        // Note that the minimum delay is currently 1, so this will be equal to the last timestamp.
        let last_timestamp = primary
            .storage
            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
            .expect("No previous proposal exists")
            .timestamp();
        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;

        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            invalid_timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        // The primary must be considered synced.
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer, should error.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    /// Checks that batch proposals exceeding the spend limit are rejected from consensus V5 onwards,
    /// while a V4 primary still accepts them.
    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create two primaries to test spend limit activation on V5.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;

        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to resolver to pass propose checks.
        primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // primary v4 must be considered synced.
        primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v4.sync.testing_only_try_block_sync_testing_only().await;

        // primary v5 must be considered synced.
        primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v5.sync.testing_only_try_block_sync_testing_only().await;

        // Check the spend limit is enforced from V5 onwards.
        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );

        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }

    /// Verifies that `propose_batch` is a no-op while the propose lock is ahead of the storage round.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Set the proposal lock to a round ahead of the storage.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Propose a batch; the call returns `Ok`, but no batch is proposed while the lock round is ahead of storage.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Set the proposal lock back to the old round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Try to propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    /// Verifies that `propose_batch` terminates early when the existing proposal is ahead of the storage round.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate previous certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Trying to propose a batch will terminate early because the storage is behind the proposal.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }

    /// Verifies that a quorum of peer signatures certifies the proposal and advances the round.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process the signatures.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was created and stored by the primary.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was incremented.
        assert_eq!(primary.current_round(), round + 1);
    }

    /// Verifies signature quorum handling when the proposal builds on stored certificates.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer_in_round() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process the signatures.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was created and stored by the primary.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was incremented.
        assert_eq!(primary.current_round(), round + 1);
    }

    /// Verifies that a single signature (no quorum) does not certify the proposal or advance the round.
    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created and stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was not incremented.
        assert_eq!(primary.current_round(), round);
    }

    /// Verifies that a lack of quorum leaves the round unchanged when the proposal builds on stored certificates.
    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created and stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was not incremented.
        assert_eq!(primary.current_round(), round);
    }

    /// Verifies certificate insertion with aborted transmissions: insertion fails while transmissions
    /// are missing, and succeeds once the missing IDs are declared aborted.
    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get transmissions from previous certificates.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round.
        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly abort some of the transmissions.
        // The `|| aborted_transmissions.is_empty()` clause guarantees at least one aborted entry.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    // Insert the aborted transmission.
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    // Insert the transmission without the aborted transmission.
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        // Add the non-aborted transmissions to the worker.
        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Check that inserting the transmission with missing transmissions fails.
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        // Insert the certificate to storage.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        // Ensure the certificate exists in storage.
        assert!(primary.storage.contains_certificate(certificate_id));
        // Ensure that the aborted transmission IDs exist in storage.
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            // Aborted transmissions are tracked by ID only; no payload is stored for them.
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
}