// node/bft/src/primary.rs
   1  // Copyright (c) 2025-2026 ACDC Network
   2  // This file is part of the alphaos library.
   3  //
   4  // Alpha Chain | Delta Chain Protocol
   5  // International Monetary Graphite.
   6  //
   7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
   8  // They built world-class ZK infrastructure. We installed the EASY button.
   9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
  10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
  11  //
  12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
  13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
  14  // No rights reserved. No permission required. No warranty. No refunds.
  15  //
  16  // https://creativecommons.org/publicdomain/zero/1.0/
  17  // SPDX-License-Identifier: CC0-1.0
  18  
  19  use crate::{
  20      events::{BatchPropose, BatchSignature, Event},
  21      helpers::{
  22          assign_to_worker,
  23          assign_to_workers,
  24          fmt_id,
  25          init_sync_channels,
  26          init_worker_channels,
  27          now,
  28          BFTSender,
  29          PrimaryReceiver,
  30          PrimarySender,
  31          Proposal,
  32          ProposalCache,
  33          SignedProposals,
  34          Storage,
  35      },
  36      spawn_blocking,
  37      Gateway,
  38      Sync,
  39      Transport,
  40      Worker,
  41      MAX_BATCH_DELAY_IN_MS,
  42      MAX_WORKERS,
  43      MIN_BATCH_DELAY_IN_SECS,
  44      PRIMARY_PING_IN_MS,
  45      WORKER_PING_IN_MS,
  46  };
  47  use alphaos_account::Account;
  48  use alphaos_node_bft_events::PrimaryPing;
  49  use alphaos_node_bft_ledger_service::LedgerService;
  50  use alphaos_node_network::PeerPoolHandling;
  51  use alphaos_node_sync::{BlockSync, Ping, DUMMY_SELF_IP};
  52  use alphavm::{
  53      console::{
  54          prelude::*,
  55          types::{Address, Field},
  56      },
  57      ledger::{
  58          block::Transaction,
  59          narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
  60          puzzle::{Solution, SolutionID},
  61      },
  62      prelude::{committee::Committee, ConsensusVersion},
  63      utilities::flatten_error,
  64  };
  65  
  66  use alphastd::StorageMode;
  67  use anyhow::Context;
  68  use colored::Colorize;
  69  use futures::stream::{FuturesUnordered, StreamExt};
  70  use indexmap::{IndexMap, IndexSet};
  71  #[cfg(feature = "locktick")]
  72  use locktick::{
  73      parking_lot::{Mutex, RwLock},
  74      tokio::Mutex as TMutex,
  75  };
  76  #[cfg(not(feature = "locktick"))]
  77  use parking_lot::{Mutex, RwLock};
  78  #[cfg(not(feature = "serial"))]
  79  use rayon::prelude::*;
  80  use std::{
  81      collections::{HashMap, HashSet},
  82      future::Future,
  83      net::SocketAddr,
  84      sync::Arc,
  85      time::Duration,
  86  };
  87  #[cfg(not(feature = "locktick"))]
  88  use tokio::sync::Mutex as TMutex;
  89  use tokio::{sync::OnceCell, task::JoinHandle};
  90  
/// A helper type for an optional proposed batch.
///
/// `None` indicates that the primary does not currently have a batch proposal in flight.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
  93  
/// The primary logic of a node.
/// AlphaBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
///
/// All fields are cheaply cloneable handles (`Arc`s or clone-friendly services), so cloning a
/// `Primary` shares the same underlying state rather than duplicating it.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module enables fetching data from other validators.
    sync: Sync<N>,
    /// The gateway allows talking to other nodes in the validator set.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers. Empty until `run` initializes up to `MAX_WORKERS` workers.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender. Set at most once (in `run`), when a BFT sender is provided.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The handles for all background tasks spawned by this primary.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock for propose_batch; the guarded value holds the latest proposal round
    /// (restored from the proposal cache on startup).
    propose_lock: Arc<TMutex<u64>>,
    /// The storage mode of the node.
    storage_mode: StorageMode,
}
 123  
impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to send to the primary.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    ///
    /// Constructs the gateway (networking) and the sync module, but does NOT spawn any
    /// workers or background tasks — that happens in [`Self::run`]. The `workers` slice
    /// starts empty.
    ///
    /// - `ip`: the optional socket address to bind the gateway to.
    /// - `trusted_validators`: validator addresses to always connect to.
    /// - `trusted_peers_only`: if `true`, restricts connections to trusted peers.
    /// - `dev`: optional development-mode identifier, forwarded to the gateway.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        storage_mode: StorageMode,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Initialize the gateway.
        let gateway = Gateway::new(
            account,
            storage.clone(),
            ledger.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            storage_mode.clone(),
            dev,
        )?;
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

        // Initialize the primary instance.
        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            storage_mode,
        })
    }

    /// Load the proposal cache file and update the Primary state with the stored data.
    ///
    /// On success this restores (in order): the in-flight proposed batch, the recently-signed
    /// proposals, the propose-lock round, and any pending certificates (replayed into storage
    /// via `sync_with_certificate_from_peer`). Failures to replay individual certificates are
    /// logged and skipped rather than aborting the load.
    async fn load_proposal_cache(&self) -> Result<()> {
        // IMPORTANT: Skip proposal cache in dev mode to allow fresh starts.
        // Dev mode is for testing and should allow easy resets without manual cleanup.
        // Production mode still benefits from proposal cache for crash recovery.
        if self.storage_mode.dev().is_some() {
            info!("Skipping proposal cache in dev mode (allows fresh restarts)");
            return Ok(());
        }

        // Fetch the signed proposals from the file system if it exists.
        match ProposalCache::<N>::exists(&self.storage_mode) {
            // If the proposal cache exists, then process the proposal cache.
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
                Ok(proposal_cache) => {
                    // Extract the proposal and signed proposals.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Write the proposed batch.
                    *self.proposed_batch.write() = proposed_batch;
                    // Write the signed proposals.
                    *self.signed_proposals.write() = signed_proposals;
                    // Write the propose lock.
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Update the storage with the pending certificates.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        // We use a dummy IP because the node should not need to request from any peers.
                        // The storage should have stored all the transmissions. If not, we simply
                        // skip the certificate.
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            let err = err.context(format!(
                                "Failed to load stored certificate {} from proposal cache",
                                fmt_id(batch_id)
                            ));
                            warn!("{}", &flatten_error(err));
                        }
                    }
                    Ok(())
                }
                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
            },
            // If the proposal cache does not exist, then return early.
            false => Ok(()),
        }
    }

    /// Run the primary instance.
    ///
    /// Startup order matters here: workers are created first, then the sync module is
    /// initialized and the proposal cache is replayed, then sync runs, then the gateway
    /// starts accepting traffic, and only then are the primary handlers started — so the
    /// primary never communicates before syncing is underway.
    ///
    /// # Panics
    /// Panics if a BFT sender was already set on this primary (the `OnceCell` invariant).
    pub async fn run(
        &mut self,
        ping: Option<Arc<Ping<N>>>,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender.
        if let Some(bft_sender) = &bft_sender {
            // Set the BFT sender in the primary.
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct a map of the worker senders.
        let mut worker_senders = IndexMap::new();
        // Construct a map for the workers.
        let mut workers = Vec::new();
        // Initialize the workers.
        for id in 0..MAX_WORKERS {
            // Construct the worker channels.
            let (tx_worker, rx_worker) = init_worker_channels();
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            // Run the worker instance.
            worker.run(rx_worker);
            // Add the worker to the list of workers.
            workers.push(worker);
            // Add the worker sender to the map.
            worker_senders.insert(id, tx_worker);
        }
        // Set the workers.
        self.workers = Arc::from(workers);

        // First, initialize the sync channels.
        let (sync_sender, sync_receiver) = init_sync_channels();
        // Next, initialize the sync module and sync the storage from ledger.
        self.sync.initialize(bft_sender).await?;
        // Next, load and process the proposal cache before running the sync module.
        self.load_proposal_cache().await?;
        // Next, run the sync module.
        self.sync.run(ping, sync_receiver).await?;
        // Next, initialize the gateway.
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        // Lastly, start the primary handlers.
        // Note: This ensures the primary does not start communicating before syncing is complete.
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round, as reported by storage.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    ///
    /// # Panics
    /// Panics if the worker count exceeds `u8::MAX` (cannot happen while
    /// `MAX_WORKERS` fits in a `u8`).
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the batch proposal of our primary, if one currently exists.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}
 321  
 322  impl<N: Network> Primary<N> {
 323      /// Returns the number of unconfirmed transmissions.
 324      pub fn num_unconfirmed_transmissions(&self) -> usize {
 325          self.workers.iter().map(|worker| worker.num_transmissions()).sum()
 326      }
 327  
 328      /// Returns the number of unconfirmed ratifications.
 329      pub fn num_unconfirmed_ratifications(&self) -> usize {
 330          self.workers.iter().map(|worker| worker.num_ratifications()).sum()
 331      }
 332  
 333      /// Returns the number of unconfirmed solutions.
 334      pub fn num_unconfirmed_solutions(&self) -> usize {
 335          self.workers.iter().map(|worker| worker.num_solutions()).sum()
 336      }
 337  
 338      /// Returns the number of unconfirmed transactions.
 339      pub fn num_unconfirmed_transactions(&self) -> usize {
 340          self.workers.iter().map(|worker| worker.num_transactions()).sum()
 341      }
 342  }
 343  
 344  impl<N: Network> Primary<N> {
 345      /// Returns the worker transmission IDs.
 346      pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
 347          self.workers.iter().flat_map(|worker| worker.transmission_ids())
 348      }
 349  
 350      /// Returns the worker transmissions.
 351      pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
 352          self.workers.iter().flat_map(|worker| worker.transmissions())
 353      }
 354  
 355      /// Returns the worker solutions.
 356      pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
 357          self.workers.iter().flat_map(|worker| worker.solutions())
 358      }
 359  
 360      /// Returns the worker transactions.
 361      pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
 362          self.workers.iter().flat_map(|worker| worker.transactions())
 363      }
 364  }
 365  
 366  impl<N: Network> Primary<N> {
 367      /// Clears the worker solutions.
 368      pub fn clear_worker_solutions(&self) {
 369          self.workers.iter().for_each(Worker::clear_solutions);
 370      }
 371  }
 372  
 373  impl<N: Network> Primary<N> {
 374      /// Proposes the batch for the current round.
 375      ///
 376      /// This method performs the following steps:
 377      /// 1. Drain the workers.
 378      /// 2. Sign the batch.
 379      /// 3. Set the batch proposal in the primary.
 380      /// 4. Broadcast the batch header to all validators for signing.
 381      pub async fn propose_batch(&self) -> Result<()> {
 382          // This function isn't re-entrant.
 383          let mut lock_guard = self.propose_lock.lock().await;
 384  
 385          // Check if the proposed batch has expired, and clear it if it has expired.
 386          if let Err(err) = self
 387              .check_proposed_batch_for_expiration()
 388              .await
 389              .with_context(|| "Failed to check the proposed batch for expiration")
 390          {
 391              warn!("{}", flatten_error(&err));
 392              return Ok(());
 393          }
 394  
 395          // Retrieve the current round.
 396          let round = self.current_round();
 397          // Compute the previous round.
 398          let previous_round = round.saturating_sub(1);
 399  
 400          // If the current round is 0, return early.
 401          // This can actually never happen, because of the invariant that the current round is never 0
 402          // (see [`StorageInner::current_round`]).
 403          ensure!(round > 0, "Round 0 cannot have transaction batches");
 404  
 405          // If the current storage round is below the latest proposal round, then return early.
 406          if round < *lock_guard {
 407              warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
 408              return Ok(());
 409          }
 410  
 411          // If there is a batch being proposed already,
 412          // rebroadcast the batch header to the non-signers, and return early.
 413          if let Some(proposal) = self.proposed_batch.read().as_ref() {
 414              // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
 415              if round < proposal.round()
 416                  || proposal
 417                      .batch_header()
 418                      .previous_certificate_ids()
 419                      .iter()
 420                      .any(|id| !self.storage.contains_certificate(*id))
 421              {
 422                  warn!(
 423                      "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
 424                      proposal.round(),
 425                  );
 426                  return Ok(());
 427              }
 428              // Construct the event.
 429              // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
 430              let event = Event::BatchPropose(proposal.batch_header().clone().into());
 431              // Iterate through the non-signers.
 432              for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
 433                  // Resolve the address to the peer IP.
 434                  match self.gateway.resolver().read().get_peer_ip_for_address(address) {
 435                      // Resend the batch proposal to the validator for signing.
 436                      Some(peer_ip) => {
 437                          let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
 438                          tokio::spawn(async move {
 439                              debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
 440                              // Resend the batch proposal to the peer.
 441                              if gateway.send(peer_ip, event_).await.is_none() {
 442                                  warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
 443                              }
 444                          });
 445                      }
 446                      None => continue,
 447                  }
 448              }
 449              debug!("Proposed batch for round {} is still valid", proposal.round());
 450              return Ok(());
 451          }
 452  
 453          #[cfg(feature = "metrics")]
 454          metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
 455  
 456          // Ensure that the primary does not create a new proposal too quickly.
 457          if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
 458              debug!(
 459                  "{}",
 460                  flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}")))
 461              );
 462              return Ok(());
 463          }
 464  
 465          // Ensure the primary has not proposed a batch for this round before.
 466          if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
 467              // If a BFT sender was provided, attempt to advance the current round.
 468              if let Some(bft_sender) = self.bft_sender.get() {
 469                  match bft_sender.send_primary_round_to_bft(self.current_round()).await {
 470                      // 'is_ready' is true if the primary is ready to propose a batch for the next round.
 471                      Ok(true) => (), // continue,
 472                      // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
 473                      Ok(false) => return Ok(()),
 474                      // An error occurred while attempting to advance the current round.
 475                      Err(err) => {
 476                          let err = err.context("Failed to update the BFT to the next round");
 477                          warn!("{}", &flatten_error(&err));
 478                          return Err(err);
 479                      }
 480                  }
 481              }
 482              debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
 483              return Ok(());
 484          }
 485  
 486          // Determine if the current round has been proposed.
 487          // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
 488          // good for network reliability and should not be prevented for the already existing proposed_batch.
 489          // If a certificate already exists for the current round, an attempt should be made to advance the
 490          // round as early as possible.
 491          if round == *lock_guard {
 492              debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
 493              return Ok(());
 494          }
 495  
 496          // Retrieve the committee to check against.
 497          let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
 498          // Check if the primary is connected to enough validators to reach quorum threshold.
 499          {
 500              // Retrieve the connected validator addresses.
 501              let mut connected_validators = self.gateway.connected_addresses();
 502              // Append the primary to the set.
 503              connected_validators.insert(self.gateway.account().address());
 504              // If quorum threshold is not reached, return early.
 505              if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
 506                  debug!(
 507                      "Primary is safely skipping a batch proposal for round {round} {}",
 508                      "(please connect to more validators)".dimmed()
 509                  );
 510                  trace!("Primary is connected to {} validators", connected_validators.len() - 1);
 511                  return Ok(());
 512              }
 513          }
 514  
 515          // Retrieve the previous certificates.
 516          let previous_certificates = self.storage.get_certificates_for_round(previous_round);
 517  
 518          // Check if the batch is ready to be proposed.
 519          // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
 520          let mut is_ready = previous_round == 0;
 521          // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
 522          if previous_round > 0 {
 523              // Retrieve the committee lookback for the round.
 524              let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
 525                  bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
 526              };
 527              // Construct a set over the authors.
 528              let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
 529              // Check if the previous certificates have reached the quorum threshold.
 530              if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
 531                  is_ready = true;
 532              }
 533          }
 534          // If the batch is not ready to be proposed, return early.
 535          if !is_ready {
 536              debug!(
 537                  "Primary is safely skipping a batch proposal for round {round} {}",
 538                  format!("(previous round {previous_round} has not reached quorum)").dimmed()
 539              );
 540              return Ok(());
 541          }
 542  
 543          // Initialize the map of transmissions.
 544          let mut transmissions: IndexMap<_, _> = Default::default();
 545          // Track the total execution costs of the batch proposal as it is being constructed.
 546          let mut proposal_cost = 0u64;
 547          // Note: worker draining and transaction inclusion needs to be thought
 548          // through carefully when there is more than one worker. The fairness
 549          // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
 550          debug_assert_eq!(MAX_WORKERS, 1);
 551  
 552          'outer: for worker in self.workers().iter() {
 553              let mut num_worker_transmissions = 0usize;
 554  
 555              while let Some((id, transmission)) = worker.remove_front() {
 556                  // Check the selected transmissions are below the batch limit.
 557                  if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
 558                      // Reinsert the transmission into the worker.
 559                      worker.insert_front(id, transmission);
 560                      break 'outer;
 561                  }
 562  
 563                  // Check the max transmissions per worker is not exceeded.
 564                  if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
 565                      // Reinsert the transmission into the worker.
 566                      worker.insert_front(id, transmission);
 567                      continue 'outer;
 568                  }
 569  
 570                  // Check if the ledger already contains the transmission.
 571                  if self.ledger.contains_transmission(&id).unwrap_or(true) {
 572                      trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
 573                      continue;
 574                  }
 575  
 576                  // Check if the storage already contain the transmission.
 577                  // Note: We do not skip if this is the first transmission in the proposal, to ensure that
 578                  // the primary does not propose a batch with no transmissions.
 579                  if !transmissions.is_empty() && self.storage.contains_transmission(id) {
 580                      trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
 581                      continue;
 582                  }
 583  
 584                  // Check the transmission is still valid.
 585                  match (id, transmission.clone()) {
 586                      (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
 587                          // Ensure the checksum matches. If not, skip the solution.
 588                          if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
 589                          {
 590                              trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
 591                              continue;
 592                          }
 593                          // Check if the solution is still valid.
 594                          if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
 595                              trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
 596                              continue;
 597                          }
 598                      }
 599                      (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
 600                          // Ensure the checksum matches. If not, skip the transaction.
 601                          if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
 602                          {
 603                              trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
 604                              continue;
 605                          }
 606  
 607                          // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
 608                          let transaction = spawn_blocking!({
 609                              match transaction {
 610                                  Data::Object(transaction) => Ok(transaction),
 611                                  Data::Buffer(bytes) => {
 612                                      Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
 613                                  }
 614                              }
 615                          })?;
 616  
 617                          // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
 618                          // ConsensusVersion V8 Migration logic -
 619                          // Do not include deployments in a batch proposal.
 620                          let current_block_height = self.ledger.latest_block_height();
 621                          let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
 622                          let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
 623                          let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
 624                          if current_block_height > consensus_version_v7_height
 625                              && current_block_height <= consensus_version_v8_height
 626                              && transaction.is_deploy()
 627                          {
 628                              trace!(
 629                                  "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
 630                                  fmt_id(transaction_id)
 631                              );
 632                              continue;
 633                          }
 634  
 635                          // Compute the transaction spent cost (in microcredits).
 636                          // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
 637                          let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
 638                          else {
 639                              debug!(
 640                                  "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
 641                                  fmt_id(transaction_id)
 642                              );
 643                              continue;
 644                          };
 645  
 646                          // Check if the transaction is still valid.
 647                          if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
 648                              trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
 649                              continue;
 650                          }
 651  
 652                          // Compute the next proposal cost.
 653                          // Note: We purposefully discard this transaction if the proposal cost overflows.
 654                          let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
 655                              debug!(
 656                                  "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
 657                                  fmt_id(transaction_id)
 658                              );
 659                              continue;
 660                          };
 661  
 662                          // Check if the next proposal cost exceeds the batch proposal spend limit.
 663                          let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
 664                          if next_proposal_cost > batch_spend_limit {
 665                              debug!(
 666                                  "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
 667                                  fmt_id(transaction_id),
 668                                  batch_spend_limit
 669                              );
 670  
 671                              // Reinsert the transmission into the worker.
 672                              worker.insert_front(id, transmission);
 673                              break 'outer;
 674                          }
 675  
 676                          // Update the proposal cost.
 677                          proposal_cost = next_proposal_cost;
 678                      }
 679  
 680                      // Note: We explicitly forbid including ratifications,
 681                      // as the protocol currently does not support ratifications.
 682                      (TransmissionID::Ratification, Transmission::Ratification) => continue,
 683                      // All other combinations are clearly invalid.
 684                      _ => continue,
 685                  }
 686  
 687                  // If the transmission is valid, insert it into the proposal's transmission list.
 688                  transmissions.insert(id, transmission);
 689                  num_worker_transmissions = num_worker_transmissions.saturating_add(1);
 690              }
 691          }
 692  
 693          // Determine the current timestamp.
 694          let current_timestamp = now();
 695  
 696          *lock_guard = round;
 697  
 698          /* Proceeding to sign & propose the batch. */
 699          info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
 700  
 701          // Retrieve the private key.
 702          let private_key = *self.gateway.account().private_key();
 703          // Retrieve the committee ID.
 704          let committee_id = committee_lookback.id();
 705          // Prepare the transmission IDs.
 706          let transmission_ids = transmissions.keys().copied().collect();
 707          // Prepare the previous batch certificate IDs.
 708          let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
 709          // Sign the batch header and construct the proposal.
 710          let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
 711              &private_key,
 712              round,
 713              current_timestamp,
 714              committee_id,
 715              transmission_ids,
 716              previous_certificate_ids,
 717              &mut rand::thread_rng()
 718          ))
 719          .and_then(|batch_header| {
 720              Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
 721                  .map(|proposal| (batch_header, proposal))
 722          })
 723          .inspect_err(|_| {
 724              // On error, reinsert the transmissions and then propagate the error.
 725              if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
 726                  error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
 727              }
 728          })?;
 729          // Broadcast the batch to all validators for signing.
 730          self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
 731          // Set the timestamp of the latest proposed batch.
 732          *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
 733          // Set the proposed batch.
 734          *self.proposed_batch.write() = Some(proposal);
 735          Ok(())
 736      }
 737  
 738      /// Processes a batch propose from a peer.
 739      ///
 740      /// This method performs the following steps:
 741      /// 1. Verify the batch.
 742      /// 2. Sign the batch.
 743      /// 3. Broadcast the signature back to the validator.
 744      ///
 745      /// If our primary is ahead of the peer, we will not sign the batch.
 746      /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
 747      async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
 748          let BatchPropose { round: batch_round, batch_header } = batch_propose;
 749  
 750          // Deserialize the batch header.
 751          let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
 752          // Ensure the round matches in the batch header.
 753          if batch_round != batch_header.round() {
 754              // Proceed to disconnect the validator.
 755              self.gateway.disconnect(peer_ip);
 756              bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
 757          }
 758  
 759          // Retrieve the batch author.
 760          let batch_author = batch_header.author();
 761  
 762          // Ensure the batch proposal is from the validator.
 763          match self.gateway.resolve_to_alpha_addr(peer_ip) {
 764              // If the peer is a validator, then ensure the batch proposal is from the validator.
 765              Some(address) => {
 766                  if address != batch_author {
 767                      // Proceed to disconnect the validator.
 768                      self.gateway.disconnect(peer_ip);
 769                      bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
 770                  }
 771              }
 772              None => bail!("Batch proposal from a disconnected validator"),
 773          }
 774          // Ensure the batch author is a current committee member.
 775          if !self.gateway.is_authorized_validator_address(batch_author) {
 776              // Proceed to disconnect the validator.
 777              self.gateway.disconnect(peer_ip);
 778              bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
 779          }
 780          // Ensure the batch proposal is not from the current primary.
 781          if self.gateway.account().address() == batch_author {
 782              bail!("Invalid peer - proposed batch from myself ({batch_author})");
 783          }
 784  
 785          // Ensure that the batch proposal's committee ID matches the expected committee ID.
 786          let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
 787          if expected_committee_id != batch_header.committee_id() {
 788              // Proceed to disconnect the validator.
 789              self.gateway.disconnect(peer_ip);
 790              bail!(
 791                  "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
 792                  batch_header.committee_id()
 793              );
 794          }
 795  
 796          // Retrieve the cached round and batch ID for this validator.
 797          if let Some((signed_round, signed_batch_id, signature)) =
 798              self.signed_proposals.read().get(&batch_author).copied()
 799          {
 800              // If the signed round is ahead of the peer's batch round, do not sign the proposal.
 801              // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
 802              if signed_round > batch_header.round() {
 803                  bail!(
 804                      "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
 805                      batch_header.round()
 806                  );
 807              }
 808  
 809              // If the round matches and the batch ID differs, then the validator is malicious.
 810              if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
 811                  bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
 812              }
 813              // If the round and batch ID matches, then skip signing the batch a second time.
 814              // Instead, rebroadcast the cached signature to the peer.
 815              if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
 816                  let gateway = self.gateway.clone();
 817                  tokio::spawn(async move {
 818                      debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
 819                      let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
 820                      // Resend the batch signature to the peer.
 821                      if gateway.send(peer_ip, event).await.is_none() {
 822                          warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
 823                      }
 824                  });
 825                  // Return early.
 826                  return Ok(());
 827              }
 828          }
 829  
 830          // Ensure that the batch header doesn't already exist in storage.
 831          // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
 832          if self.storage.contains_batch(batch_header.batch_id()) {
 833              debug!(
 834                  "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
 835                  format!("batch for round {batch_round} already exists in storage").dimmed()
 836              );
 837              return Ok(());
 838          }
 839  
 840          // Compute the previous round.
 841          let previous_round = batch_round.saturating_sub(1);
 842          // Ensure that the peer did not propose a batch too quickly.
 843          if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
 844              // Proceed to disconnect the validator.
 845              self.gateway.disconnect(peer_ip);
 846              return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
 847          }
 848  
 849          // Ensure the batch header does not contain any ratifications.
 850          if batch_header.contains(TransmissionID::Ratification) {
 851              // Proceed to disconnect the validator.
 852              self.gateway.disconnect(peer_ip);
 853              bail!(
 854                  "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
 855              );
 856          }
 857  
 858          // If the peer is ahead, use the batch header to sync up to the peer.
 859          let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;
 860  
 861          // Check that the transmission ids match and are not fee transactions.
 862          if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
 863              // If the transmission is not well-formed, then return early.
 864              self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
 865          }) {
 866              let err = err.context(format!(
 867                  "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
 868              ));
 869              debug!("{}", flatten_error(err));
 870              return Ok(());
 871          }
 872  
 873          // Ensure the batch is for the current round.
 874          // This method must be called after fetching previous certificates (above),
 875          // and prior to checking the batch header (below).
 876          if let Err(e) = self.ensure_is_signing_round(batch_round) {
 877              // If the primary is not signing for the peer's round, then return early.
 878              debug!("{e} from '{peer_ip}'");
 879              return Ok(());
 880          }
 881  
 882          // Ensure the batch header from the peer is valid.
 883          let (storage, header) = (self.storage.clone(), batch_header.clone());
 884  
 885          // Check the batch header, and return early if it already exists in storage.
 886          let Some(missing_transmissions) =
 887              spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
 888          else {
 889              return Ok(());
 890          };
 891  
 892          // Inserts the missing transmissions into the workers.
 893          self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
 894  
 895          // Ensure the transaction doesn't bring the proposal above the spend limit.
 896          let block_height = self.ledger.latest_block_height() + 1;
 897          if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
 898              let mut proposal_cost = 0u64;
 899              for transmission_id in batch_header.transmission_ids() {
 900                  let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
 901                  let Some(worker) = self.workers.get(worker_id as usize) else {
 902                      debug!("Unable to find worker {worker_id}");
 903                      return Ok(());
 904                  };
 905  
 906                  let Some(transmission) = worker.get_transmission(*transmission_id) else {
 907                      debug!("Unable to find transmission '{}' in worker '{worker_id}", fmt_id(transmission_id));
 908                      return Ok(());
 909                  };
 910  
 911                  // If the transmission is a transaction, compute its execution cost.
 912                  if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
 913                      (transmission_id, transmission)
 914                  {
 915                      // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
 916                      let transaction = spawn_blocking!({
 917                          match transaction {
 918                              Data::Object(transaction) => Ok(transaction),
 919                              Data::Buffer(bytes) => {
 920                                  Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
 921                              }
 922                          }
 923                      })?;
 924  
 925                      // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
 926                      // ConsensusVersion V8 Migration logic -
 927                      // Do not include deployments in a batch proposal.
 928                      let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
 929                      let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
 930                      let consensus_version = N::CONSENSUS_VERSION(block_height)?;
 931                      if block_height > consensus_version_v7_height
 932                          && block_height <= consensus_version_v8_height
 933                          && transaction.is_deploy()
 934                      {
 935                          bail!(
 936                              "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})",
 937                          )
 938                      }
 939  
 940                      // Compute the transaction spent cost (in microcredits).
 941                      // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
 942                      let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
 943                      else {
 944                          bail!(
 945                              "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
 946                              fmt_id(transaction_id)
 947                          )
 948                      };
 949  
 950                      // Compute the next proposal cost.
 951                      // Note: We purposefully discard this transaction if the proposal cost overflows.
 952                      let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
 953                          bail!(
 954                              "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
 955                              fmt_id(transaction_id)
 956                          )
 957                      };
 958  
 959                      // Check if the next proposal cost exceeds the batch proposal spend limit.
 960                      let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
 961                      if next_proposal_cost > batch_spend_limit {
 962                          bail!(
 963                              "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
 964                              fmt_id(transaction_id),
 965                              batch_spend_limit
 966                          );
 967                      }
 968  
 969                      // Update the proposal cost.
 970                      proposal_cost = next_proposal_cost;
 971                  }
 972              }
 973          }
 974  
 975          /* Proceeding to sign the batch. */
 976  
 977          // Retrieve the batch ID.
 978          let batch_id = batch_header.batch_id();
 979          // Sign the batch ID.
 980          let account = self.gateway.account().clone();
 981          let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
 982  
 983          // Ensure the proposal has not already been signed.
 984          //
 985          // Note: Due to the need to sync the batch header with the peer, it is possible
 986          // for the primary to receive the same 'BatchPropose' event again, whereby only
 987          // one instance of this handler should sign the batch. This check guarantees this.
 988          match self.signed_proposals.write().0.entry(batch_author) {
 989              std::collections::hash_map::Entry::Occupied(mut entry) => {
 990                  // If the validator has already signed a batch for this round, then return early,
 991                  // since, if the peer still has not received the signature, they will request it again,
 992                  // and the logic at the start of this function will resend the (now cached) signature
 993                  // to the peer if asked to sign this batch proposal again.
 994                  if entry.get().0 == batch_round {
 995                      return Ok(());
 996                  }
 997                  // Otherwise, cache the round, batch ID, and signature for this validator.
 998                  entry.insert((batch_round, batch_id, signature));
 999              }
1000              // If the validator has not signed a batch before, then continue.
1001              std::collections::hash_map::Entry::Vacant(entry) => {
1002                  // Cache the round, batch ID, and signature for this validator.
1003                  entry.insert((batch_round, batch_id, signature));
1004              }
1005          };
1006  
1007          // Broadcast the signature back to the validator.
1008          let self_ = self.clone();
1009          tokio::spawn(async move {
1010              let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
1011              // Send the batch signature to the peer.
1012              if self_.gateway.send(peer_ip, event).await.is_some() {
1013                  debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
1014              }
1015          });
1016  
1017          Ok(())
1018      }
1019  
1020      /// Processes a batch signature from a peer.
1021      ///
1022      /// This method performs the following steps:
1023      /// 1. Ensure the proposed batch has not expired.
1024      /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
1025      /// 3. Store the signature.
1026      /// 4. Certify the batch if enough signatures have been received.
1027      /// 5. Broadcast the batch certificate to all validators.
1028      async fn process_batch_signature_from_peer(
1029          &self,
1030          peer_ip: SocketAddr,
1031          batch_signature: BatchSignature<N>,
1032      ) -> Result<()> {
1033          // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
1034          self.check_proposed_batch_for_expiration().await?;
1035  
1036          // Retrieve the signature and timestamp.
1037          let BatchSignature { batch_id, signature } = batch_signature;
1038  
1039          // Retrieve the signer.
1040          let signer = signature.to_address();
1041  
1042          // Ensure the batch signature is signed by the validator.
1043          if self.gateway.resolve_to_alpha_addr(peer_ip) != Some(signer) {
1044              // Proceed to disconnect the validator.
1045              self.gateway.disconnect(peer_ip);
1046              bail!("Malicious peer - batch signature is from a different validator ({signer})");
1047          }
1048          // Ensure the batch signature is not from the current primary.
1049          if self.gateway.account().address() == signer {
1050              bail!("Invalid peer - received a batch signature from myself ({signer})");
1051          }
1052  
1053          let self_ = self.clone();
1054          let Some(proposal) = spawn_blocking!({
1055              // Acquire the write lock.
1056              let mut proposed_batch = self_.proposed_batch.write();
1057              // Add the signature to the batch, and determine if the batch is ready to be certified.
1058              match proposed_batch.as_mut() {
1059                  Some(proposal) => {
1060                      // Ensure the batch ID matches the currently proposed batch ID.
1061                      if proposal.batch_id() != batch_id {
1062                          match self_.storage.contains_batch(batch_id) {
1063                              // If this batch was already certified, return early.
1064                              true => {
1065                                  debug!(
1066                                      "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
1067                                      proposal.round()
1068                                  );
1069                                  return Ok(None);
1070                              }
1071                              // If the batch ID is unknown, return an error.
1072                              false => bail!(
1073                                  "Unknown batch ID '{batch_id}', expected '{}' for round {}",
1074                                  proposal.batch_id(),
1075                                  proposal.round()
1076                              ),
1077                          }
1078                      }
1079                      // Retrieve the committee lookback for the round.
1080                      let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
1081                      // Retrieve the address of the validator.
1082                      let Some(signer) = self_.gateway.resolve_to_alpha_addr(peer_ip) else {
1083                          bail!("Signature is from a disconnected validator");
1084                      };
1085                      // Add the signature to the batch.
1086                      let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;
1087  
1088                      if new_signature {
1089                          info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1090                          // Check if the batch is ready to be certified.
1091                          if !proposal.is_quorum_threshold_reached(&committee_lookback) {
1092                              // If the batch is not ready to be certified, return early.
1093                              return Ok(None);
1094                          }
1095                      } else {
1096                          debug!(
1097                              "Received duplicated signature from '{peer_ip}' for batch {batch_id} in round {round}",
1098                              round = proposal.round()
1099                          );
1100                          return Ok(None);
1101                      }
1102                  }
1103                  // There is no proposed batch, so return early.
1104                  None => return Ok(None),
1105              };
1106              // Retrieve the batch proposal, clearing the proposed batch.
1107              match proposed_batch.take() {
1108                  Some(proposal) => Ok(Some(proposal)),
1109                  None => Ok(None),
1110              }
1111          })?
1112          else {
1113              return Ok(());
1114          };
1115  
1116          /* Proceeding to certify the batch. */
1117  
1118          info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1119  
1120          // Retrieve the committee lookback for the round.
1121          let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1122          // Store the certified batch and broadcast it to all validators.
1123          // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
1124          if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1125              // Reinsert the transmissions back into the ready queue for the next proposal.
1126              self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1127              return Err(e);
1128          }
1129  
1130          #[cfg(feature = "metrics")]
1131          metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1132          Ok(())
1133      }
1134  
1135      /// Processes a batch certificate from a peer.
1136      ///
1137      /// This method performs the following steps:
1138      /// 1. Stores the given batch certificate, after ensuring it is valid.
1139      /// 2. If there are enough certificates to reach quorum threshold for the current round,
1140      ///    then proceed to advance to the next round.
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Ensure the batch certificate is from an authorized validator.
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        // Ensure storage does not already contain the certificate; if it does, this is a duplicate and we are done.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        // Otherwise, ensure ephemeral storage contains the certificate,
        // so it is tracked while it awaits processing below.
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        // Retrieve the batch certificate author.
        let author = certificate.author();
        // Retrieve the batch certificate round.
        let certificate_round = certificate.round();
        // Retrieve the batch certificate committee ID.
        let committee_id = certificate.committee_id();

        // Ensure the batch certificate is not from the current primary.
        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        // Ensure that the incoming certificate is valid.
        self.storage.check_incoming_certificate(&certificate)?;

        // Store the certificate, after ensuring it is valid above.
        // The following call recursively fetches and stores
        // the previous certificates referenced from this certificate.
        // It is critical to make the following call only after validating the certificate above.
        // The reason is that a sequence of malformed certificates,
        // with references to previous certificates with non-decreasing rounds,
        // cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
        // Note that the following call, if it does not return an error, guarantees the backward closure of the DAG
        // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
        // so all the validity checks in [`Storage::check_certificate`] should be redundant.
        // TODO: eliminate those redundant checks
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        // If there are enough certificates to reach quorum threshold for the certificate round,
        // then proceed to advance to the next round.

        // Retrieve the committee lookback.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        // Retrieve the certificate authors.
        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        // Check if the certificates have reached the quorum threshold.
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        // Ensure that the batch certificate's committee ID matches the expected committee ID.
        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Determine if we are currently proposing a round that is relevant.
        // Note: This is important, because while our peers have advanced,
        // they may not be proposing yet, and thus still able to sign our proposed batch.
        let should_advance = match &*self.proposed_batch.read() {
            // We advance if the proposal round is less than the current round that was just certified.
            Some(proposal) => proposal.round() < certificate_round,
            // If there's no proposal, we consider advancing.
            None => true,
        };

        // Retrieve the current round.
        let current_round = self.current_round();

        // Determine whether to advance to the next round.
        if is_quorum && should_advance && certificate_round >= current_round {
            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
1227  }
1228  
1229  impl<N: Network> Primary<N> {
    /// Starts the primary handlers.
    ///
    /// For each receiver in the `primary_receiver` struct, there will be a dedicated task
    /// that awaits new data and handles it accordingly.
    /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically
    /// tries to move to the next round of batches.
    ///
    /// This function is called exactly once, in `Self::run()`.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        // Destructure the primary receiver into its individual channels.
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Start the primary ping sender.
        // Periodically broadcasts our block locators and latest certificate to all validators.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                // Retrieve the block locators.
                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                // Retrieve the latest certificate of the primary.
                let primary_certificate = {
                    // Retrieve the primary address.
                    let primary_address = self_.gateway.account().address();

                    // Iterate backwards from the latest round to find the primary certificate.
                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        // If the current round is 0, then break the while loop.
                        if current_round == 0 {
                            break;
                        }
                        // Retrieve the primary certificates.
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        // If the primary certificate was not found, decrement the round.
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    // Determine if the primary certificate was found.
                    match certificate {
                        Some(certificate) => certificate,
                        // Skip this iteration of the loop (do not send a primary ping).
                        None => continue,
                    }
                };

                // Construct the primary ping.
                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                // Broadcast the event.
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Start the primary ping handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                // If the primary is not synced, then do not process the primary ping.
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Spawn a task to process the primary certificate.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        // Deserialize the primary certificate in the primary ping.
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        // Process the primary certificate.
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Start the worker ping(s).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                // If the primary is not synced, then do not broadcast the worker ping(s).
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Broadcast the worker ping(s).
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        // Start the batch proposer.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly, but longer than if there were no batch.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // If the primary is not synced, then do not propose a batch.
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // A best-effort attempt to skip the scheduled batch proposal if
                // round progression already triggered one.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                // If there is no proposed batch, attempt to propose a batch.
                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
                // and only one batch needs to be proposed at a time.
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Start the proposed batch handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                // If the primary is not synced, then do not sign the batch.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the proposed batch.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Process the batch proposal.
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Start the batch signature handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                // If the primary is not synced, then do not store the signature.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process the batch signature.
                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
                // is a critical path, and we should only store the minimum required number of signatures.
                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
                let id = fmt_id(batch_signature.batch_id);
                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
                    warn!("{}", flatten_error(err));
                }
            }
        });

        // Start the certified batch handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                // If the primary is not synced, then do not store the certificate.
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the batch certificate.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Deserialize the batch certificate.
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    // Process the batch certificate.
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!(
                            "{}",
                            flatten_error(err.context(format!(
                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
                            )))
                        );
                    }
                });
            }
        });

        // This task periodically tries to move to the next round.
        //
        // Note: This is necessary to ensure that the primary is not stuck on a previous round
        // despite having received enough certificates to advance to the next round.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not increment to the next round.
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Attempt to increment to the next round.
                let current_round = self_.current_round();
                let next_round = current_round.saturating_add(1);
                // Determine if the quorum threshold is reached for the current round.
                let is_quorum_threshold_reached = {
                    // Retrieve the certificate authors for the current round.
                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
                    // If there are no certificates, then skip this check.
                    if authors.is_empty() {
                        continue;
                    }
                    // Retrieve the committee lookback for the current round.
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
                        warn!("Failed to retrieve the committee lookback for round {current_round}");
                        continue;
                    };
                    // Check if the quorum threshold is reached for the current round.
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if the quorum threshold is reached.
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {current_round}");
                    if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
                    }
                }
            }
        });

        // Start a handler to process new unconfirmed solutions.
        // Each solution is routed to a deterministic worker based on its ID and checksum.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed solution.
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });

        // Start a handler to process new unconfirmed transactions.
        // Each transaction is routed to a deterministic worker based on its ID and checksum.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed transaction.
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });
    }
