/ node / bft / src / primary.rs
primary.rs
   1  // Copyright (c) 2025 ADnet Contributors
   2  // This file is part of the AlphaOS library.
   3  
   4  // Licensed under the Apache License, Version 2.0 (the "License");
   5  // you may not use this file except in compliance with the License.
   6  // You may obtain a copy of the License at:
   7  
   8  // http://www.apache.org/licenses/LICENSE-2.0
   9  
  10  // Unless required by applicable law or agreed to in writing, software
  11  // distributed under the License is distributed on an "AS IS" BASIS,
  12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13  // See the License for the specific language governing permissions and
  14  // limitations under the License.
  15  
  16  use crate::{
  17      Gateway,
  18      MAX_BATCH_DELAY_IN_MS,
  19      MAX_WORKERS,
  20      MIN_BATCH_DELAY_IN_SECS,
  21      PRIMARY_PING_IN_MS,
  22      Sync,
  23      Transport,
  24      WORKER_PING_IN_MS,
  25      Worker,
  26      events::{BatchPropose, BatchSignature, Event},
  27      helpers::{
  28          BFTSender,
  29          PrimaryReceiver,
  30          PrimarySender,
  31          Proposal,
  32          ProposalCache,
  33          SignedProposals,
  34          Storage,
  35          assign_to_worker,
  36          assign_to_workers,
  37          fmt_id,
  38          init_sync_channels,
  39          init_worker_channels,
  40          now,
  41      },
  42      spawn_blocking,
  43  };
  44  use alphaos_account::Account;
  45  use alphaos_node_bft_events::PrimaryPing;
  46  use alphaos_node_bft_ledger_service::LedgerService;
  47  use alphaos_node_network::PeerPoolHandling;
  48  use alphaos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
  49  use alphavm::{
  50      console::{
  51          prelude::*,
  52          types::{Address, Field},
  53      },
  54      ledger::{
  55          block::Transaction,
  56          narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
  57          puzzle::{Solution, SolutionID},
  58      },
  59      prelude::{ConsensusVersion, committee::Committee},
  60      utilities::flatten_error,
  61  };
  62  
  63  use alpha_std::StorageMode;
  64  use anyhow::Context;
  65  use colored::Colorize;
  66  use futures::stream::{FuturesUnordered, StreamExt};
  67  use indexmap::{IndexMap, IndexSet};
  68  #[cfg(feature = "locktick")]
  69  use locktick::{
  70      parking_lot::{Mutex, RwLock},
  71      tokio::Mutex as TMutex,
  72  };
  73  #[cfg(not(feature = "locktick"))]
  74  use parking_lot::{Mutex, RwLock};
  75  #[cfg(not(feature = "serial"))]
  76  use rayon::prelude::*;
  77  use std::{
  78      collections::{HashMap, HashSet},
  79      future::Future,
  80      net::SocketAddr,
  81      sync::Arc,
  82      time::Duration,
  83  };
  84  #[cfg(not(feature = "locktick"))]
  85  use tokio::sync::Mutex as TMutex;
  86  use tokio::{sync::OnceCell, task::JoinHandle};
  87  
  88  /// A helper type for an optional proposed batch.
  89  pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
  90  
  91  /// The primary logic of a node.
  92  /// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
  93  #[derive(Clone)]
  94  pub struct Primary<N: Network> {
  95      /// The sync module enables fetching data from other validators.
  96      sync: Sync<N>,
  97      /// The gateway allows talking to other nodes in the validator set.
  98      gateway: Gateway<N>,
  99      /// The storage.
 100      storage: Storage<N>,
 101      /// The ledger service.
 102      ledger: Arc<dyn LedgerService<N>>,
 103      /// The workers.
 104      workers: Arc<[Worker<N>]>,
 105      /// The BFT sender.
 106      bft_sender: Arc<OnceCell<BFTSender<N>>>,
 107      /// The batch proposal, if the primary is currently proposing a batch.
 108      proposed_batch: Arc<ProposedBatch<N>>,
 109      /// The timestamp of the most recent proposed batch.
 110      latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
 111      /// The recently-signed batch proposals.
 112      signed_proposals: Arc<RwLock<SignedProposals<N>>>,
 113      /// The handles for all background tasks spawned by this primary.
 114      handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
 115      /// The lock for propose_batch.
 116      propose_lock: Arc<TMutex<u64>>,
 117      /// The storage mode of the node.
 118      storage_mode: StorageMode,
 119  }
 120  
 121  impl<N: Network> Primary<N> {
 122      /// The maximum number of unconfirmed transmissions to send to the primary.
 123      pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
 124  
 125      /// Initializes a new primary instance.
 126      #[allow(clippy::too_many_arguments)]
 127      pub fn new(
 128          account: Account<N>,
 129          storage: Storage<N>,
 130          ledger: Arc<dyn LedgerService<N>>,
 131          block_sync: Arc<BlockSync<N>>,
 132          ip: Option<SocketAddr>,
 133          trusted_validators: &[SocketAddr],
 134          trusted_peers_only: bool,
 135          storage_mode: StorageMode,
 136          dev: Option<u16>,
 137      ) -> Result<Self> {
 138          // Initialize the gateway.
 139          let gateway = Gateway::new(
 140              account,
 141              storage.clone(),
 142              ledger.clone(),
 143              ip,
 144              trusted_validators,
 145              trusted_peers_only,
 146              storage_mode.clone(),
 147              dev,
 148          )?;
 149          // Initialize the sync module.
 150          let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);
 151  
 152          // Initialize the primary instance.
 153          Ok(Self {
 154              sync,
 155              gateway,
 156              storage,
 157              ledger,
 158              workers: Arc::from(vec![]),
 159              bft_sender: Default::default(),
 160              proposed_batch: Default::default(),
 161              latest_proposed_batch_timestamp: Default::default(),
 162              signed_proposals: Default::default(),
 163              handles: Default::default(),
 164              propose_lock: Default::default(),
 165              storage_mode,
 166          })
 167      }
 168  
 169      /// Load the proposal cache file and update the Primary state with the stored data.
 170      async fn load_proposal_cache(&self) -> Result<()> {
 171          // Fetch the signed proposals from the file system if it exists.
 172          match ProposalCache::<N>::exists(&self.storage_mode) {
 173              // If the proposal cache exists, then process the proposal cache.
 174              true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
 175                  Ok(proposal_cache) => {
 176                      // Extract the proposal and signed proposals.
 177                      let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
 178                          proposal_cache.into();
 179  
 180                      // Write the proposed batch.
 181                      *self.proposed_batch.write() = proposed_batch;
 182                      // Write the signed proposals.
 183                      *self.signed_proposals.write() = signed_proposals;
 184                      // Writ the propose lock.
 185                      *self.propose_lock.lock().await = latest_certificate_round;
 186  
 187                      // Update the storage with the pending certificates.
 188                      for certificate in pending_certificates {
 189                          let batch_id = certificate.batch_id();
 190                          // We use a dummy IP because the node should not need to request from any peers.
 191                          // The storage should have stored all the transmissions. If not, we simply
 192                          // skip the certificate.
 193                          if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
 194                          {
 195                              let err = err.context(format!(
 196                                  "Failed to load stored certificate {} from proposal cache",
 197                                  fmt_id(batch_id)
 198                              ));
 199                              warn!("{}", &flatten_error(err));
 200                          }
 201                      }
 202                      Ok(())
 203                  }
 204                  Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
 205              },
 206              // If the proposal cache does not exist, then return early.
 207              false => Ok(()),
 208          }
 209      }
 210  
 211      /// Run the primary instance.
 212      pub async fn run(
 213          &mut self,
 214          ping: Option<Arc<Ping<N>>>,
 215          bft_sender: Option<BFTSender<N>>,
 216          primary_sender: PrimarySender<N>,
 217          primary_receiver: PrimaryReceiver<N>,
 218      ) -> Result<()> {
 219          info!("Starting the primary instance of the memory pool...");
 220  
 221          // Set the BFT sender.
 222          if let Some(bft_sender) = &bft_sender {
 223              // Set the BFT sender in the primary.
 224              self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
 225          }
 226  
 227          // Construct a map of the worker senders.
 228          let mut worker_senders = IndexMap::new();
 229          // Construct a map for the workers.
 230          let mut workers = Vec::new();
 231          // Initialize the workers.
 232          for id in 0..MAX_WORKERS {
 233              // Construct the worker channels.
 234              let (tx_worker, rx_worker) = init_worker_channels();
 235              // Construct the worker instance.
 236              let worker = Worker::new(
 237                  id,
 238                  Arc::new(self.gateway.clone()),
 239                  self.storage.clone(),
 240                  self.ledger.clone(),
 241                  self.proposed_batch.clone(),
 242              )?;
 243              // Run the worker instance.
 244              worker.run(rx_worker);
 245              // Add the worker to the list of workers.
 246              workers.push(worker);
 247              // Add the worker sender to the map.
 248              worker_senders.insert(id, tx_worker);
 249          }
 250          // Set the workers.
 251          self.workers = Arc::from(workers);
 252  
 253          // First, initialize the sync channels.
 254          let (sync_sender, sync_receiver) = init_sync_channels();
 255          // Next, initialize the sync module and sync the storage from ledger.
 256          self.sync.initialize(bft_sender).await?;
 257          // Next, load and process the proposal cache before running the sync module.
 258          self.load_proposal_cache().await?;
 259          // Next, run the sync module.
 260          self.sync.run(ping, sync_receiver).await?;
 261          // Next, initialize the gateway.
 262          self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
 263          // Lastly, start the primary handlers.
 264          // Note: This ensures the primary does not start communicating before syncing is complete.
 265          self.start_handlers(primary_receiver);
 266  
 267          Ok(())
 268      }
 269  
 270      /// Returns the current round.
 271      pub fn current_round(&self) -> u64 {
 272          self.storage.current_round()
 273      }
 274  
 275      /// Returns `true` if the primary is synced.
 276      pub fn is_synced(&self) -> bool {
 277          self.sync.is_synced()
 278      }
 279  
 280      /// Returns the gateway.
 281      pub const fn gateway(&self) -> &Gateway<N> {
 282          &self.gateway
 283      }
 284  
 285      /// Returns the storage.
 286      pub const fn storage(&self) -> &Storage<N> {
 287          &self.storage
 288      }
 289  
 290      /// Returns the ledger.
 291      pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
 292          &self.ledger
 293      }
 294  
 295      /// Returns the number of workers.
 296      pub fn num_workers(&self) -> u8 {
 297          u8::try_from(self.workers.len()).expect("Too many workers")
 298      }
 299  
 300      /// Returns the workers.
 301      pub const fn workers(&self) -> &Arc<[Worker<N>]> {
 302          &self.workers
 303      }
 304  
 305      /// Returns the batch proposal of our primary, if one currently exists.
 306      pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
 307          &self.proposed_batch
 308      }
 309  }
 310  
 311  impl<N: Network> Primary<N> {
 312      /// Returns the number of unconfirmed transmissions.
 313      pub fn num_unconfirmed_transmissions(&self) -> usize {
 314          self.workers.iter().map(|worker| worker.num_transmissions()).sum()
 315      }
 316  
 317      /// Returns the number of unconfirmed ratifications.
 318      pub fn num_unconfirmed_ratifications(&self) -> usize {
 319          self.workers.iter().map(|worker| worker.num_ratifications()).sum()
 320      }
 321  
 322      /// Returns the number of unconfirmed solutions.
 323      pub fn num_unconfirmed_solutions(&self) -> usize {
 324          self.workers.iter().map(|worker| worker.num_solutions()).sum()
 325      }
 326  
 327      /// Returns the number of unconfirmed transactions.
 328      pub fn num_unconfirmed_transactions(&self) -> usize {
 329          self.workers.iter().map(|worker| worker.num_transactions()).sum()
 330      }
 331  }
 332  
 333  impl<N: Network> Primary<N> {
 334      /// Returns the worker transmission IDs.
 335      pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
 336          self.workers.iter().flat_map(|worker| worker.transmission_ids())
 337      }
 338  
 339      /// Returns the worker transmissions.
 340      pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
 341          self.workers.iter().flat_map(|worker| worker.transmissions())
 342      }
 343  
 344      /// Returns the worker solutions.
 345      pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
 346          self.workers.iter().flat_map(|worker| worker.solutions())
 347      }
 348  
 349      /// Returns the worker transactions.
 350      pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
 351          self.workers.iter().flat_map(|worker| worker.transactions())
 352      }
 353  }
 354  
 355  impl<N: Network> Primary<N> {
 356      /// Clears the worker solutions.
 357      pub fn clear_worker_solutions(&self) {
 358          self.workers.iter().for_each(Worker::clear_solutions);
 359      }
 360  }
 361  
 362  impl<N: Network> Primary<N> {
 363      /// Proposes the batch for the current round.
 364      ///
 365      /// This method performs the following steps:
 366      /// 1. Drain the workers.
 367      /// 2. Sign the batch.
 368      /// 3. Set the batch proposal in the primary.
 369      /// 4. Broadcast the batch header to all validators for signing.
 370      pub async fn propose_batch(&self) -> Result<()> {
 371          // This function isn't re-entrant.
 372          let mut lock_guard = self.propose_lock.lock().await;
 373  
 374          // Check if the proposed batch has expired, and clear it if it has expired.
 375          if let Err(err) = self
 376              .check_proposed_batch_for_expiration()
 377              .await
 378              .with_context(|| "Failed to check the proposed batch for expiration")
 379          {
 380              warn!("{}", flatten_error(&err));
 381              return Ok(());
 382          }
 383  
 384          // Retrieve the current round.
 385          let round = self.current_round();
 386          // Compute the previous round.
 387          let previous_round = round.saturating_sub(1);
 388  
 389          // If the current round is 0, return early.
 390          // This can actually never happen, because of the invariant that the current round is never 0
 391          // (see [`StorageInner::current_round`]).
 392          ensure!(round > 0, "Round 0 cannot have transaction batches");
 393  
 394          // If the current storage round is below the latest proposal round, then return early.
 395          if round < *lock_guard {
 396              warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
 397              return Ok(());
 398          }
 399  
 400          // If there is a batch being proposed already,
 401          // rebroadcast the batch header to the non-signers, and return early.
 402          if let Some(proposal) = self.proposed_batch.read().as_ref() {
 403              // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
 404              if round < proposal.round()
 405                  || proposal
 406                      .batch_header()
 407                      .previous_certificate_ids()
 408                      .iter()
 409                      .any(|id| !self.storage.contains_certificate(*id))
 410              {
 411                  warn!(
 412                      "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
 413                      proposal.round(),
 414                  );
 415                  return Ok(());
 416              }
 417              // Construct the event.
 418              // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
 419              let event = Event::BatchPropose(proposal.batch_header().clone().into());
 420              // Iterate through the non-signers.
 421              for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
 422                  // Resolve the address to the peer IP.
 423                  match self.gateway.resolver().read().get_peer_ip_for_address(address) {
 424                      // Resend the batch proposal to the validator for signing.
 425                      Some(peer_ip) => {
 426                          let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
 427                          tokio::spawn(async move {
 428                              debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
 429                              // Resend the batch proposal to the peer.
 430                              if gateway.send(peer_ip, event_).await.is_none() {
 431                                  warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
 432                              }
 433                          });
 434                      }
 435                      None => continue,
 436                  }
 437              }
 438              debug!("Proposed batch for round {} is still valid", proposal.round());
 439              return Ok(());
 440          }
 441  
 442          #[cfg(feature = "metrics")]
 443          metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
 444  
 445          // Ensure that the primary does not create a new proposal too quickly.
 446          if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
 447              debug!(
 448                  "{}",
 449                  flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}")))
 450              );
 451              return Ok(());
 452          }
 453  
 454          // Ensure the primary has not proposed a batch for this round before.
 455          if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
 456              // If a BFT sender was provided, attempt to advance the current round.
 457              if let Some(bft_sender) = self.bft_sender.get() {
 458                  match bft_sender.send_primary_round_to_bft(self.current_round()).await {
 459                      // 'is_ready' is true if the primary is ready to propose a batch for the next round.
 460                      Ok(true) => (), // continue,
 461                      // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
 462                      Ok(false) => return Ok(()),
 463                      // An error occurred while attempting to advance the current round.
 464                      Err(err) => {
 465                          let err = err.context("Failed to update the BFT to the next round");
 466                          warn!("{}", &flatten_error(&err));
 467                          return Err(err);
 468                      }
 469                  }
 470              }
 471              debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
 472              return Ok(());
 473          }
 474  
 475          // Determine if the current round has been proposed.
 476          // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
 477          // good for network reliability and should not be prevented for the already existing proposed_batch.
 478          // If a certificate already exists for the current round, an attempt should be made to advance the
 479          // round as early as possible.
 480          if round == *lock_guard {
 481              debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
 482              return Ok(());
 483          }
 484  
 485          // Retrieve the committee to check against.
 486          let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
 487          // Check if the primary is connected to enough validators to reach quorum threshold.
 488          {
 489              // Retrieve the connected validator addresses.
 490              let mut connected_validators = self.gateway.connected_addresses();
 491              // Append the primary to the set.
 492              connected_validators.insert(self.gateway.account().address());
 493              // If quorum threshold is not reached, return early.
 494              if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
 495                  debug!(
 496                      "Primary is safely skipping a batch proposal for round {round} {}",
 497                      "(please connect to more validators)".dimmed()
 498                  );
 499                  trace!("Primary is connected to {} validators", connected_validators.len() - 1);
 500                  return Ok(());
 501              }
 502          }
 503  
 504          // Retrieve the previous certificates.
 505          let previous_certificates = self.storage.get_certificates_for_round(previous_round);
 506  
 507          // Check if the batch is ready to be proposed.
 508          // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
 509          let mut is_ready = previous_round == 0;
 510          // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
 511          if previous_round > 0 {
 512              // Retrieve the committee lookback for the round.
 513              let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
 514                  bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
 515              };
 516              // Construct a set over the authors.
 517              let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
 518              // Check if the previous certificates have reached the quorum threshold.
 519              if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
 520                  is_ready = true;
 521              }
 522          }
 523          // If the batch is not ready to be proposed, return early.
 524          if !is_ready {
 525              debug!(
 526                  "Primary is safely skipping a batch proposal for round {round} {}",
 527                  format!("(previous round {previous_round} has not reached quorum)").dimmed()
 528              );
 529              return Ok(());
 530          }
 531  
 532          // Initialize the map of transmissions.
 533          let mut transmissions: IndexMap<_, _> = Default::default();
 534          // Track the total execution costs of the batch proposal as it is being constructed.
 535          let mut proposal_cost = 0u64;
 536          // Note: worker draining and transaction inclusion needs to be thought
 537          // through carefully when there is more than one worker. The fairness
 538          // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
 539          debug_assert_eq!(MAX_WORKERS, 1);
 540  
 541          'outer: for worker in self.workers().iter() {
 542              let mut num_worker_transmissions = 0usize;
 543  
 544              while let Some((id, transmission)) = worker.remove_front() {
 545                  // Check the selected transmissions are below the batch limit.
 546                  if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
 547                      // Reinsert the transmission into the worker.
 548                      worker.insert_front(id, transmission);
 549                      break 'outer;
 550                  }
 551  
 552                  // Check the max transmissions per worker is not exceeded.
 553                  if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
 554                      // Reinsert the transmission into the worker.
 555                      worker.insert_front(id, transmission);
 556                      continue 'outer;
 557                  }
 558  
 559                  // Check if the ledger already contains the transmission.
 560                  if self.ledger.contains_transmission(&id).unwrap_or(true) {
 561                      trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
 562                      continue;
 563                  }
 564  
 565                  // Check if the storage already contain the transmission.
 566                  // Note: We do not skip if this is the first transmission in the proposal, to ensure that
 567                  // the primary does not propose a batch with no transmissions.
 568                  if !transmissions.is_empty() && self.storage.contains_transmission(id) {
 569                      trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
 570                      continue;
 571                  }
 572  
 573                  // Check the transmission is still valid.
 574                  match (id, transmission.clone()) {
 575                      (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
 576                          // Ensure the checksum matches. If not, skip the solution.
 577                          if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
 578                          {
 579                              trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
 580                              continue;
 581                          }
 582                          // Check if the solution is still valid.
 583                          if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
 584                              trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
 585                              continue;
 586                          }
 587                      }
 588                      (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
 589                          // Ensure the checksum matches. If not, skip the transaction.
 590                          if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
 591                          {
 592                              trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
 593                              continue;
 594                          }
 595  
 596                          // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
 597                          let transaction = spawn_blocking!({
 598                              match transaction {
 599                                  Data::Object(transaction) => Ok(transaction),
 600                                  Data::Buffer(bytes) => {
 601                                      Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
 602                                  }
 603                              }
 604                          })?;
 605  
 606                          // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
 607                          // ConsensusVersion V8 Migration logic -
 608                          // Do not include deployments in a batch proposal.
 609                          let current_block_height = self.ledger.latest_block_height();
 610                          let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
 611                          let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
 612                          let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
 613                          if current_block_height > consensus_version_v7_height
 614                              && current_block_height <= consensus_version_v8_height
 615                              && transaction.is_deploy()
 616                          {
 617                              trace!(
 618                                  "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
 619                                  fmt_id(transaction_id)
 620                              );
 621                              continue;
 622                          }
 623  
 624                          // Compute the transaction spent cost (in microcredits).
 625                          // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
 626                          let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
 627                          else {
 628                              debug!(
 629                                  "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
 630                                  fmt_id(transaction_id)
 631                              );
 632                              continue;
 633                          };
 634  
 635                          // Check if the transaction is still valid.
 636                          if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
 637                              trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
 638                              continue;
 639                          }
 640  
 641                          // Compute the next proposal cost.
 642                          // Note: We purposefully discard this transaction if the proposal cost overflows.
 643                          let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
 644                              debug!(
 645                                  "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
 646                                  fmt_id(transaction_id)
 647                              );
 648                              continue;
 649                          };
 650  
 651                          // Check if the next proposal cost exceeds the batch proposal spend limit.
 652                          let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
 653                          if next_proposal_cost > batch_spend_limit {
 654                              debug!(
 655                                  "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
 656                                  fmt_id(transaction_id),
 657                                  batch_spend_limit
 658                              );
 659  
 660                              // Reinsert the transmission into the worker.
 661                              worker.insert_front(id, transmission);
 662                              break 'outer;
 663                          }
 664  
 665                          // Update the proposal cost.
 666                          proposal_cost = next_proposal_cost;
 667                      }
 668  
 669                      // Note: We explicitly forbid including ratifications,
 670                      // as the protocol currently does not support ratifications.
 671                      (TransmissionID::Ratification, Transmission::Ratification) => continue,
 672                      // All other combinations are clearly invalid.
 673                      _ => continue,
 674                  }
 675  
 676                  // If the transmission is valid, insert it into the proposal's transmission list.
 677                  transmissions.insert(id, transmission);
 678                  num_worker_transmissions = num_worker_transmissions.saturating_add(1);
 679              }
 680          }
 681  
 682          // Determine the current timestamp.
 683          let current_timestamp = now();
 684  
 685          *lock_guard = round;
 686  
 687          /* Proceeding to sign & propose the batch. */
 688          info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
 689  
 690          // Retrieve the private key.
 691          let private_key = *self.gateway.account().private_key();
 692          // Retrieve the committee ID.
 693          let committee_id = committee_lookback.id();
 694          // Prepare the transmission IDs.
 695          let transmission_ids = transmissions.keys().copied().collect();
 696          // Prepare the previous batch certificate IDs.
 697          let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
 698          // Sign the batch header and construct the proposal.
 699          let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
 700              &private_key,
 701              round,
 702              current_timestamp,
 703              committee_id,
 704              transmission_ids,
 705              previous_certificate_ids,
 706              &mut rand::thread_rng()
 707          ))
 708          .and_then(|batch_header| {
 709              Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
 710                  .map(|proposal| (batch_header, proposal))
 711          })
 712          .inspect_err(|_| {
 713              // On error, reinsert the transmissions and then propagate the error.
 714              if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
 715                  error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
 716              }
 717          })?;
 718          // Broadcast the batch to all validators for signing.
 719          self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
 720          // Set the timestamp of the latest proposed batch.
 721          *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
 722          // Set the proposed batch.
 723          *self.proposed_batch.write() = Some(proposal);
 724          Ok(())
 725      }
 726  
 727      /// Processes a batch propose from a peer.
 728      ///
 729      /// This method performs the following steps:
 730      /// 1. Verify the batch.
 731      /// 2. Sign the batch.
 732      /// 3. Broadcast the signature back to the validator.
 733      ///
 734      /// If our primary is ahead of the peer, we will not sign the batch.
 735      /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
 736      async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
 737          let BatchPropose { round: batch_round, batch_header } = batch_propose;
 738  
 739          // Deserialize the batch header.
 740          let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
 741          // Ensure the round matches in the batch header.
 742          if batch_round != batch_header.round() {
 743              // Proceed to disconnect the validator.
 744              self.gateway.disconnect(peer_ip);
 745              bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
 746          }
 747  
 748          // Retrieve the batch author.
 749          let batch_author = batch_header.author();
 750  
 751          // Ensure the batch proposal is from the validator.
 752          match self.gateway.resolve_to_aleo_addr(peer_ip) {
 753              // If the peer is a validator, then ensure the batch proposal is from the validator.
 754              Some(address) => {
 755                  if address != batch_author {
 756                      // Proceed to disconnect the validator.
 757                      self.gateway.disconnect(peer_ip);
 758                      bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
 759                  }
 760              }
 761              None => bail!("Batch proposal from a disconnected validator"),
 762          }
 763          // Ensure the batch author is a current committee member.
 764          if !self.gateway.is_authorized_validator_address(batch_author) {
 765              // Proceed to disconnect the validator.
 766              self.gateway.disconnect(peer_ip);
 767              bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
 768          }
 769          // Ensure the batch proposal is not from the current primary.
 770          if self.gateway.account().address() == batch_author {
 771              bail!("Invalid peer - proposed batch from myself ({batch_author})");
 772          }
 773  
 774          // Ensure that the batch proposal's committee ID matches the expected committee ID.
 775          let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
 776          if expected_committee_id != batch_header.committee_id() {
 777              // Proceed to disconnect the validator.
 778              self.gateway.disconnect(peer_ip);
 779              bail!(
 780                  "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
 781                  batch_header.committee_id()
 782              );
 783          }
 784  
 785          // Retrieve the cached round and batch ID for this validator.
 786          if let Some((signed_round, signed_batch_id, signature)) =
 787              self.signed_proposals.read().get(&batch_author).copied()
 788          {
 789              // If the signed round is ahead of the peer's batch round, do not sign the proposal.
 790              // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
 791              if signed_round > batch_header.round() {
 792                  bail!(
 793                      "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
 794                      batch_header.round()
 795                  );
 796              }
 797  
 798              // If the round matches and the batch ID differs, then the validator is malicious.
 799              if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
 800                  bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
 801              }
 802              // If the round and batch ID matches, then skip signing the batch a second time.
 803              // Instead, rebroadcast the cached signature to the peer.
 804              if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
 805                  let gateway = self.gateway.clone();
 806                  tokio::spawn(async move {
 807                      debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
 808                      let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
 809                      // Resend the batch signature to the peer.
 810                      if gateway.send(peer_ip, event).await.is_none() {
 811                          warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
 812                      }
 813                  });
 814                  // Return early.
 815                  return Ok(());
 816              }
 817          }
 818  
 819          // Ensure that the batch header doesn't already exist in storage.
 820          // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
 821          if self.storage.contains_batch(batch_header.batch_id()) {
 822              debug!(
 823                  "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
 824                  format!("batch for round {batch_round} already exists in storage").dimmed()
 825              );
 826              return Ok(());
 827          }
 828  
 829          // Compute the previous round.
 830          let previous_round = batch_round.saturating_sub(1);
 831          // Ensure that the peer did not propose a batch too quickly.
 832          if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
 833              // Proceed to disconnect the validator.
 834              self.gateway.disconnect(peer_ip);
 835              return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
 836          }
 837  
 838          // Ensure the batch header does not contain any ratifications.
 839          if batch_header.contains(TransmissionID::Ratification) {
 840              // Proceed to disconnect the validator.
 841              self.gateway.disconnect(peer_ip);
 842              bail!(
 843                  "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
 844              );
 845          }
 846  
 847          // If the peer is ahead, use the batch header to sync up to the peer.
 848          let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;
 849  
 850          // Check that the transmission ids match and are not fee transactions.
 851          if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
 852              // If the transmission is not well-formed, then return early.
 853              self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
 854          }) {
 855              let err = err.context(format!(
 856                  "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
 857              ));
 858              debug!("{}", flatten_error(err));
 859              return Ok(());
 860          }
 861  
 862          // Ensure the batch is for the current round.
 863          // This method must be called after fetching previous certificates (above),
 864          // and prior to checking the batch header (below).
 865          if let Err(e) = self.ensure_is_signing_round(batch_round) {
 866              // If the primary is not signing for the peer's round, then return early.
 867              debug!("{e} from '{peer_ip}'");
 868              return Ok(());
 869          }
 870  
 871          // Ensure the batch header from the peer is valid.
 872          let (storage, header) = (self.storage.clone(), batch_header.clone());
 873  
 874          // Check the batch header, and return early if it already exists in storage.
 875          let Some(missing_transmissions) =
 876              spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
 877          else {
 878              return Ok(());
 879          };
 880  
 881          // Inserts the missing transmissions into the workers.
 882          self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
 883  
 884          // Ensure the transaction doesn't bring the proposal above the spend limit.
 885          let block_height = self.ledger.latest_block_height() + 1;
 886          if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
 887              let mut proposal_cost = 0u64;
 888              for transmission_id in batch_header.transmission_ids() {
 889                  let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
 890                  let Some(worker) = self.workers.get(worker_id as usize) else {
 891                      debug!("Unable to find worker {worker_id}");
 892                      return Ok(());
 893                  };
 894  
 895                  let Some(transmission) = worker.get_transmission(*transmission_id) else {
 896                      debug!("Unable to find transmission '{}' in worker '{worker_id}", fmt_id(transmission_id));
 897                      return Ok(());
 898                  };
 899  
 900                  // If the transmission is a transaction, compute its execution cost.
 901                  if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
 902                      (transmission_id, transmission)
 903                  {
 904                      // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
 905                      let transaction = spawn_blocking!({
 906                          match transaction {
 907                              Data::Object(transaction) => Ok(transaction),
 908                              Data::Buffer(bytes) => {
 909                                  Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
 910                              }
 911                          }
 912                      })?;
 913  
 914                      // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
 915                      // ConsensusVersion V8 Migration logic -
 916                      // Do not include deployments in a batch proposal.
 917                      let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
 918                      let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
 919                      let consensus_version = N::CONSENSUS_VERSION(block_height)?;
 920                      if block_height > consensus_version_v7_height
 921                          && block_height <= consensus_version_v8_height
 922                          && transaction.is_deploy()
 923                      {
 924                          bail!(
 925                              "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})",
 926                          )
 927                      }
 928  
 929                      // Compute the transaction spent cost (in microcredits).
 930                      // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
 931                      let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
 932                      else {
 933                          bail!(
 934                              "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
 935                              fmt_id(transaction_id)
 936                          )
 937                      };
 938  
 939                      // Compute the next proposal cost.
 940                      // Note: We purposefully discard this transaction if the proposal cost overflows.
 941                      let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
 942                          bail!(
 943                              "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
 944                              fmt_id(transaction_id)
 945                          )
 946                      };
 947  
 948                      // Check if the next proposal cost exceeds the batch proposal spend limit.
 949                      let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
 950                      if next_proposal_cost > batch_spend_limit {
 951                          bail!(
 952                              "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
 953                              fmt_id(transaction_id),
 954                              batch_spend_limit
 955                          );
 956                      }
 957  
 958                      // Update the proposal cost.
 959                      proposal_cost = next_proposal_cost;
 960                  }
 961              }
 962          }
 963  
 964          /* Proceeding to sign the batch. */
 965  
 966          // Retrieve the batch ID.
 967          let batch_id = batch_header.batch_id();
 968          // Sign the batch ID.
 969          let account = self.gateway.account().clone();
 970          let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
 971  
 972          // Ensure the proposal has not already been signed.
 973          //
 974          // Note: Due to the need to sync the batch header with the peer, it is possible
 975          // for the primary to receive the same 'BatchPropose' event again, whereby only
 976          // one instance of this handler should sign the batch. This check guarantees this.
 977          match self.signed_proposals.write().0.entry(batch_author) {
 978              std::collections::hash_map::Entry::Occupied(mut entry) => {
 979                  // If the validator has already signed a batch for this round, then return early,
 980                  // since, if the peer still has not received the signature, they will request it again,
 981                  // and the logic at the start of this function will resend the (now cached) signature
 982                  // to the peer if asked to sign this batch proposal again.
 983                  if entry.get().0 == batch_round {
 984                      return Ok(());
 985                  }
 986                  // Otherwise, cache the round, batch ID, and signature for this validator.
 987                  entry.insert((batch_round, batch_id, signature));
 988              }
 989              // If the validator has not signed a batch before, then continue.
 990              std::collections::hash_map::Entry::Vacant(entry) => {
 991                  // Cache the round, batch ID, and signature for this validator.
 992                  entry.insert((batch_round, batch_id, signature));
 993              }
 994          };
 995  
 996          // Broadcast the signature back to the validator.
 997          let self_ = self.clone();
 998          tokio::spawn(async move {
 999              let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
1000              // Send the batch signature to the peer.
1001              if self_.gateway.send(peer_ip, event).await.is_some() {
1002                  debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
1003              }
1004          });
1005  
1006          Ok(())
1007      }
1008  
1009      /// Processes a batch signature from a peer.
1010      ///
1011      /// This method performs the following steps:
1012      /// 1. Ensure the proposed batch has not expired.
1013      /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
1014      /// 3. Store the signature.
1015      /// 4. Certify the batch if enough signatures have been received.
1016      /// 5. Broadcast the batch certificate to all validators.
1017      async fn process_batch_signature_from_peer(
1018          &self,
1019          peer_ip: SocketAddr,
1020          batch_signature: BatchSignature<N>,
1021      ) -> Result<()> {
1022          // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
1023          self.check_proposed_batch_for_expiration().await?;
1024  
1025          // Retrieve the signature and timestamp.
1026          let BatchSignature { batch_id, signature } = batch_signature;
1027  
1028          // Retrieve the signer.
1029          let signer = signature.to_address();
1030  
1031          // Ensure the batch signature is signed by the validator.
1032          if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
1033              // Proceed to disconnect the validator.
1034              self.gateway.disconnect(peer_ip);
1035              bail!("Malicious peer - batch signature is from a different validator ({signer})");
1036          }
1037          // Ensure the batch signature is not from the current primary.
1038          if self.gateway.account().address() == signer {
1039              bail!("Invalid peer - received a batch signature from myself ({signer})");
1040          }
1041  
1042          let self_ = self.clone();
1043          let Some(proposal) = spawn_blocking!({
1044              // Acquire the write lock.
1045              let mut proposed_batch = self_.proposed_batch.write();
1046              // Add the signature to the batch, and determine if the batch is ready to be certified.
1047              match proposed_batch.as_mut() {
1048                  Some(proposal) => {
1049                      // Ensure the batch ID matches the currently proposed batch ID.
1050                      if proposal.batch_id() != batch_id {
1051                          match self_.storage.contains_batch(batch_id) {
1052                              // If this batch was already certified, return early.
1053                              true => {
1054                                  debug!(
1055                                      "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
1056                                      proposal.round()
1057                                  );
1058                                  return Ok(None);
1059                              }
1060                              // If the batch ID is unknown, return an error.
1061                              false => bail!(
1062                                  "Unknown batch ID '{batch_id}', expected '{}' for round {}",
1063                                  proposal.batch_id(),
1064                                  proposal.round()
1065                              ),
1066                          }
1067                      }
1068                      // Retrieve the committee lookback for the round.
1069                      let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
1070                      // Retrieve the address of the validator.
1071                      let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else {
1072                          bail!("Signature is from a disconnected validator");
1073                      };
1074                      // Add the signature to the batch.
1075                      let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;
1076  
1077                      if new_signature {
1078                          info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1079                          // Check if the batch is ready to be certified.
1080                          if !proposal.is_quorum_threshold_reached(&committee_lookback) {
1081                              // If the batch is not ready to be certified, return early.
1082                              return Ok(None);
1083                          }
1084                      } else {
1085                          debug!(
1086                              "Received duplicated signature from '{peer_ip}' for batch {batch_id} in round {round}",
1087                              round = proposal.round()
1088                          );
1089                          return Ok(None);
1090                      }
1091                  }
1092                  // There is no proposed batch, so return early.
1093                  None => return Ok(None),
1094              };
1095              // Retrieve the batch proposal, clearing the proposed batch.
1096              match proposed_batch.take() {
1097                  Some(proposal) => Ok(Some(proposal)),
1098                  None => Ok(None),
1099              }
1100          })?
1101          else {
1102              return Ok(());
1103          };
1104  
1105          /* Proceeding to certify the batch. */
1106  
1107          info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1108  
1109          // Retrieve the committee lookback for the round.
1110          let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1111          // Store the certified batch and broadcast it to all validators.
1112          // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
1113          if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1114              // Reinsert the transmissions back into the ready queue for the next proposal.
1115              self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1116              return Err(e);
1117          }
1118  
1119          #[cfg(feature = "metrics")]
1120          metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1121          Ok(())
1122      }
1123  
1124      /// Processes a batch certificate from a peer.
1125      ///
1126      /// This method performs the following steps:
1127      /// 1. Stores the given batch certificate, after ensuring it is valid.
1128      /// 2. If there are enough certificates to reach quorum threshold for the current round,
1129      ///    then proceed to advance to the next round.
1130      async fn process_batch_certificate_from_peer(
1131          &self,
1132          peer_ip: SocketAddr,
1133          certificate: BatchCertificate<N>,
1134      ) -> Result<()> {
1135          // Ensure the batch certificate is from an authorized validator.
1136          if !self.gateway.is_authorized_validator_ip(peer_ip) {
1137              // Proceed to disconnect the validator.
1138              self.gateway.disconnect(peer_ip);
1139              bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
1140          }
1141          // Ensure storage does not already contain the certificate.
1142          if self.storage.contains_certificate(certificate.id()) {
1143              return Ok(());
1144          // Otherwise, ensure ephemeral storage contains the certificate.
1145          } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
1146              self.storage.insert_unprocessed_certificate(certificate.clone())?;
1147          }
1148  
1149          // Retrieve the batch certificate author.
1150          let author = certificate.author();
1151          // Retrieve the batch certificate round.
1152          let certificate_round = certificate.round();
1153          // Retrieve the batch certificate committee ID.
1154          let committee_id = certificate.committee_id();
1155  
1156          // Ensure the batch certificate is not from the current primary.
1157          if self.gateway.account().address() == author {
1158              bail!("Received a batch certificate for myself ({author})");
1159          }
1160  
1161          // Ensure that the incoming certificate is valid.
1162          self.storage.check_incoming_certificate(&certificate)?;
1163  
1164          // Store the certificate, after ensuring it is valid above.
1165          // The following call recursively fetches and stores
1166          // the previous certificates referenced from this certificate.
1167          // It is critical to make the following call this after validating the certificate above.
1168          // The reason is that a sequence of malformed certificates,
1169          // with references to previous certificates with non-decreasing rounds,
1170          // cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
1171          // Note that if the following call, if not returning an error, guarantees the backward closure of the DAG
1172          // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
1173          // then all the validity checks in [`Storage::check_certificate`] should be redundant.
1174          // TODO: eliminate those redundant checks
1175          self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
1176  
1177          // If there are enough certificates to reach quorum threshold for the certificate round,
1178          // then proceed to advance to the next round.
1179  
1180          // Retrieve the committee lookback.
1181          let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
1182  
1183          // Retrieve the certificate authors.
1184          let authors = self.storage.get_certificate_authors_for_round(certificate_round);
1185          // Check if the certificates have reached the quorum threshold.
1186          let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
1187  
1188          // Ensure that the batch certificate's committee ID matches the expected committee ID.
1189          let expected_committee_id = committee_lookback.id();
1190          if expected_committee_id != committee_id {
1191              // Proceed to disconnect the validator.
1192              self.gateway.disconnect(peer_ip);
1193              bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
1194          }
1195  
1196          // Determine if we are currently proposing a round that is relevant.
1197          // Note: This is important, because while our peers have advanced,
1198          // they may not be proposing yet, and thus still able to sign our proposed batch.
1199          let should_advance = match &*self.proposed_batch.read() {
1200              // We advance if the proposal round is less than the current round that was just certified.
1201              Some(proposal) => proposal.round() < certificate_round,
1202              // If there's no proposal, we consider advancing.
1203              None => true,
1204          };
1205  
1206          // Retrieve the current round.
1207          let current_round = self.current_round();
1208  
1209          // Determine whether to advance to the next round.
1210          if is_quorum && should_advance && certificate_round >= current_round {
1211              // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
1212              self.try_increment_to_the_next_round(current_round + 1).await?;
1213          }
1214          Ok(())
1215      }
1216  }
1217  
1218  impl<N: Network> Primary<N> {
1219      /// Starts the primary handlers.
1220      ///
1221      /// For each receiver in the `primary_receiver` struct, there will be a dedicated task
1222      /// that awaits new data and handles it accordingly.
1223      /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically
1224      /// tries to move the the next round of batches.
1225      ///
1226      /// This function is called exactly once, in `Self::run()`.
1227      fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
1228          let PrimaryReceiver {
1229              mut rx_batch_propose,
1230              mut rx_batch_signature,
1231              mut rx_batch_certified,
1232              mut rx_primary_ping,
1233              mut rx_unconfirmed_solution,
1234              mut rx_unconfirmed_transaction,
1235          } = primary_receiver;
1236  
1237          // Start the primary ping sender.
1238          let self_ = self.clone();
1239          self.spawn(async move {
1240              loop {
1241                  // Sleep briefly.
1242                  tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
1243  
1244                  // Retrieve the block locators.
1245                  let self__ = self_.clone();
1246                  let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
1247                      Ok(block_locators) => block_locators,
1248                      Err(e) => {
1249                          warn!("Failed to retrieve block locators - {e}");
1250                          continue;
1251                      }
1252                  };
1253  
1254                  // Retrieve the latest certificate of the primary.
1255                  let primary_certificate = {
1256                      // Retrieve the primary address.
1257                      let primary_address = self_.gateway.account().address();
1258  
1259                      // Iterate backwards from the latest round to find the primary certificate.
1260                      let mut certificate = None;
1261                      let mut current_round = self_.current_round();
1262                      while certificate.is_none() {
1263                          // If the current round is 0, then break the while loop.
1264                          if current_round == 0 {
1265                              break;
1266                          }
1267                          // Retrieve the primary certificates.
1268                          if let Some(primary_certificate) =
1269                              self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
1270                          {
1271                              certificate = Some(primary_certificate);
1272                          // If the primary certificate was not found, decrement the round.
1273                          } else {
1274                              current_round = current_round.saturating_sub(1);
1275                          }
1276                      }
1277  
1278                      // Determine if the primary certificate was found.
1279                      match certificate {
1280                          Some(certificate) => certificate,
1281                          // Skip this iteration of the loop (do not send a primary ping).
1282                          None => continue,
1283                      }
1284                  };
1285  
1286                  // Construct the primary ping.
1287                  let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
1288                  // Broadcast the event.
1289                  self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
1290              }
1291          });
1292  
1293          // Start the primary ping handler.
1294          let self_ = self.clone();
1295          self.spawn(async move {
1296              while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
1297                  // If the primary is not synced, then do not process the primary ping.
1298                  if self_.sync.is_synced() {
1299                      trace!("Processing new primary ping from '{peer_ip}'");
1300                  } else {
1301                      trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
1302                      continue;
1303                  }
1304  
1305                  // Spawn a task to process the primary certificate.
1306                  {
1307                      let self_ = self_.clone();
1308                      tokio::spawn(async move {
1309                          // Deserialize the primary certificate in the primary ping.
1310                          let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
1311                          else {
1312                              warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
1313                              return;
1314                          };
1315                          // Process the primary certificate.
1316                          let id = fmt_id(primary_certificate.id());
1317                          let round = primary_certificate.round();
1318                          if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
1319                              warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
1320                          }
1321                      });
1322                  }
1323              }
1324          });
1325  
1326          // Start the worker ping(s).
1327          let self_ = self.clone();
1328          self.spawn(async move {
1329              loop {
1330                  tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
1331                  // If the primary is not synced, then do not broadcast the worker ping(s).
1332                  if !self_.sync.is_synced() {
1333                      trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
1334                      continue;
1335                  }
1336                  // Broadcast the worker ping(s).
1337                  for worker in self_.workers.iter() {
1338                      worker.broadcast_ping();
1339                  }
1340              }
1341          });
1342  
1343          // Start the batch proposer.
1344          let self_ = self.clone();
1345          self.spawn(async move {
1346              loop {
1347                  // Sleep briefly, but longer than if there were no batch.
1348                  tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1349                  let current_round = self_.current_round();
1350                  // If the primary is not synced, then do not propose a batch.
1351                  if !self_.sync.is_synced() {
1352                      debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
1353                      continue;
1354                  }
1355                  // A best-effort attempt to skip the scheduled batch proposal if
1356                  // round progression already triggered one.
1357                  if self_.propose_lock.try_lock().is_err() {
1358                      trace!(
1359                          "Skipping batch proposal for round {current_round} {}",
1360                          "(node is already proposing)".dimmed()
1361                      );
1362                      continue;
1363                  };
1364                  // If there is no proposed batch, attempt to propose a batch.
1365                  // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
1366                  // and only one batch needs to be proposed at a time.
1367                  if let Err(e) = self_.propose_batch().await {
1368                      warn!("Cannot propose a batch - {e}");
1369                  }
1370              }
1371          });
1372  
1373          // Start the proposed batch handler.
1374          let self_ = self.clone();
1375          self.spawn(async move {
1376              while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
1377                  // If the primary is not synced, then do not sign the batch.
1378                  if !self_.sync.is_synced() {
1379                      trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
1380                      continue;
1381                  }
1382                  // Spawn a task to process the proposed batch.
1383                  let self_ = self_.clone();
1384                  tokio::spawn(async move {
1385                      // Process the batch proposal.
1386                      let round = batch_propose.round;
1387                      if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
1388                          warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
1389                      }
1390                  });
1391              }
1392          });
1393  
1394          // Start the batch signature handler.
1395          let self_ = self.clone();
1396          self.spawn(async move {
1397              while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
1398                  // If the primary is not synced, then do not store the signature.
1399                  if !self_.sync.is_synced() {
1400                      trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
1401                      continue;
1402                  }
1403                  // Process the batch signature.
1404                  // Note: Do NOT spawn a task around this function call. Processing signatures from peers
1405                  // is a critical path, and we should only store the minimum required number of signatures.
1406                  // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
1407                  // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
1408                  let id = fmt_id(batch_signature.batch_id);
1409                  if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
1410                      let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
1411                      warn!("{}", flatten_error(err));
1412                  }
1413              }
1414          });
1415  
1416          // Start the certified batch handler.
1417          let self_ = self.clone();
1418          self.spawn(async move {
1419              while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
1420                  // If the primary is not synced, then do not store the certificate.
1421                  if !self_.sync.is_synced() {
1422                      trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
1423                      continue;
1424                  }
1425                  // Spawn a task to process the batch certificate.
1426                  let self_ = self_.clone();
1427                  tokio::spawn(async move {
1428                      // Deserialize the batch certificate.
1429                      let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
1430                          warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
1431                          return;
1432                      };
1433                      // Process the batch certificate.
1434                      let id = fmt_id(batch_certificate.id());
1435                      let round = batch_certificate.round();
1436                      if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
1437                          warn!(
1438                              "{}",
1439                              flatten_error(err.context(format!(
1440                                  "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
1441                              )))
1442                          );
1443                      }
1444                  });
1445              }
1446          });
1447  
1448          // This task periodically tries to move to the next round.
1449          //
1450          // Note: This is necessary to ensure that the primary is not stuck on a previous round
1451          // despite having received enough certificates to advance to the next round.
1452          let self_ = self.clone();
1453          self.spawn(async move {
1454              loop {
1455                  // Sleep briefly.
1456                  tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1457                  // If the primary is not synced, then do not increment to the next round.
1458                  if !self_.sync.is_synced() {
1459                      trace!("Skipping round increment {}", "(node is syncing)".dimmed());
1460                      continue;
1461                  }
1462                  // Attempt to increment to the next round.
1463                  let current_round = self_.current_round();
1464                  let next_round = current_round.saturating_add(1);
1465                  // Determine if the quorum threshold is reached for the current round.
1466                  let is_quorum_threshold_reached = {
1467                      // Retrieve the certificate authors for the current round.
1468                      let authors = self_.storage.get_certificate_authors_for_round(current_round);
1469                      // If there are no certificates, then skip this check.
1470                      if authors.is_empty() {
1471                          continue;
1472                      }
1473                      // Retrieve the committee lookback for the current round.
1474                      let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
1475                          warn!("Failed to retrieve the committee lookback for round {current_round}");
1476                          continue;
1477                      };
1478                      // Check if the quorum threshold is reached for the current round.
1479                      committee_lookback.is_quorum_threshold_reached(&authors)
1480                  };
1481                  // Attempt to increment to the next round if the quorum threshold is reached.
1482                  if is_quorum_threshold_reached {
1483                      debug!("Quorum threshold reached for round {current_round}");
1484                      if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
1485                          warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
1486                      }
1487                  }
1488              }
1489          });
1490  
1491          // Start a handler to process new unconfirmed solutions.
1492          let self_ = self.clone();
1493          self.spawn(async move {
1494              while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
1495                  // Compute the checksum for the solution.
1496                  let Ok(checksum) = solution.to_checksum::<N>() else {
1497                      error!("Failed to compute the checksum for the unconfirmed solution");
1498                      continue;
1499                  };
1500                  // Compute the worker ID.
1501                  let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
1502                      error!("Unable to determine the worker ID for the unconfirmed solution");
1503                      continue;
1504                  };
1505                  let self_ = self_.clone();
1506                  tokio::spawn(async move {
1507                      // Retrieve the worker.
1508                      let worker = &self_.workers[worker_id as usize];
1509                      // Process the unconfirmed solution.
1510                      let result = worker.process_unconfirmed_solution(solution_id, solution).await;
1511                      // Send the result to the callback.
1512                      callback.send(result).ok();
1513                  });
1514              }
1515          });
1516  
1517          // Start a handler to process new unconfirmed transactions.
1518          let self_ = self.clone();
1519          self.spawn(async move {
1520              while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
1521                  trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
1522                  // Compute the checksum for the transaction.
1523                  let Ok(checksum) = transaction.to_checksum::<N>() else {
1524                      error!("Failed to compute the checksum for the unconfirmed transaction");
1525                      continue;
1526                  };
1527                  // Compute the worker ID.
1528                  let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
1529                      error!("Unable to determine the worker ID for the unconfirmed transaction");
1530                      continue;
1531                  };
1532                  let self_ = self_.clone();
1533                  tokio::spawn(async move {
1534                      // Retrieve the worker.
1535                      let worker = &self_.workers[worker_id as usize];
1536                      // Process the unconfirmed transaction.
1537                      let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
1538                      // Send the result to the callback.
1539                      callback.send(result).ok();
1540                  });
1541              }
1542          });
1543      }
1544  
1545      /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
1546      async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1547          // Check if the proposed batch is timed out or stale.
1548          let is_expired = match self.proposed_batch.read().as_ref() {
1549              Some(proposal) => proposal.round() < self.current_round(),
1550              None => false,
1551          };
1552          // If the batch is expired, clear the proposed batch.
1553          if is_expired {
1554              // Reset the proposed batch.
1555              let proposal = self.proposed_batch.write().take();
1556              if let Some(proposal) = proposal {
1557                  debug!("Cleared expired proposal for round {}", proposal.round());
1558                  self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1559              }
1560          }
1561          Ok(())
1562      }
1563  
1564      /// Increments to the next round.
1565      async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1566          // If the next round is within GC range, then iterate to the penultimate round.
1567          if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1568              let mut fast_forward_round = self.current_round();
1569              // Iterate until the penultimate round is reached.
1570              while fast_forward_round < next_round.saturating_sub(1) {
1571                  // Update to the next round in storage.
1572                  fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1573                  // Clear the proposed batch.
1574                  *self.proposed_batch.write() = None;
1575              }
1576          }
1577  
1578          // Retrieve the current round.
1579          let current_round = self.current_round();
1580          // Attempt to advance to the next round.
1581          if current_round < next_round {
1582              // If a BFT sender was provided, send the current round to the BFT.
1583              let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
1584                  match bft_sender.send_primary_round_to_bft(current_round).await {
1585                      Ok(is_ready) => is_ready,
1586                      Err(err) => {
1587                          let err = err.context("Failed to update the BFT to the next round");
1588                          warn!("{}", flatten_error(&err));
1589                          return Err(err);
1590                      }
1591                  }
1592              }
1593              // Otherwise, handle the Narwhal case.
1594              else {
1595                  // Update to the next round in storage.
1596                  self.storage.increment_to_next_round(current_round)?;
1597                  // Set 'is_ready' to 'true'.
1598                  true
1599              };
1600  
1601              // Log whether the next round is ready.
1602              match is_ready {
1603                  true => debug!("Primary is ready to propose the next round"),
1604                  false => debug!("Primary is not ready to propose the next round"),
1605              }
1606  
1607              // If the node is ready, propose a batch for the next round.
1608              if is_ready {
1609                  self.propose_batch().await?;
1610              }
1611          }
1612          Ok(())
1613      }
1614  
1615      /// Ensures the primary is signing for the specified batch round.
1616      /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1617      /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1618      fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1619          // Retrieve the current round.
1620          let current_round = self.current_round();
1621          // Ensure the batch round is within GC range of the current round.
1622          if current_round + self.storage.max_gc_rounds() <= batch_round {
1623              bail!("Round {batch_round} is too far in the future")
1624          }
1625          // Ensure the batch round is at or one before the current round.
1626          // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1627          // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1628          if current_round > batch_round + 1 {
1629              bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1630          }
1631          // Check if the primary is still signing for the batch round.
1632          if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1633              if signing_round > batch_round {
1634                  bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1635              }
1636          }
1637          Ok(())
1638      }
1639  
1640      /// Ensure the primary is not creating batch proposals too frequently.
1641      /// This checks that the certificate timestamp for the previous round is within the expected range.
1642      fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1643          // Retrieve the timestamp of the previous timestamp to check against.
1644          let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1645              // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1646              Some(certificate) => certificate.timestamp(),
1647              None => match self.gateway.account().address() == author {
1648                  // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1649                  true => *self.latest_proposed_batch_timestamp.read(),
1650                  // If we do not see a previous certificate for the author, then proceed optimistically.
1651                  false => return Ok(()),
1652              },
1653          };
1654  
1655          // Determine the elapsed time since the previous timestamp.
1656          let elapsed = timestamp
1657              .checked_sub(previous_timestamp)
1658              .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1659          // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1660          match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1661              true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1662              false => Ok(()),
1663          }
1664      }
1665  
1666      /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
1667      async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1668          // Create the batch certificate and transmissions.
1669          let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1670          // Convert the transmissions into a HashMap.
1671          // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
1672          let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1673          // Store the certified batch.
1674          let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1675          spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1676          debug!("Stored a batch certificate for round {}", certificate.round());
1677          // If a BFT sender was provided, send the certificate to the BFT.
1678          if let Some(bft_sender) = self.bft_sender.get() {
1679              // Await the callback to continue.
1680              if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
1681                  let err = err.context("Failed to update the BFT DAG from primary");
1682                  warn!("{}", flatten_error(&err));
1683                  return Err(err);
1684              };
1685          }
1686          // Broadcast the certified batch to all validators.
1687          self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
1688          // Log the certified batch.
1689          let num_transmissions = certificate.transmission_ids().len();
1690          let round = certificate.round();
1691          info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
1692          // Increment to the next round.
1693          self.try_increment_to_the_next_round(round + 1).await
1694      }
1695  
1696      /// Inserts the missing transmissions from the proposal into the workers.
1697      fn insert_missing_transmissions_into_workers(
1698          &self,
1699          peer_ip: SocketAddr,
1700          transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1701      ) -> Result<()> {
1702          // Insert the transmissions into the workers.
1703          assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1704              worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1705          })
1706      }
1707  
1708      /// Re-inserts the transmissions from the proposal into the workers.
1709      fn reinsert_transmissions_into_workers(
1710          &self,
1711          transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1712      ) -> Result<()> {
1713          // Re-insert the transmissions into the workers.
1714          assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1715              worker.reinsert(transmission_id, transmission);
1716          })
1717      }
1718  
1719      /// Recursively stores a given batch certificate, after ensuring:
1720      ///   - Ensure the round matches the committee round.
1721      ///   - Ensure the address is a member of the committee.
1722      ///   - Ensure the timestamp is within range.
1723      ///   - Ensure we have all of the transmissions.
1724      ///   - Ensure we have all of the previous certificates.
1725      ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
1726      ///   - Ensure the previous certificates have reached the quorum threshold.
1727      ///   - Ensure we have not already signed the batch ID.
1728      #[async_recursion::async_recursion]
1729      async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1730          &self,
1731          peer_ip: SocketAddr,
1732          certificate: BatchCertificate<N>,
1733      ) -> Result<()> {
1734          // Retrieve the batch header.
1735          let batch_header = certificate.batch_header();
1736          // Retrieve the batch round.
1737          let batch_round = batch_header.round();
1738  
1739          // If the certificate round is outdated, do not store it.
1740          if batch_round <= self.storage.gc_round() {
1741              return Ok(());
1742          }
1743          // If the certificate already exists in storage, return early.
1744          if self.storage.contains_certificate(certificate.id()) {
1745              return Ok(());
1746          }
1747  
1748          // If node is not in sync mode and the node is not synced. Then return an error.
1749          if !IS_SYNCING && !self.is_synced() {
1750              bail!(
1751                  "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1752                  fmt_id(certificate.id())
1753              );
1754          }
1755  
1756          // If the peer is ahead, use the batch header to sync up to the peer.
1757          let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;
1758  
1759          // Check if the certificate needs to be stored.
1760          if !self.storage.contains_certificate(certificate.id()) {
1761              // Store the batch certificate.
1762              let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1763              spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1764              debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1765              // If a BFT sender was provided, send the round and certificate to the BFT.
1766              if let Some(bft_sender) = self.bft_sender.get() {
1767                  // Send the certificate to the BFT.
1768                  if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate).await {
1769                      let err = err.context("Failed to update the BFT DAG from sync");
1770                      warn!("{}", &flatten_error(&err));
1771                      return Err(err);
1772                  };
1773              }
1774          }
1775          Ok(())
1776      }
1777  
1778      /// Recursively syncs using the given batch header.
1779      async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
1780          &self,
1781          peer_ip: SocketAddr,
1782          batch_header: &BatchHeader<N>,
1783      ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1784          // Retrieve the batch round.
1785          let batch_round = batch_header.round();
1786  
1787          // If the certificate round is outdated, do not store it.
1788          if batch_round <= self.storage.gc_round() {
1789              bail!("Round {batch_round} is too far in the past")
1790          }
1791  
1792          // If node is not in sync mode and the node is not synced, then return an error.
1793          if !IS_SYNCING && !self.is_synced() {
1794              bail!(
1795                  "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1796                  fmt_id(batch_header.batch_id())
1797              );
1798          }
1799  
1800          // Determine if quorum threshold is reached on the batch round.
1801          let is_quorum_threshold_reached = {
1802              let authors = self.storage.get_certificate_authors_for_round(batch_round);
1803              let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1804              committee_lookback.is_quorum_threshold_reached(&authors)
1805          };
1806  
1807          // Check if our primary should move to the next round.
1808          // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1809          // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
1810          // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
1811          let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1812          // Check if our primary is far behind the peer.
1813          let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1814          // If our primary is far behind the peer, update our committee to the batch round.
1815          if is_behind_schedule || is_peer_far_in_future {
1816              // If the batch round is greater than the current committee round, update the committee.
1817              self.try_increment_to_the_next_round(batch_round).await?;
1818          }
1819  
1820          // Ensure the primary has all of the transmissions.
1821          let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1822  
1823          // Ensure the primary has all of the previous certificates.
1824          let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1825  
1826          // Wait for the missing transmissions and previous certificates to be fetched.
1827          let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1828              missing_transmissions_handle,
1829              missing_previous_certificates_handle,
1830          ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?;
1831  
1832          // Iterate through the missing previous certificates.
1833          for batch_certificate in missing_previous_certificates {
1834              // Store the batch certificate (recursively fetching any missing previous certificates).
1835              self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1836          }
1837          Ok(missing_transmissions)
1838      }
1839  
1840      /// Fetches any missing transmissions for the specified batch header.
1841      /// If a transmission does not exist, it will be fetched from the specified peer IP.
1842      async fn fetch_missing_transmissions(
1843          &self,
1844          peer_ip: SocketAddr,
1845          batch_header: &BatchHeader<N>,
1846      ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1847          // If the round is <= the GC round, return early.
1848          if batch_header.round() <= self.storage.gc_round() {
1849              return Ok(Default::default());
1850          }
1851  
1852          // Ensure this batch ID is new, otherwise return early.
1853          if self.storage.contains_batch(batch_header.batch_id()) {
1854              trace!("Batch for round {} from peer has already been processed", batch_header.round());
1855              return Ok(Default::default());
1856          }
1857  
1858          // Retrieve the workers.
1859          let workers = self.workers.clone();
1860  
1861          // Initialize a list for the transmissions.
1862          let mut fetch_transmissions = FuturesUnordered::new();
1863  
1864          // Retrieve the number of workers.
1865          let num_workers = self.num_workers();
1866          // Iterate through the transmission IDs.
1867          for transmission_id in batch_header.transmission_ids() {
1868              // If the transmission does not exist in storage, proceed to fetch the transmission.
1869              if !self.storage.contains_transmission(*transmission_id) {
1870                  // Determine the worker ID.
1871                  let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1872                      bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1873                  };
1874                  // Retrieve the worker.
1875                  let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1876                  // Push the callback onto the list.
1877                  fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1878              }
1879          }
1880  
1881          // Initialize a set for the transmissions.
1882          let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1883          // Wait for all of the transmissions to be fetched.
1884          while let Some(result) = fetch_transmissions.next().await {
1885              // Retrieve the transmission.
1886              let (transmission_id, transmission) = result?;
1887              // Insert the transmission into the set.
1888              transmissions.insert(transmission_id, transmission);
1889          }
1890          // Return the transmissions.
1891          Ok(transmissions)
1892      }
1893  
1894      /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1895      async fn fetch_missing_previous_certificates(
1896          &self,
1897          peer_ip: SocketAddr,
1898          batch_header: &BatchHeader<N>,
1899      ) -> Result<HashSet<BatchCertificate<N>>> {
1900          // Retrieve the round.
1901          let round = batch_header.round();
1902          // If the previous round is 0, or is <= the GC round, return early.
1903          if round == 1 || round <= self.storage.gc_round() + 1 {
1904              return Ok(Default::default());
1905          }
1906  
1907          // Fetch the missing previous certificates.
1908          let missing_previous_certificates =
1909              self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1910          if !missing_previous_certificates.is_empty() {
1911              debug!(
1912                  "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1913                  missing_previous_certificates.len(),
1914              );
1915          }
1916          // Return the missing previous certificates.
1917          Ok(missing_previous_certificates)
1918      }
1919  
1920      /// Fetches any missing certificates for the specified batch header from the specified peer.
1921      async fn fetch_missing_certificates(
1922          &self,
1923          peer_ip: SocketAddr,
1924          round: u64,
1925          certificate_ids: &IndexSet<Field<N>>,
1926      ) -> Result<HashSet<BatchCertificate<N>>> {
1927          // Initialize a list for the missing certificates.
1928          let mut fetch_certificates = FuturesUnordered::new();
1929          // Initialize a set for the missing certificates.
1930          let mut missing_certificates = HashSet::default();
1931          // Iterate through the certificate IDs.
1932          for certificate_id in certificate_ids {
1933              // Check if the certificate already exists in the ledger.
1934              if self.ledger.contains_certificate(certificate_id)? {
1935                  continue;
1936              }
1937              // Check if the certificate already exists in storage.
1938              if self.storage.contains_certificate(*certificate_id) {
1939                  continue;
1940              }
1941              // If we have not fully processed the certificate yet, store it.
1942              if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1943                  missing_certificates.insert(certificate);
1944              } else {
1945                  // If we do not have the certificate, request it.
1946                  trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1947                  // TODO (howardwu): Limit the number of open requests we send to a peer.
1948                  // Send an certificate request to the peer.
1949                  fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1950              }
1951          }
1952  
1953          // If there are no certificates to fetch, return early with the existing unprocessed certificates.
1954          match fetch_certificates.is_empty() {
1955              true => return Ok(missing_certificates),
1956              false => trace!(
1957                  "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1958                  fetch_certificates.len(),
1959              ),
1960          }
1961  
1962          // Wait for all of the missing certificates to be fetched.
1963          while let Some(result) = fetch_certificates.next().await {
1964              // Insert the missing certificate into the set.
1965              missing_certificates.insert(result?);
1966          }
1967          // Return the missing certificates.
1968          Ok(missing_certificates)
1969      }
1970  }
1971  
1972  impl<N: Network> Primary<N> {
1973      /// Spawns a task with the given future; it should only be used for long-running tasks.
1974      fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1975          self.handles.lock().push(tokio::spawn(future));
1976      }
1977  
1978      /// Shuts down the primary.
1979      pub async fn shut_down(&self) {
1980          info!("Shutting down the primary...");
1981          // Shut down the workers.
1982          self.workers.iter().for_each(|worker| worker.shut_down());
1983          // Abort the tasks.
1984          self.handles.lock().iter().for_each(|handle| handle.abort());
1985          // Save the current proposal cache to disk.
1986          let proposal_cache = {
1987              let proposal = self.proposed_batch.write().take();
1988              let signed_proposals = self.signed_proposals.read().clone();
1989              let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1990              let pending_certificates = self.storage.get_pending_certificates();
1991              ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1992          };
1993          if let Err(err) = proposal_cache.store(&self.storage_mode) {
1994              error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
1995          }
1996          // Close the gateway.
1997          self.gateway.shut_down().await;
1998      }
1999  }
2000  
2001  #[cfg(test)]
2002  mod tests {
2003      use super::*;
2004      use alphaos_node_bft_ledger_service::MockLedgerService;
2005      use alphaos_node_bft_storage_service::BFTMemoryService;
2006      use alphaos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2007      use alphavm::{
2008          ledger::{
2009              committee::{Committee, MIN_VALIDATOR_STAKE},
2010              test_helpers::sample_execution_transaction_with_fee,
2011          },
2012          prelude::{Address, Signature},
2013      };
2014  
2015      use bytes::Bytes;
2016      use indexmap::IndexSet;
2017      use rand::RngCore;
2018  
2019      type CurrentNetwork = alphavm::prelude::MainnetV0;
2020  
2021      fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2022          // Create a committee containing the primary's account.
2023          const COMMITTEE_SIZE: usize = 4;
2024          let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2025          let mut members = IndexMap::new();
2026  
2027          for i in 0..COMMITTEE_SIZE {
2028              let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2029              let account = Account::new(rng).unwrap();
2030  
2031              members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
2032              accounts.push((socket_addr, account));
2033          }
2034  
2035          (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2036      }
2037  
2038      // Returns a primary and a list of accounts in the configured committee.
2039      fn primary_with_committee(
2040          account_index: usize,
2041          accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2042          committee: Committee<CurrentNetwork>,
2043          height: u32,
2044      ) -> Primary<CurrentNetwork> {
2045          let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2046          let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
2047  
2048          // Initialize the primary.
2049          let account = accounts[account_index].1.clone();
2050          let block_sync = Arc::new(BlockSync::new(ledger.clone()));
2051          let mut primary =
2052              Primary::new(account, storage, ledger, block_sync, None, &[], false, StorageMode::new_test(None), None)
2053                  .unwrap();
2054  
2055          // Construct a worker instance.
2056          primary.workers = Arc::from([Worker::new(
2057              0, // id
2058              Arc::new(primary.gateway.clone()),
2059              primary.storage.clone(),
2060              primary.ledger.clone(),
2061              primary.proposed_batch.clone(),
2062          )
2063          .unwrap()]);
2064          for a in accounts.iter().skip(account_index) {
2065              primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2066          }
2067  
2068          primary
2069      }
2070  
2071      fn primary_without_handlers(
2072          rng: &mut TestRng,
2073      ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2074          let (accounts, committee) = sample_committee(rng);
2075          let primary = primary_with_committee(
2076              0, // index of primary's account
2077              &accounts,
2078              committee,
2079              CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2080          );
2081  
2082          (primary, accounts)
2083      }
2084  
2085      // Creates a mock solution.
2086      fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2087          // Sample a random fake solution ID.
2088          let solution_id = rng.r#gen::<u64>().into();
2089          // Vary the size of the solutions.
2090          let size = rng.gen_range(1024..10 * 1024);
2091          // Sample random fake solution bytes.
2092          let mut vec = vec![0u8; size];
2093          rng.fill_bytes(&mut vec);
2094          let solution = Data::Buffer(Bytes::from(vec));
2095          // Return the solution ID and solution.
2096          (solution_id, solution)
2097      }
2098  
2099      // Samples a test transaction.
2100      fn sample_unconfirmed_transaction(
2101          rng: &mut TestRng,
2102      ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2103          let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2104          let id = transaction.id();
2105  
2106          (id, Data::Object(transaction))
2107      }
2108  
2109      // Creates a batch proposal with one solution and one transaction.
2110      fn create_test_proposal(
2111          author: &Account<CurrentNetwork>,
2112          committee: Committee<CurrentNetwork>,
2113          round: u64,
2114          previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2115          timestamp: i64,
2116          num_transactions: u64,
2117          rng: &mut TestRng,
2118      ) -> Proposal<CurrentNetwork> {
2119          let mut transmission_ids = IndexSet::new();
2120          let mut transmissions = IndexMap::new();
2121  
2122          // Prepare the solution and insert into the sets.
2123          let (solution_id, solution) = sample_unconfirmed_solution(rng);
2124          let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2125          let solution_transmission_id = (solution_id, solution_checksum).into();
2126          transmission_ids.insert(solution_transmission_id);
2127          transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2128  
2129          // Prepare the transactions and insert into the sets.
2130          for _ in 0..num_transactions {
2131              let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2132              let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2133              let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2134              transmission_ids.insert(transaction_transmission_id);
2135              transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2136          }
2137  
2138          // Retrieve the private key.
2139          let private_key = author.private_key();
2140          // Sign the batch header.
2141          let batch_header = BatchHeader::new(
2142              private_key,
2143              round,
2144              timestamp,
2145              committee.id(),
2146              transmission_ids,
2147              previous_certificate_ids,
2148              rng,
2149          )
2150          .unwrap();
2151          // Construct the proposal.
2152          Proposal::new(committee, batch_header, transmissions).unwrap()
2153      }
2154  
2155      // Creates a signature of the primary's current proposal for each committee member (excluding
2156      // the primary).
2157      fn peer_signatures_for_proposal(
2158          primary: &Primary<CurrentNetwork>,
2159          accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2160          rng: &mut TestRng,
2161      ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2162          // Each committee member signs the batch.
2163          let mut signatures = Vec::with_capacity(accounts.len() - 1);
2164          for (socket_addr, account) in accounts {
2165              if account.address() == primary.gateway.account().address() {
2166                  continue;
2167              }
2168              let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2169              let signature = account.sign(&[batch_id], rng).unwrap();
2170              signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2171          }
2172  
2173          signatures
2174      }
2175  
2176      /// Creates a signature of the batch ID for each committee member (excluding the primary).
2177      fn peer_signatures_for_batch(
2178          primary_address: Address<CurrentNetwork>,
2179          accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2180          batch_id: Field<CurrentNetwork>,
2181          rng: &mut TestRng,
2182      ) -> IndexSet<Signature<CurrentNetwork>> {
2183          let mut signatures = IndexSet::new();
2184          for (_, account) in accounts {
2185              if account.address() == primary_address {
2186                  continue;
2187              }
2188              let signature = account.sign(&[batch_id], rng).unwrap();
2189              signatures.insert(signature);
2190          }
2191          signatures
2192      }
2193  
2194      // Creates a batch certificate.
2195      fn create_batch_certificate(
2196          primary_address: Address<CurrentNetwork>,
2197          accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2198          round: u64,
2199          previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2200          rng: &mut TestRng,
2201      ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2202          let timestamp = now();
2203  
2204          let author =
2205              accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2206          let private_key = author.private_key();
2207  
2208          let committee_id = Field::rand(rng);
2209          let (solution_id, solution) = sample_unconfirmed_solution(rng);
2210          let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2211          let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2212          let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2213  
2214          let solution_transmission_id = (solution_id, solution_checksum).into();
2215          let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2216  
2217          let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2218          let transmissions = [
2219              (solution_transmission_id, Transmission::Solution(solution)),
2220              (transaction_transmission_id, Transmission::Transaction(transaction)),
2221          ]
2222          .into();
2223  
2224          let batch_header = BatchHeader::new(
2225              private_key,
2226              round,
2227              timestamp,
2228              committee_id,
2229              transmission_ids,
2230              previous_certificate_ids,
2231              rng,
2232          )
2233          .unwrap();
2234          let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2235          let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2236          (certificate, transmissions)
2237      }
2238  
2239      // Create a certificate chain up to, but not including, the specified round in the primary storage.
2240      fn store_certificate_chain(
2241          primary: &Primary<CurrentNetwork>,
2242          accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2243          round: u64,
2244          rng: &mut TestRng,
2245      ) -> IndexSet<Field<CurrentNetwork>> {
2246          let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2247          let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2248          for cur_round in 1..round {
2249              for (_, account) in accounts.iter() {
2250                  let (certificate, transmissions) = create_batch_certificate(
2251                      account.address(),
2252                      accounts,
2253                      cur_round,
2254                      previous_certificates.clone(),
2255                      rng,
2256                  );
2257                  next_certificates.insert(certificate.id());
2258                  assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2259              }
2260  
2261              assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2262              previous_certificates = next_certificates;
2263              next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2264          }
2265  
2266          previous_certificates
2267      }
2268  
2269      // Insert the account socket addresses into the resolver so that
2270      // they are recognized as "connected".
2271      fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2272          // First account is primary, which doesn't need to resolve.
2273          for (addr, acct) in accounts.iter().skip(1) {
2274              primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2275          }
2276      }
2277  
2278      #[tokio::test]
2279      async fn test_propose_batch() {
2280          let mut rng = TestRng::default();
2281          let (primary, _) = primary_without_handlers(&mut rng);
2282  
2283          // Check there is no batch currently proposed.
2284          assert!(primary.proposed_batch.read().is_none());
2285  
2286          // Generate a solution and a transaction.
2287          let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2288          let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2289  
2290          // Store it on one of the workers.
2291          primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2292          primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2293  
2294          // Try to propose a batch again. This time, it should succeed.
2295          assert!(primary.propose_batch().await.is_ok());
2296          assert!(primary.proposed_batch.read().is_some());
2297      }
2298  
2299      #[tokio::test]
2300      async fn test_propose_batch_with_no_transmissions() {
2301          let mut rng = TestRng::default();
2302          let (primary, _) = primary_without_handlers(&mut rng);
2303  
2304          // Check there is no batch currently proposed.
2305          assert!(primary.proposed_batch.read().is_none());
2306  
2307          // Try to propose a batch with no transmissions.
2308          assert!(primary.propose_batch().await.is_ok());
2309          assert!(primary.proposed_batch.read().is_some());
2310      }
2311  
2312      #[tokio::test]
2313      async fn test_propose_batch_in_round() {
2314          let round = 3;
2315          let mut rng = TestRng::default();
2316          let (primary, accounts) = primary_without_handlers(&mut rng);
2317  
2318          // Fill primary storage.
2319          store_certificate_chain(&primary, &accounts, round, &mut rng);
2320  
2321          // Sleep for a while to ensure the primary is ready to propose the next round.
2322          tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2323  
2324          // Generate a solution and a transaction.
2325          let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2326          let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2327  
2328          // Store it on one of the workers.
2329          primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2330          primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2331  
2332          // Propose a batch again. This time, it should succeed.
2333          assert!(primary.propose_batch().await.is_ok());
2334          assert!(primary.proposed_batch.read().is_some());
2335      }
2336  
2337      #[tokio::test]
2338      async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2339          let round = 3;
2340          let prev_round = round - 1;
2341          let mut rng = TestRng::default();
2342          let (primary, accounts) = primary_without_handlers(&mut rng);
2343          let peer_account = &accounts[1];
2344          let peer_ip = peer_account.0;
2345  
2346          // Fill primary storage.
2347          store_certificate_chain(&primary, &accounts, round, &mut rng);
2348  
2349          // Get transmissions from previous certificates.
2350          let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2351  
2352          // Track the number of transmissions in the previous round.
2353          let mut num_transmissions_in_previous_round = 0;
2354  
2355          // Generate a solution and a transaction.
2356          let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2357          let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2358          let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2359          let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2360  
2361          // Store it on one of the workers.
2362          primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2363          primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2364  
2365          // Check that the worker has 2 transmissions.
2366          assert_eq!(primary.workers[0].num_transmissions(), 2);
2367  
2368          // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
2369          for (_, account) in accounts.iter() {
2370              let (certificate, transmissions) = create_batch_certificate(
2371                  account.address(),
2372                  &accounts,
2373                  round,
2374                  previous_certificate_ids.clone(),
2375                  &mut rng,
2376              );
2377  
2378              // Add the transmissions to the worker.
2379              for (transmission_id, transmission) in transmissions.iter() {
2380                  primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2381              }
2382  
2383              // Insert the certificate to storage.
2384              num_transmissions_in_previous_round += transmissions.len();
2385              primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2386          }
2387  
2388          // Sleep for a while to ensure the primary is ready to propose the next round.
2389          tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2390  
2391          // Advance to the next round.
2392          assert!(primary.storage.increment_to_next_round(round).is_ok());
2393  
2394          // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
2395          assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2396  
2397          // Propose the batch.
2398          assert!(primary.propose_batch().await.is_ok());
2399  
2400          // Check that the proposal only contains the new transmissions that were not in previous certificates.
2401          let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
2402          assert_eq!(proposed_transmissions.len(), 2);
2403          assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2404          assert!(
2405              proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2406          );
2407      }
2408  
2409      #[tokio::test]
2410      async fn test_propose_batch_over_spend_limit() {
2411          let mut rng = TestRng::default();
2412  
2413          // Create a primary to test spend limit backwards compatibility with V4.
2414          let (accounts, committee) = sample_committee(&mut rng);
2415          let primary = primary_with_committee(
2416              0,
2417              &accounts,
2418              committee.clone(),
2419              CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2420          );
2421  
2422          // Check there is no batch currently proposed.
2423          assert!(primary.proposed_batch.read().is_none());
2424          // Check the workers are empty.
2425          primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2426  
2427          // Generate a solution and transactions.
2428          let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2429          primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2430  
2431          for _i in 0..5 {
2432              let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2433              // Store it on one of the workers.
2434              primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2435          }
2436  
2437          // Try to propose a batch again. This time, it should succeed.
2438          assert!(primary.propose_batch().await.is_ok());
2439          // Expect 2/5 transactions to be included in the proposal in addition to the solution.
2440          assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
2441          // Check the transmissions were correctly drained from the workers.
2442          assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2443      }
2444  
2445      #[tokio::test]
2446      async fn test_batch_propose_from_peer() {
2447          let mut rng = TestRng::default();
2448          let (primary, accounts) = primary_without_handlers(&mut rng);
2449  
2450          // Create a valid proposal with an author that isn't the primary.
2451          let round = 1;
2452          let peer_account = &accounts[1];
2453          let peer_ip = peer_account.0;
2454          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2455          let proposal = create_test_proposal(
2456              &peer_account.1,
2457              primary.ledger.current_committee().unwrap(),
2458              round,
2459              Default::default(),
2460              timestamp,
2461              1,
2462              &mut rng,
2463          );
2464  
2465          // Make sure the primary is aware of the transmissions in the proposal.
2466          for (transmission_id, transmission) in proposal.transmissions() {
2467              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2468          }
2469  
2470          // The author must be known to resolver to pass propose checks.
2471          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2472  
2473          // The primary will only consider itself synced if we received
2474          // block locators from a peer.
2475          primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2476          primary.sync.testing_only_try_block_sync_testing_only().await;
2477  
2478          // Try to process the batch proposal from the peer, should succeed.
2479          assert!(
2480              primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2481          );
2482      }
2483  
2484      #[tokio::test]
2485      async fn test_batch_propose_from_peer_when_not_synced() {
2486          let mut rng = TestRng::default();
2487          let (primary, accounts) = primary_without_handlers(&mut rng);
2488  
2489          // Create a valid proposal with an author that isn't the primary.
2490          let round = 1;
2491          let peer_account = &accounts[1];
2492          let peer_ip = peer_account.0;
2493          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2494          let proposal = create_test_proposal(
2495              &peer_account.1,
2496              primary.ledger.current_committee().unwrap(),
2497              round,
2498              Default::default(),
2499              timestamp,
2500              1,
2501              &mut rng,
2502          );
2503  
2504          // Make sure the primary is aware of the transmissions in the proposal.
2505          for (transmission_id, transmission) in proposal.transmissions() {
2506              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2507          }
2508  
2509          // The author must be known to resolver to pass propose checks.
2510          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2511  
2512          // Add a high block locator to indicate we are not synced.
2513          primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2514  
2515          // Try to process the batch proposal from the peer, should fail
2516          assert!(
2517              primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2518          );
2519      }
2520  
2521      #[tokio::test]
2522      async fn test_batch_propose_from_peer_in_round() {
2523          let round = 2;
2524          let mut rng = TestRng::default();
2525          let (primary, accounts) = primary_without_handlers(&mut rng);
2526  
2527          // Generate certificates.
2528          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2529  
2530          // Create a valid proposal with an author that isn't the primary.
2531          let peer_account = &accounts[1];
2532          let peer_ip = peer_account.0;
2533          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2534          let proposal = create_test_proposal(
2535              &peer_account.1,
2536              primary.ledger.current_committee().unwrap(),
2537              round,
2538              previous_certificates,
2539              timestamp,
2540              1,
2541              &mut rng,
2542          );
2543  
2544          // Make sure the primary is aware of the transmissions in the proposal.
2545          for (transmission_id, transmission) in proposal.transmissions() {
2546              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2547          }
2548  
2549          // The author must be known to resolver to pass propose checks.
2550          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2551  
2552          // The primary will only consider itself synced if we received
2553          // block locators from a peer.
2554          primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2555          primary.sync.testing_only_try_block_sync_testing_only().await;
2556  
2557          // Try to process the batch proposal from the peer, should succeed.
2558          primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2559      }
2560  
2561      #[tokio::test]
2562      async fn test_batch_propose_from_peer_wrong_round() {
2563          let mut rng = TestRng::default();
2564          let (primary, accounts) = primary_without_handlers(&mut rng);
2565  
2566          // Create a valid proposal with an author that isn't the primary.
2567          let round = 1;
2568          let peer_account = &accounts[1];
2569          let peer_ip = peer_account.0;
2570          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2571          let proposal = create_test_proposal(
2572              &peer_account.1,
2573              primary.ledger.current_committee().unwrap(),
2574              round,
2575              Default::default(),
2576              timestamp,
2577              1,
2578              &mut rng,
2579          );
2580  
2581          // Make sure the primary is aware of the transmissions in the proposal.
2582          for (transmission_id, transmission) in proposal.transmissions() {
2583              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2584          }
2585  
2586          // The author must be known to resolver to pass propose checks.
2587          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2588          // The primary must be considered synced.
2589          primary.sync.testing_only_try_block_sync_testing_only().await;
2590  
2591          // Try to process the batch proposal from the peer, should error.
2592          assert!(
2593              primary
2594                  .process_batch_propose_from_peer(peer_ip, BatchPropose {
2595                      round: round + 1,
2596                      batch_header: Data::Object(proposal.batch_header().clone())
2597                  })
2598                  .await
2599                  .is_err()
2600          );
2601      }
2602  
2603      #[tokio::test]
2604      async fn test_batch_propose_from_peer_in_round_wrong_round() {
2605          let round = 4;
2606          let mut rng = TestRng::default();
2607          let (primary, accounts) = primary_without_handlers(&mut rng);
2608  
2609          // Generate certificates.
2610          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2611  
2612          // Create a valid proposal with an author that isn't the primary.
2613          let peer_account = &accounts[1];
2614          let peer_ip = peer_account.0;
2615          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2616          let proposal = create_test_proposal(
2617              &peer_account.1,
2618              primary.ledger.current_committee().unwrap(),
2619              round,
2620              previous_certificates,
2621              timestamp,
2622              1,
2623              &mut rng,
2624          );
2625  
2626          // Make sure the primary is aware of the transmissions in the proposal.
2627          for (transmission_id, transmission) in proposal.transmissions() {
2628              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2629          }
2630  
2631          // The author must be known to resolver to pass propose checks.
2632          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2633          // The primary must be considered synced.
2634          primary.sync.testing_only_try_block_sync_testing_only().await;
2635  
2636          // Try to process the batch proposal from the peer, should error.
2637          assert!(
2638              primary
2639                  .process_batch_propose_from_peer(peer_ip, BatchPropose {
2640                      round: round + 1,
2641                      batch_header: Data::Object(proposal.batch_header().clone())
2642                  })
2643                  .await
2644                  .is_err()
2645          );
2646      }
2647  
2648      /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected.
2649      #[tokio::test]
2650      async fn test_batch_propose_from_peer_with_past_timestamp() {
2651          let round = 2;
2652          let mut rng = TestRng::default();
2653          let (primary, accounts) = primary_without_handlers(&mut rng);
2654  
2655          // Generate certificates.
2656          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2657  
2658          // Create a valid proposal with an author that isn't the primary.
2659          let peer_account = &accounts[1];
2660          let peer_ip = peer_account.0;
2661  
2662          // Use a timestamp that is too early.
2663          // Set it to something that is less than the minimum batch delay
2664          // Note, that the minimum delay is currently 1, so this will be equal to the last timestamp
2665          let last_timestamp = primary
2666              .storage
2667              .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
2668              .expect("No previous proposal exists")
2669              .timestamp();
2670          let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;
2671  
2672          let proposal = create_test_proposal(
2673              &peer_account.1,
2674              primary.ledger.current_committee().unwrap(),
2675              round,
2676              previous_certificates,
2677              invalid_timestamp,
2678              1,
2679              &mut rng,
2680          );
2681  
2682          // Make sure the primary is aware of the transmissions in the proposal.
2683          for (transmission_id, transmission) in proposal.transmissions() {
2684              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2685          }
2686  
2687          // The author must be known to resolver to pass propose checks.
2688          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2689          // The primary must be considered synced.
2690          primary.sync.testing_only_try_block_sync_testing_only().await;
2691  
2692          // Try to process the batch proposal from the peer, should error.
2693          assert!(
2694              primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2695          );
2696      }
2697  
2698      /// Check that proposals rejected that have timestamps older than the previous proposal.
2699      #[tokio::test]
2700      async fn test_batch_propose_from_peer_over_spend_limit() {
2701          let mut rng = TestRng::default();
2702  
2703          // Create two primaries to test spend limit activation on V5.
2704          let (accounts, committee) = sample_committee(&mut rng);
2705          let primary_v4 = primary_with_committee(
2706              0,
2707              &accounts,
2708              committee.clone(),
2709              CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2710          );
2711          let primary_v5 = primary_with_committee(
2712              1,
2713              &accounts,
2714              committee.clone(),
2715              CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
2716          );
2717  
2718          // Create a valid proposal with an author that isn't the primary.
2719          let round = 1;
2720          let peer_account = &accounts[2];
2721          let peer_ip = peer_account.0;
2722  
2723          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2724  
2725          let proposal =
2726              create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);
2727  
2728          // Make sure the primary is aware of the transmissions in the proposal.
2729          for (transmission_id, transmission) in proposal.transmissions() {
2730              primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2731              primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2732          }
2733  
2734          // The author must be known to resolver to pass propose checks.
2735          primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2736          primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2737  
2738          // primary v4 must be considered synced.
2739          primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2740          primary_v4.sync.testing_only_try_block_sync_testing_only().await;
2741  
2742          // primary v5 must be ocnsidered synced.
2743          primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2744          primary_v5.sync.testing_only_try_block_sync_testing_only().await;
2745  
2746          // Check the spend limit is enforced from V5 onwards.
2747          assert!(
2748              primary_v4
2749                  .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2750                  .await
2751                  .is_ok()
2752          );
2753  
2754          assert!(
2755              primary_v5
2756                  .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2757                  .await
2758                  .is_err()
2759          );
2760      }
2761  
2762      #[tokio::test]
2763      async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2764          let round = 3;
2765          let mut rng = TestRng::default();
2766          let (primary, _) = primary_without_handlers(&mut rng);
2767  
2768          // Check there is no batch currently proposed.
2769          assert!(primary.proposed_batch.read().is_none());
2770  
2771          // Generate a solution and a transaction.
2772          let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2773          let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2774  
2775          // Store it on one of the workers.
2776          primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2777          primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2778  
2779          // Set the proposal lock to a round ahead of the storage.
2780          let old_proposal_lock_round = *primary.propose_lock.lock().await;
2781          *primary.propose_lock.lock().await = round + 1;
2782  
2783          // Propose a batch and enforce that it fails.
2784          assert!(primary.propose_batch().await.is_ok());
2785          assert!(primary.proposed_batch.read().is_none());
2786  
2787          // Set the proposal lock back to the old round.
2788          *primary.propose_lock.lock().await = old_proposal_lock_round;
2789  
2790          // Try to propose a batch again. This time, it should succeed.
2791          assert!(primary.propose_batch().await.is_ok());
2792          assert!(primary.proposed_batch.read().is_some());
2793      }
2794  
2795      #[tokio::test]
2796      async fn test_propose_batch_with_storage_round_behind_proposal() {
2797          let round = 5;
2798          let mut rng = TestRng::default();
2799          let (primary, accounts) = primary_without_handlers(&mut rng);
2800  
2801          // Generate previous certificates.
2802          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2803  
2804          // Create a valid proposal.
2805          let timestamp = now();
2806          let proposal = create_test_proposal(
2807              primary.gateway.account(),
2808              primary.ledger.current_committee().unwrap(),
2809              round + 1,
2810              previous_certificates,
2811              timestamp,
2812              1,
2813              &mut rng,
2814          );
2815  
2816          // Store the proposal on the primary.
2817          *primary.proposed_batch.write() = Some(proposal);
2818  
2819          // Try to propose a batch will terminate early because the storage is behind the proposal.
2820          assert!(primary.propose_batch().await.is_ok());
2821          assert!(primary.proposed_batch.read().is_some());
2822          assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
2823      }
2824  
2825      #[tokio::test(flavor = "multi_thread")]
2826      async fn test_batch_signature_from_peer() {
2827          let mut rng = TestRng::default();
2828          let (primary, accounts) = primary_without_handlers(&mut rng);
2829          map_account_addresses(&primary, &accounts);
2830  
2831          // Create a valid proposal.
2832          let round = 1;
2833          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2834          let proposal = create_test_proposal(
2835              primary.gateway.account(),
2836              primary.ledger.current_committee().unwrap(),
2837              round,
2838              Default::default(),
2839              timestamp,
2840              1,
2841              &mut rng,
2842          );
2843  
2844          // Store the proposal on the primary.
2845          *primary.proposed_batch.write() = Some(proposal);
2846  
2847          // Each committee member signs the batch.
2848          let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2849  
2850          // Have the primary process the signatures.
2851          for (socket_addr, signature) in signatures {
2852              primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2853          }
2854  
2855          // Check the certificate was created and stored by the primary.
2856          assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2857          // Check the round was incremented.
2858          assert_eq!(primary.current_round(), round + 1);
2859      }
2860  
2861      #[tokio::test(flavor = "multi_thread")]
2862      async fn test_batch_signature_from_peer_in_round() {
2863          let round = 5;
2864          let mut rng = TestRng::default();
2865          let (primary, accounts) = primary_without_handlers(&mut rng);
2866          map_account_addresses(&primary, &accounts);
2867  
2868          // Generate certificates.
2869          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2870  
2871          // Create a valid proposal.
2872          let timestamp = now();
2873          let proposal = create_test_proposal(
2874              primary.gateway.account(),
2875              primary.ledger.current_committee().unwrap(),
2876              round,
2877              previous_certificates,
2878              timestamp,
2879              1,
2880              &mut rng,
2881          );
2882  
2883          // Store the proposal on the primary.
2884          *primary.proposed_batch.write() = Some(proposal);
2885  
2886          // Each committee member signs the batch.
2887          let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2888  
2889          // Have the primary process the signatures.
2890          for (socket_addr, signature) in signatures {
2891              primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2892          }
2893  
2894          // Check the certificate was created and stored by the primary.
2895          assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2896          // Check the round was incremented.
2897          assert_eq!(primary.current_round(), round + 1);
2898      }
2899  
2900      #[tokio::test]
2901      async fn test_batch_signature_from_peer_no_quorum() {
2902          let mut rng = TestRng::default();
2903          let (primary, accounts) = primary_without_handlers(&mut rng);
2904          map_account_addresses(&primary, &accounts);
2905  
2906          // Create a valid proposal.
2907          let round = 1;
2908          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2909          let proposal = create_test_proposal(
2910              primary.gateway.account(),
2911              primary.ledger.current_committee().unwrap(),
2912              round,
2913              Default::default(),
2914              timestamp,
2915              1,
2916              &mut rng,
2917          );
2918  
2919          // Store the proposal on the primary.
2920          *primary.proposed_batch.write() = Some(proposal);
2921  
2922          // Each committee member signs the batch.
2923          let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2924  
2925          // Have the primary process only one signature, mimicking a lack of quorum.
2926          let (socket_addr, signature) = signatures.first().unwrap();
2927          primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2928  
2929          // Check the certificate was not created and stored by the primary.
2930          assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2931          // Check the round was incremented.
2932          assert_eq!(primary.current_round(), round);
2933      }
2934  
2935      #[tokio::test]
2936      async fn test_batch_signature_from_peer_in_round_no_quorum() {
2937          let round = 7;
2938          let mut rng = TestRng::default();
2939          let (primary, accounts) = primary_without_handlers(&mut rng);
2940          map_account_addresses(&primary, &accounts);
2941  
2942          // Generate certificates.
2943          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2944  
2945          // Create a valid proposal.
2946          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2947          let proposal = create_test_proposal(
2948              primary.gateway.account(),
2949              primary.ledger.current_committee().unwrap(),
2950              round,
2951              previous_certificates,
2952              timestamp,
2953              1,
2954              &mut rng,
2955          );
2956  
2957          // Store the proposal on the primary.
2958          *primary.proposed_batch.write() = Some(proposal);
2959  
2960          // Each committee member signs the batch.
2961          let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2962  
2963          // Have the primary process only one signature, mimicking a lack of quorum.
2964          let (socket_addr, signature) = signatures.first().unwrap();
2965          primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2966  
2967          // Check the certificate was not created and stored by the primary.
2968          assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2969          // Check the round was incremented.
2970          assert_eq!(primary.current_round(), round);
2971      }
2972  
2973      #[tokio::test]
2974      async fn test_insert_certificate_with_aborted_transmissions() {
2975          let round = 3;
2976          let prev_round = round - 1;
2977          let mut rng = TestRng::default();
2978          let (primary, accounts) = primary_without_handlers(&mut rng);
2979          let peer_account = &accounts[1];
2980          let peer_ip = peer_account.0;
2981  
2982          // Fill primary storage.
2983          store_certificate_chain(&primary, &accounts, round, &mut rng);
2984  
2985          // Get transmissions from previous certificates.
2986          let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2987  
2988          // Generate a solution and a transaction.
2989          let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2990          let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2991  
2992          // Store it on one of the workers.
2993          primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2994          primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2995  
2996          // Check that the worker has 2 transmissions.
2997          assert_eq!(primary.workers[0].num_transmissions(), 2);
2998  
2999          // Create certificates for the current round.
3000          let account = accounts[0].1.clone();
3001          let (certificate, transmissions) =
3002              create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
3003          let certificate_id = certificate.id();
3004  
3005          // Randomly abort some of the transmissions.
3006          let mut aborted_transmissions = HashSet::new();
3007          let mut transmissions_without_aborted = HashMap::new();
3008          for (transmission_id, transmission) in transmissions.clone() {
3009              match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
3010                  true => {
3011                      // Insert the aborted transmission.
3012                      aborted_transmissions.insert(transmission_id);
3013                  }
3014                  false => {
3015                      // Insert the transmission without the aborted transmission.
3016                      transmissions_without_aborted.insert(transmission_id, transmission);
3017                  }
3018              };
3019          }
3020  
3021          // Add the non-aborted transmissions to the worker.
3022          for (transmission_id, transmission) in transmissions_without_aborted.iter() {
3023              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
3024          }
3025  
3026          // Check that inserting the transmission with missing transmissions fails.
3027          assert!(
3028              primary
3029                  .storage
3030                  .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
3031                  .is_err()
3032          );
3033          assert!(
3034              primary
3035                  .storage
3036                  .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
3037                  .is_err()
3038          );
3039  
3040          // Insert the certificate to storage.
3041          primary
3042              .storage
3043              .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
3044              .unwrap();
3045  
3046          // Ensure the certificate exists in storage.
3047          assert!(primary.storage.contains_certificate(certificate_id));
3048          // Ensure that the aborted transmission IDs exist in storage.
3049          for aborted_transmission_id in aborted_transmissions {
3050              assert!(primary.storage.contains_transmission(aborted_transmission_id));
3051              assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
3052          }
3053      }
3054  }