1555  
1556      /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
1557      async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1558          // Check if the proposed batch is timed out or stale.
1559          let is_expired = match self.proposed_batch.read().as_ref() {
1560              Some(proposal) => proposal.round() < self.current_round(),
1561              None => false,
1562          };
1563          // If the batch is expired, clear the proposed batch.
1564          if is_expired {
1565              // Reset the proposed batch.
1566              let proposal = self.proposed_batch.write().take();
1567              if let Some(proposal) = proposal {
1568                  debug!("Cleared expired proposal for round {}", proposal.round());
1569                  self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1570              }
1571          }
1572          Ok(())
1573      }
1574  
    /// Increments to the next round.
    ///
    /// If `next_round` is within GC range, storage is first fast-forwarded round by round
    /// up to the penultimate round, clearing any proposed batch along the way.
    /// Then, if the current round is still behind `next_round`, the round advance is either
    /// delegated to the BFT (when a BFT sender is present) or applied directly to storage
    /// (the Narwhal-only case), and a batch is proposed if the primary is ready.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // If the next round is within GC range, then iterate to the penultimate round.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            // Iterate until the penultimate round is reached.
            while fast_forward_round < next_round.saturating_sub(1) {
                // Update to the next round in storage.
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Clear the proposed batch.
                *self.proposed_batch.write() = None;
            }
        }

        // Retrieve the current round.
        let current_round = self.current_round();
        // Attempt to advance to the next round.
        if current_round < next_round {
            // If a BFT sender was provided, send the current round to the BFT.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    // The BFT replies with whether the primary may propose for the next round.
                    Ok(is_ready) => is_ready,
                    Err(err) => {
                        let err = err.context("Failed to update the BFT to the next round");
                        warn!("{}", flatten_error(&err));
                        return Err(err);
                    }
                }
            }
            // Otherwise, handle the Narwhal case.
            else {
                // Update to the next round in storage.
                self.storage.increment_to_next_round(current_round)?;
                // Set 'is_ready' to 'true'.
                true
            };

            // Log whether the next round is ready.
            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            // If the node is ready, propose a batch for the next round.
            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }
1625  
1626      /// Ensures the primary is signing for the specified batch round.
1627      /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1628      /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1629      fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1630          // Retrieve the current round.
1631          let current_round = self.current_round();
1632          // Ensure the batch round is within GC range of the current round.
1633          if current_round + self.storage.max_gc_rounds() <= batch_round {
1634              bail!("Round {batch_round} is too far in the future")
1635          }
1636          // Ensure the batch round is at or one before the current round.
1637          // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1638          // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1639          if current_round > batch_round + 1 {
1640              bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1641          }
1642          // Check if the primary is still signing for the batch round.
1643          if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1644              if signing_round > batch_round {
1645                  bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1646              }
1647          }
1648          Ok(())
1649      }
1650  
1651      /// Ensure the primary is not creating batch proposals too frequently.
1652      /// This checks that the certificate timestamp for the previous round is within the expected range.
1653      fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1654          // Retrieve the timestamp of the previous timestamp to check against.
1655          let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1656              // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1657              Some(certificate) => certificate.timestamp(),
1658              None => match self.gateway.account().address() == author {
1659                  // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1660                  true => *self.latest_proposed_batch_timestamp.read(),
1661                  // If we do not see a previous certificate for the author, then proceed optimistically.
1662                  false => return Ok(()),
1663              },
1664          };
1665  
1666          // Determine the elapsed time since the previous timestamp.
1667          let elapsed = timestamp
1668              .checked_sub(previous_timestamp)
1669              .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1670          // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1671          match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1672              true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1673              false => Ok(()),
1674          }
1675      }
1676  
    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
    ///
    /// The certificate is built from the proposal and the given committee, persisted to storage,
    /// forwarded to the BFT (when a BFT sender is present), broadcast to all validators,
    /// and then the primary attempts to advance to the round after the certificate's round.
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Create the batch certificate and transmissions.
        // Note: `block_in_place` is used because certificate creation is CPU-bound.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                let err = err.context("Failed to update the BFT DAG from primary");
                warn!("{}", flatten_error(&err));
                return Err(err);
            };
        }
        // Broadcast the certified batch to all validators.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        // Log the certified batch.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Increment to the next round.
        self.try_increment_to_the_next_round(round + 1).await
    }
1706  
1707      /// Inserts the missing transmissions from the proposal into the workers.
1708      fn insert_missing_transmissions_into_workers(
1709          &self,
1710          peer_ip: SocketAddr,
1711          transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1712      ) -> Result<()> {
1713          // Insert the transmissions into the workers.
1714          assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1715              worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1716          })
1717      }
1718  
1719      /// Re-inserts the transmissions from the proposal into the workers.
1720      fn reinsert_transmissions_into_workers(
1721          &self,
1722          transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1723      ) -> Result<()> {
1724          // Re-insert the transmissions into the workers.
1725          assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1726              worker.reinsert(transmission_id, transmission);
1727          })
1728      }
1729  
    /// Recursively stores a given batch certificate, after ensuring:
    ///   - Ensure the round matches the committee round.
    ///   - Ensure the address is a member of the committee.
    ///   - Ensure the timestamp is within range.
    ///   - Ensure we have all of the transmissions.
    ///   - Ensure we have all of the previous certificates.
    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
    ///   - Ensure the previous certificates have reached the quorum threshold.
    ///   - Ensure we have not already signed the batch ID.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Retrieve the batch header.
        let batch_header = certificate.batch_header();
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated (at or below the GC round), do not store it.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // If the certificate already exists in storage, return early.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // If the caller is not in syncing mode and this node is not yet synced, return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        // This recursively fetches and stores the referenced previous certificates.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Check if the certificate needs to be stored.
        // Note: The recursive sync above may have already stored this certificate.
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the batch certificate.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // If a BFT sender was provided, send the round and certificate to the BFT.
            if let Some(bft_sender) = self.bft_sender.get() {
                // Send the certificate to the BFT.
                if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    let err = err.context("Failed to update the BFT DAG from sync");
                    warn!("{}", &flatten_error(&err));
                    return Err(err);
                };
            }
        }
        Ok(())
    }
1788  
1789      /// Recursively syncs using the given batch header.
1790      async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
1791          &self,
1792          peer_ip: SocketAddr,
1793          batch_header: &BatchHeader<N>,
1794      ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1795          // Retrieve the batch round.
1796          let batch_round = batch_header.round();
1797  
1798          // If the certificate round is outdated, do not store it.
1799          if batch_round <= self.storage.gc_round() {
1800              bail!("Round {batch_round} is too far in the past")
1801          }
1802  
1803          // If node is not in sync mode and the node is not synced, then return an error.
1804          if !IS_SYNCING && !self.is_synced() {
1805              bail!(
1806                  "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1807                  fmt_id(batch_header.batch_id())
1808              );
1809          }
1810  
1811          // Determine if quorum threshold is reached on the batch round.
1812          let is_quorum_threshold_reached = {
1813              let authors = self.storage.get_certificate_authors_for_round(batch_round);
1814              let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1815              committee_lookback.is_quorum_threshold_reached(&authors)
1816          };
1817  
1818          // Check if our primary should move to the next round.
1819          // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1820          // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
1821          // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
1822          let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1823          // Check if our primary is far behind the peer.
1824          let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1825          // If our primary is far behind the peer, update our committee to the batch round.
1826          if is_behind_schedule || is_peer_far_in_future {
1827              // If the batch round is greater than the current committee round, update the committee.
1828              self.try_increment_to_the_next_round(batch_round).await?;
1829          }
1830  
1831          // Ensure the primary has all of the transmissions.
1832          let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1833  
1834          // Ensure the primary has all of the previous certificates.
1835          let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1836  
1837          // Wait for the missing transmissions and previous certificates to be fetched.
1838          let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1839              missing_transmissions_handle,
1840              missing_previous_certificates_handle,
1841          ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?;
1842  
1843          // Iterate through the missing previous certificates.
1844          for batch_certificate in missing_previous_certificates {
1845              // Store the batch certificate (recursively fetching any missing previous certificates).
1846              self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1847          }
1848          Ok(missing_transmissions)
1849      }
1850  
1851      /// Fetches any missing transmissions for the specified batch header.
1852      /// If a transmission does not exist, it will be fetched from the specified peer IP.
1853      async fn fetch_missing_transmissions(
1854          &self,
1855          peer_ip: SocketAddr,
1856          batch_header: &BatchHeader<N>,
1857      ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1858          // If the round is <= the GC round, return early.
1859          if batch_header.round() <= self.storage.gc_round() {
1860              return Ok(Default::default());
1861          }
1862  
1863          // Ensure this batch ID is new, otherwise return early.
1864          if self.storage.contains_batch(batch_header.batch_id()) {
1865              trace!("Batch for round {} from peer has already been processed", batch_header.round());
1866              return Ok(Default::default());
1867          }
1868  
1869          // Retrieve the workers.
1870          let workers = self.workers.clone();
1871  
1872          // Initialize a list for the transmissions.
1873          let mut fetch_transmissions = FuturesUnordered::new();
1874  
1875          // Retrieve the number of workers.
1876          let num_workers = self.num_workers();
1877          // Iterate through the transmission IDs.
1878          for transmission_id in batch_header.transmission_ids() {
1879              // If the transmission does not exist in storage, proceed to fetch the transmission.
1880              if !self.storage.contains_transmission(*transmission_id) {
1881                  // Determine the worker ID.
1882                  let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1883                      bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1884                  };
1885                  // Retrieve the worker.
1886                  let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1887                  // Push the callback onto the list.
1888                  fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1889              }
1890          }
1891  
1892          // Initialize a set for the transmissions.
1893          let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1894          // Wait for all of the transmissions to be fetched.
1895          while let Some(result) = fetch_transmissions.next().await {
1896              // Retrieve the transmission.
1897              let (transmission_id, transmission) = result?;
1898              // Insert the transmission into the set.
1899              transmissions.insert(transmission_id, transmission);
1900          }
1901          // Return the transmissions.
1902          Ok(transmissions)
1903      }
1904  
1905      /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1906      async fn fetch_missing_previous_certificates(
1907          &self,
1908          peer_ip: SocketAddr,
1909          batch_header: &BatchHeader<N>,
1910      ) -> Result<HashSet<BatchCertificate<N>>> {
1911          // Retrieve the round.
1912          let round = batch_header.round();
1913          // If the previous round is 0, or is <= the GC round, return early.
1914          if round == 1 || round <= self.storage.gc_round() + 1 {
1915              return Ok(Default::default());
1916          }
1917  
1918          // Fetch the missing previous certificates.
1919          let missing_previous_certificates =
1920              self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1921          if !missing_previous_certificates.is_empty() {
1922              debug!(
1923                  "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1924                  missing_previous_certificates.len(),
1925              );
1926          }
1927          // Return the missing previous certificates.
1928          Ok(missing_previous_certificates)
1929      }
1930  
1931      /// Fetches any missing certificates for the specified batch header from the specified peer.
1932      async fn fetch_missing_certificates(
1933          &self,
1934          peer_ip: SocketAddr,
1935          round: u64,
1936          certificate_ids: &IndexSet<Field<N>>,
1937      ) -> Result<HashSet<BatchCertificate<N>>> {
1938          // Initialize a list for the missing certificates.
1939          let mut fetch_certificates = FuturesUnordered::new();
1940          // Initialize a set for the missing certificates.
1941          let mut missing_certificates = HashSet::default();
1942          // Iterate through the certificate IDs.
1943          for certificate_id in certificate_ids {
1944              // Check if the certificate already exists in the ledger.
1945              if self.ledger.contains_certificate(certificate_id)? {
1946                  continue;
1947              }
1948              // Check if the certificate already exists in storage.
1949              if self.storage.contains_certificate(*certificate_id) {
1950                  continue;
1951              }
1952              // If we have not fully processed the certificate yet, store it.
1953              if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1954                  missing_certificates.insert(certificate);
1955              } else {
1956                  // If we do not have the certificate, request it.
1957                  trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1958                  // TODO (howardwu): Limit the number of open requests we send to a peer.
1959                  // Send an certificate request to the peer.
1960                  fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1961              }
1962          }
1963  
1964          // If there are no certificates to fetch, return early with the existing unprocessed certificates.
1965          match fetch_certificates.is_empty() {
1966              true => return Ok(missing_certificates),
1967              false => trace!(
1968                  "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1969                  fetch_certificates.len(),
1970              ),
1971          }
1972  
1973          // Wait for all of the missing certificates to be fetched.
1974          while let Some(result) = fetch_certificates.next().await {
1975              // Insert the missing certificate into the set.
1976              missing_certificates.insert(result?);
1977          }
1978          // Return the missing certificates.
1979          Ok(missing_certificates)
1980      }
1981  }
1982  
impl<N: Network> Primary<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    ///
    /// The join handle is retained so `shut_down` can abort it later.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the primary.
    ///
    /// Order matters: workers are stopped and background tasks aborted first, then the
    /// in-flight proposal state is snapshotted to disk, and the gateway is closed last.
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        // Shut down the workers.
        self.workers.iter().for_each(|worker| worker.shut_down());
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
        // Save the current proposal cache to disk.
        let proposal_cache = {
            // Take the proposed batch (leaving `None`) so it is persisted rather than lost.
            let proposal = self.proposed_batch.write().take();
            let signed_proposals = self.signed_proposals.read().clone();
            // Fall back to the propose lock's round when there is no in-flight proposal.
            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
            let pending_certificates = self.storage.get_pending_certificates();
            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
        };
        // A failed store is logged but does not block the rest of the shutdown.
        if let Err(err) = proposal_cache.store(&self.storage_mode) {
            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
        }
        // Close the gateway.
        self.gateway.shut_down().await;
    }
}
2011  
2012  #[cfg(test)]
2013  mod tests {
2014      use super::*;
2015      use alphaos_node_bft_ledger_service::MockLedgerService;
2016      use alphaos_node_bft_storage_service::BFTMemoryService;
2017      use alphaos_node_sync::{locators::test_helpers::sample_block_locators, BlockSync};
2018      use alphavm::{
2019          ledger::{
2020              committee::{Committee, MIN_VALIDATOR_STAKE},
2021              test_helpers::sample_execution_transaction_with_fee,
2022          },
2023          prelude::{Address, Signature},
2024      };
2025  
2026      use bytes::Bytes;
2027      use indexmap::IndexSet;
2028      use rand::RngCore;
2029  
2030      type CurrentNetwork = alphavm::prelude::MainnetV0;
2031  
2032      fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2033          // Create a committee containing the primary's account.
2034          const COMMITTEE_SIZE: usize = 4;
2035          let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2036          let mut members = IndexMap::new();
2037  
2038          for i in 0..COMMITTEE_SIZE {
2039              let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2040              let account = Account::new(rng).unwrap();
2041  
2042              members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
2043              accounts.push((socket_addr, account));
2044          }
2045  
2046          (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2047      }
2048  
    // Returns a primary and a list of accounts in the configured committee.
    fn primary_with_committee(
        account_index: usize,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        committee: Committee<CurrentNetwork>,
        height: u32,
    ) -> Primary<CurrentNetwork> {
        // Back the primary with a mock ledger pinned at the given block height.
        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);

        // Initialize the primary, using the account at `account_index` as its identity.
        let account = accounts[account_index].1.clone();
        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
        let mut primary =
            Primary::new(account, storage, ledger, block_sync, None, &[], false, StorageMode::new_test(None), None)
                .unwrap();

        // Construct a single worker instance sharing the primary's state.
        primary.workers = Arc::from([Worker::new(
            0, // id
            Arc::new(primary.gateway.clone()),
            primary.storage.clone(),
            primary.ledger.clone(),
            primary.proposed_batch.clone(),
        )
        .unwrap()]);
        // Register committee accounts as connected peers on the gateway.
        // NOTE(review): `skip(account_index)` starts at the primary's OWN account, so the
        // primary is inserted as its own peer and accounts before `account_index` are never
        // connected — presumably the intent was to skip only the primary itself; confirm.
        for a in accounts.iter().skip(account_index) {
            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
        }

        primary
    }
2081  
2082      fn primary_without_handlers(
2083          rng: &mut TestRng,
2084      ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2085          let (accounts, committee) = sample_committee(rng);
2086          let primary = primary_with_committee(
2087              0, // index of primary's account
2088              &accounts,
2089              committee,
2090              CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2091          );
2092  
2093          (primary, accounts)
2094      }
2095  
2096      // Creates a mock solution.
2097      fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2098          // Sample a random fake solution ID.
2099          let solution_id = rng.r#gen::<u64>().into();
2100          // Vary the size of the solutions.
2101          let size = rng.gen_range(1024..10 * 1024);
2102          // Sample random fake solution bytes.
2103          let mut vec = vec![0u8; size];
2104          rng.fill_bytes(&mut vec);
2105          let solution = Data::Buffer(Bytes::from(vec));
2106          // Return the solution ID and solution.
2107          (solution_id, solution)
2108      }
2109  
2110      // Samples a test transaction.
2111      fn sample_unconfirmed_transaction(
2112          rng: &mut TestRng,
2113      ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2114          let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2115          let id = transaction.id();
2116  
2117          (id, Data::Object(transaction))
2118      }
2119  
2120      // Creates a batch proposal with one solution and one transaction.
2121      fn create_test_proposal(
2122          author: &Account<CurrentNetwork>,
2123          committee: Committee<CurrentNetwork>,
2124          round: u64,
2125          previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2126          timestamp: i64,
2127          num_transactions: u64,
2128          rng: &mut TestRng,
2129      ) -> Proposal<CurrentNetwork> {
2130          let mut transmission_ids = IndexSet::new();
2131          let mut transmissions = IndexMap::new();
2132  
2133          // Prepare the solution and insert into the sets.
2134          let (solution_id, solution) = sample_unconfirmed_solution(rng);
2135          let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2136          let solution_transmission_id = (solution_id, solution_checksum).into();
2137          transmission_ids.insert(solution_transmission_id);
2138          transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2139  
2140          // Prepare the transactions and insert into the sets.
2141          for _ in 0..num_transactions {
2142              let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2143              let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2144              let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2145              transmission_ids.insert(transaction_transmission_id);
2146              transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2147          }
2148  
2149          // Retrieve the private key.
2150          let private_key = author.private_key();
2151          // Sign the batch header.
2152          let batch_header = BatchHeader::new(
2153              private_key,
2154              round,
2155              timestamp,
2156              committee.id(),
2157              transmission_ids,
2158              previous_certificate_ids,
2159              rng,
2160          )
2161          .unwrap();
2162          // Construct the proposal.
2163          Proposal::new(committee, batch_header, transmissions).unwrap()
2164      }
2165  
2166      // Creates a signature of the primary's current proposal for each committee member (excluding
2167      // the primary).
2168      fn peer_signatures_for_proposal(
2169          primary: &Primary<CurrentNetwork>,
2170          accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2171          rng: &mut TestRng,
2172      ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2173          // Each committee member signs the batch.
2174          let mut signatures = Vec::with_capacity(accounts.len() - 1);
2175          for (socket_addr, account) in accounts {
2176              if account.address() == primary.gateway.account().address() {
2177                  continue;
2178              }
2179              let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2180              let signature = account.sign(&[batch_id], rng).unwrap();
2181              signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2182          }
2183  
2184          signatures
2185      }
2186  
2187      /// Creates a signature of the batch ID for each committee member (excluding the primary).
2188      fn peer_signatures_for_batch(
2189          primary_address: Address<CurrentNetwork>,
2190          accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2191          batch_id: Field<CurrentNetwork>,
2192          rng: &mut TestRng,
2193      ) -> IndexSet<Signature<CurrentNetwork>> {
2194          let mut signatures = IndexSet::new();
2195          for (_, account) in accounts {
2196              if account.address() == primary_address {
2197                  continue;
2198              }
2199              let signature = account.sign(&[batch_id], rng).unwrap();
2200              signatures.insert(signature);
2201          }
2202          signatures
2203      }
2204  
    // Creates a batch certificate.
    //
    // The certificate is authored by `primary_address` (whose account must be present in
    // `accounts`) and signed by every other account. Returns the certificate together with
    // the transmissions it references (one mock solution and one mock transaction).
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Locate the author's account; panics if `primary_address` is not in `accounts`.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // Use a random committee ID; these tests do not validate it against a real committee.
        let committee_id = Field::rand(rng);
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Build the transmission IDs from the (id, checksum) pairs.
        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        // Sign the batch header as the author...
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // ...then gather the other members' signatures and assemble the certificate.
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }
2249  
2250      // Create a certificate chain up to, but not including, the specified round in the primary storage.
2251      fn store_certificate_chain(
2252          primary: &Primary<CurrentNetwork>,
2253          accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2254          round: u64,
2255          rng: &mut TestRng,
2256      ) -> IndexSet<Field<CurrentNetwork>> {
2257          let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2258          let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2259          for cur_round in 1..round {
2260              for (_, account) in accounts.iter() {
2261                  let (certificate, transmissions) = create_batch_certificate(
2262                      account.address(),
2263                      accounts,
2264                      cur_round,
2265                      previous_certificates.clone(),
2266                      rng,
2267                  );
2268                  next_certificates.insert(certificate.id());
2269                  assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2270              }
2271  
2272              assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2273              previous_certificates = next_certificates;
2274              next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2275          }
2276  
2277          previous_certificates
2278      }
2279  
2280      // Insert the account socket addresses into the resolver so that
2281      // they are recognized as "connected".
2282      fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2283          // First account is primary, which doesn't need to resolve.
2284          for (addr, acct) in accounts.iter().skip(1) {
2285              primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2286          }
2287      }
2288  
    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; with transmissions available, it should succeed and set the proposal.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }
2309  
2310      #[tokio::test]
2311      async fn test_propose_batch_with_no_transmissions() {
2312          let mut rng = TestRng::default();
2313          let (primary, _) = primary_without_handlers(&mut rng);
2314  
2315          // Check there is no batch currently proposed.
2316          assert!(primary.proposed_batch.read().is_none());
2317  
2318          // Try to propose a batch with no transmissions.
2319          assert!(primary.propose_batch().await.is_ok());
2320          assert!(primary.proposed_batch.read().is_some());
2321      }
2322  
    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill primary storage with a certificate chain up to (but not including) `round`.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch for the advanced round; it should succeed and set the proposal.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }
2347  
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        // Verifies that a proposal excludes transmissions already covered by certificates
        // from earlier rounds, including only the genuinely new ones.
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get transmissions from previous certificates.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Track the number of transmissions in the previous round.
        let mut num_transmissions_in_previous_round = 0;

        // Generate a fresh solution and transaction that are NOT part of any certificate.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            // Add the transmissions to the worker, mimicking receipt over the network.
            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            // Insert the certificate to storage.
            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Advance to the next round.
        assert!(primary.storage.increment_to_next_round(round).is_ok());

        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        // Propose the batch.
        assert!(primary.propose_batch().await.is_ok());

        // Check that the proposal only contains the new transmissions that were not in previous certificates.
        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum)));
    }
2417  
2418      #[tokio::test]
2419      async fn test_propose_batch_over_spend_limit() {
2420          let mut rng = TestRng::default();
2421  
2422          // Create a primary to test spend limit backwards compatibility with V4.
2423          let (accounts, committee) = sample_committee(&mut rng);
2424          let primary = primary_with_committee(
2425              0,
2426              &accounts,
2427              committee.clone(),
2428              CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2429          );
2430  
2431          // Check there is no batch currently proposed.
2432          assert!(primary.proposed_batch.read().is_none());
2433          // Check the workers are empty.
2434          primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2435  
2436          // Generate a solution and transactions.
2437          let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2438          primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2439  
2440          for _i in 0..5 {
2441              let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2442              // Store it on one of the workers.
2443              primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2444          }
2445  
2446          // Try to propose a batch again. This time, it should succeed.
2447          assert!(primary.propose_batch().await.is_ok());
2448          // Expect 2/5 transactions to be included in the proposal in addition to the solution.
2449          assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
2450          // Check the transmissions were correctly drained from the workers.
2451          assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2452      }
2453  
2454      #[tokio::test]
2455      async fn test_batch_propose_from_peer() {
2456          let mut rng = TestRng::default();
2457          let (primary, accounts) = primary_without_handlers(&mut rng);
2458  
2459          // Create a valid proposal with an author that isn't the primary.
2460          let round = 1;
2461          let peer_account = &accounts[1];
2462          let peer_ip = peer_account.0;
2463          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2464          let proposal = create_test_proposal(
2465              &peer_account.1,
2466              primary.ledger.current_committee().unwrap(),
2467              round,
2468              Default::default(),
2469              timestamp,
2470              1,
2471              &mut rng,
2472          );
2473  
2474          // Make sure the primary is aware of the transmissions in the proposal.
2475          for (transmission_id, transmission) in proposal.transmissions() {
2476              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2477          }
2478  
2479          // The author must be known to resolver to pass propose checks.
2480          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2481  
2482          // The primary will only consider itself synced if we received
2483          // block locators from a peer.
2484          primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2485          primary.sync.testing_only_try_block_sync_testing_only().await;
2486  
2487          // Try to process the batch proposal from the peer, should succeed.
2488          assert!(primary
2489              .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2490              .await
2491              .is_ok());
2492      }
2493  
2494      #[tokio::test]
2495      async fn test_batch_propose_from_peer_when_not_synced() {
2496          let mut rng = TestRng::default();
2497          let (primary, accounts) = primary_without_handlers(&mut rng);
2498  
2499          // Create a valid proposal with an author that isn't the primary.
2500          let round = 1;
2501          let peer_account = &accounts[1];
2502          let peer_ip = peer_account.0;
2503          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2504          let proposal = create_test_proposal(
2505              &peer_account.1,
2506              primary.ledger.current_committee().unwrap(),
2507              round,
2508              Default::default(),
2509              timestamp,
2510              1,
2511              &mut rng,
2512          );
2513  
2514          // Make sure the primary is aware of the transmissions in the proposal.
2515          for (transmission_id, transmission) in proposal.transmissions() {
2516              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2517          }
2518  
2519          // The author must be known to resolver to pass propose checks.
2520          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2521  
2522          // Add a high block locator to indicate we are not synced.
2523          primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2524  
2525          // Try to process the batch proposal from the peer, should fail
2526          assert!(primary
2527              .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2528              .await
2529              .is_err());
2530      }
2531  
2532      #[tokio::test]
2533      async fn test_batch_propose_from_peer_in_round() {
2534          let round = 2;
2535          let mut rng = TestRng::default();
2536          let (primary, accounts) = primary_without_handlers(&mut rng);
2537  
2538          // Generate certificates.
2539          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2540  
2541          // Create a valid proposal with an author that isn't the primary.
2542          let peer_account = &accounts[1];
2543          let peer_ip = peer_account.0;
2544          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2545          let proposal = create_test_proposal(
2546              &peer_account.1,
2547              primary.ledger.current_committee().unwrap(),
2548              round,
2549              previous_certificates,
2550              timestamp,
2551              1,
2552              &mut rng,
2553          );
2554  
2555          // Make sure the primary is aware of the transmissions in the proposal.
2556          for (transmission_id, transmission) in proposal.transmissions() {
2557              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2558          }
2559  
2560          // The author must be known to resolver to pass propose checks.
2561          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2562  
2563          // The primary will only consider itself synced if we received
2564          // block locators from a peer.
2565          primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2566          primary.sync.testing_only_try_block_sync_testing_only().await;
2567  
2568          // Try to process the batch proposal from the peer, should succeed.
2569          primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2570      }
2571  
2572      #[tokio::test]
2573      async fn test_batch_propose_from_peer_wrong_round() {
2574          let mut rng = TestRng::default();
2575          let (primary, accounts) = primary_without_handlers(&mut rng);
2576  
2577          // Create a valid proposal with an author that isn't the primary.
2578          let round = 1;
2579          let peer_account = &accounts[1];
2580          let peer_ip = peer_account.0;
2581          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2582          let proposal = create_test_proposal(
2583              &peer_account.1,
2584              primary.ledger.current_committee().unwrap(),
2585              round,
2586              Default::default(),
2587              timestamp,
2588              1,
2589              &mut rng,
2590          );
2591  
2592          // Make sure the primary is aware of the transmissions in the proposal.
2593          for (transmission_id, transmission) in proposal.transmissions() {
2594              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2595          }
2596  
2597          // The author must be known to resolver to pass propose checks.
2598          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2599          // The primary must be considered synced.
2600          primary.sync.testing_only_try_block_sync_testing_only().await;
2601  
2602          // Try to process the batch proposal from the peer, should error.
2603          assert!(primary
2604              .process_batch_propose_from_peer(peer_ip, BatchPropose {
2605                  round: round + 1,
2606                  batch_header: Data::Object(proposal.batch_header().clone())
2607              })
2608              .await
2609              .is_err());
2610      }
2611  
2612      #[tokio::test]
2613      async fn test_batch_propose_from_peer_in_round_wrong_round() {
2614          let round = 4;
2615          let mut rng = TestRng::default();
2616          let (primary, accounts) = primary_without_handlers(&mut rng);
2617  
2618          // Generate certificates.
2619          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2620  
2621          // Create a valid proposal with an author that isn't the primary.
2622          let peer_account = &accounts[1];
2623          let peer_ip = peer_account.0;
2624          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2625          let proposal = create_test_proposal(
2626              &peer_account.1,
2627              primary.ledger.current_committee().unwrap(),
2628              round,
2629              previous_certificates,
2630              timestamp,
2631              1,
2632              &mut rng,
2633          );
2634  
2635          // Make sure the primary is aware of the transmissions in the proposal.
2636          for (transmission_id, transmission) in proposal.transmissions() {
2637              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2638          }
2639  
2640          // The author must be known to resolver to pass propose checks.
2641          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2642          // The primary must be considered synced.
2643          primary.sync.testing_only_try_block_sync_testing_only().await;
2644  
2645          // Try to process the batch proposal from the peer, should error.
2646          assert!(primary
2647              .process_batch_propose_from_peer(peer_ip, BatchPropose {
2648                  round: round + 1,
2649                  batch_header: Data::Object(proposal.batch_header().clone())
2650              })
2651              .await
2652              .is_err());
2653      }
2654  
2655      /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected.
2656      #[tokio::test]
2657      async fn test_batch_propose_from_peer_with_past_timestamp() {
2658          let round = 2;
2659          let mut rng = TestRng::default();
2660          let (primary, accounts) = primary_without_handlers(&mut rng);
2661  
2662          // Generate certificates.
2663          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2664  
2665          // Create a valid proposal with an author that isn't the primary.
2666          let peer_account = &accounts[1];
2667          let peer_ip = peer_account.0;
2668  
2669          // Use a timestamp that is too early.
2670          // Set it to something that is less than the minimum batch delay
2671          // Note, that the minimum delay is currently 1, so this will be equal to the last timestamp
2672          let last_timestamp = primary
2673              .storage
2674              .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
2675              .expect("No previous proposal exists")
2676              .timestamp();
2677          let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;
2678  
2679          let proposal = create_test_proposal(
2680              &peer_account.1,
2681              primary.ledger.current_committee().unwrap(),
2682              round,
2683              previous_certificates,
2684              invalid_timestamp,
2685              1,
2686              &mut rng,
2687          );
2688  
2689          // Make sure the primary is aware of the transmissions in the proposal.
2690          for (transmission_id, transmission) in proposal.transmissions() {
2691              primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2692          }
2693  
2694          // The author must be known to resolver to pass propose checks.
2695          primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2696          // The primary must be considered synced.
2697          primary.sync.testing_only_try_block_sync_testing_only().await;
2698  
2699          // Try to process the batch proposal from the peer, should error.
2700          assert!(primary
2701              .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2702              .await
2703              .is_err());
2704      }
2705  
    /// Check that proposals over the transaction spend limit are rejected from consensus V5
    /// onwards, while a V4 primary still accepts them (backwards compatibility).
    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create two primaries to test spend limit activation on V5.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;

        // Request 4 transactions in the proposal — enough for the V5 primary to reject it below.
        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        // Make sure both primaries are aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to resolver to pass propose checks.
        primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // primary v4 must be considered synced.
        primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v4.sync.testing_only_try_block_sync_testing_only().await;

        // primary v5 must be considered synced.
        primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v5.sync.testing_only_try_block_sync_testing_only().await;

        // Check the spend limit is enforced from V5 onwards: V4 accepts the proposal...
        assert!(primary_v4
            .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
            .await
            .is_ok());

        // ...while V5 rejects the same proposal.
        assert!(primary_v5
            .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
            .await
            .is_err());
    }
2765  
    /// Tests that `propose_batch` is a silent no-op (returns `Ok`, produces no proposal)
    /// while the propose lock is held at a round ahead of the storage round.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Set the proposal lock to a round ahead of the storage.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Propose a batch: the call returns Ok, but no proposal is produced because
        // the lock round is ahead of the storage round.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Set the proposal lock back to the old round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Try to propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }
2798  
2799      #[tokio::test]
2800      async fn test_propose_batch_with_storage_round_behind_proposal() {
2801          let round = 5;
2802          let mut rng = TestRng::default();
2803          let (primary, accounts) = primary_without_handlers(&mut rng);
2804  
2805          // Generate previous certificates.
2806          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2807  
2808          // Create a valid proposal.
2809          let timestamp = now();
2810          let proposal = create_test_proposal(
2811              primary.gateway.account(),
2812              primary.ledger.current_committee().unwrap(),
2813              round + 1,
2814              previous_certificates,
2815              timestamp,
2816              1,
2817              &mut rng,
2818          );
2819  
2820          // Store the proposal on the primary.
2821          *primary.proposed_batch.write() = Some(proposal);
2822  
2823          // Try to propose a batch will terminate early because the storage is behind the proposal.
2824          assert!(primary.propose_batch().await.is_ok());
2825          assert!(primary.proposed_batch.read().is_some());
2826          assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
2827      }
2828  
2829      #[tokio::test(flavor = "multi_thread")]
2830      async fn test_batch_signature_from_peer() {
2831          let mut rng = TestRng::default();
2832          let (primary, accounts) = primary_without_handlers(&mut rng);
2833          map_account_addresses(&primary, &accounts);
2834  
2835          // Create a valid proposal.
2836          let round = 1;
2837          let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2838          let proposal = create_test_proposal(
2839              primary.gateway.account(),
2840              primary.ledger.current_committee().unwrap(),
2841              round,
2842              Default::default(),
2843              timestamp,
2844              1,
2845              &mut rng,
2846          );
2847  
2848          // Store the proposal on the primary.
2849          *primary.proposed_batch.write() = Some(proposal);
2850  
2851          // Each committee member signs the batch.
2852          let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2853  
2854          // Have the primary process the signatures.
2855          for (socket_addr, signature) in signatures {
2856              primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2857          }
2858  
2859          // Check the certificate was created and stored by the primary.
2860          assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2861          // Check the round was incremented.
2862          assert_eq!(primary.current_round(), round + 1);
2863      }
2864  
2865      #[tokio::test(flavor = "multi_thread")]
2866      async fn test_batch_signature_from_peer_in_round() {
2867          let round = 5;
2868          let mut rng = TestRng::default();
2869          let (primary, accounts) = primary_without_handlers(&mut rng);
2870          map_account_addresses(&primary, &accounts);
2871  
2872          // Generate certificates.
2873          let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2874  
2875          // Create a valid proposal.
2876          let timestamp = now();
2877          let proposal = create_test_proposal(
2878              primary.gateway.account(),
2879              primary.ledger.current_committee().unwrap(),
2880              round,
2881              previous_certificates,
2882              timestamp,
2883              1,
2884              &mut rng,
2885          );
2886  
2887          // Store the proposal on the primary.
2888          *primary.proposed_batch.write() = Some(proposal);
2889  
2890          // Each committee member signs the batch.
2891          let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2892  
2893          // Have the primary process the signatures.
2894          for (socket_addr, signature) in signatures {
2895              primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2896          }
2897  
2898          // Check the certificate was created and stored by the primary.
2899          assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2900          // Check the round was incremented.
2901          assert_eq!(primary.current_round(), round + 1);
2902      }
2903  
    /// Tests that a single signature (below quorum) does not produce a certificate or advance the round.
    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created and stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was not incremented.
        assert_eq!(primary.current_round(), round);
    }
2938  
    /// Tests that, at a later round with prior certificates, a single signature (below quorum)
    /// does not produce a certificate or advance the round.
    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created and stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was not incremented.
        assert_eq!(primary.current_round(), round);
    }
2976  
    /// Tests that a certificate whose transmission set is only partially available can still be
    /// inserted into storage, provided the missing transmissions are declared as aborted.
    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get transmissions from previous certificates.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round.
        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly abort some of the transmissions. The `|| aborted_transmissions.is_empty()`
        // guard guarantees at least one transmission ends up aborted.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    // Insert the aborted transmission.
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    // Insert the transmission without the aborted transmission.
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        // Add the non-aborted transmissions to the worker.
        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Check that inserting the certificate with missing (undeclared) transmissions fails.
        assert!(primary
            .storage
            .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
            .is_err());
        assert!(primary
            .storage
            .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
            .is_err());

        // Insert the certificate to storage, this time declaring the aborted transmission IDs.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        // Ensure the certificate exists in storage.
        assert!(primary.storage.contains_certificate(certificate_id));
        // Ensure that the aborted transmission IDs exist in storage.
        for aborted_transmission_id in aborted_transmissions {
            // The ID is known to storage, but an aborted transmission carries no retrievable payload.
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
3054  }