/ node / bft / src / helpers / storage.rs
storage.rs
   1  // Copyright (c) 2025-2026 ACDC Network
   2  // This file is part of the alphaos library.
   3  //
   4  // Alpha Chain | Delta Chain Protocol
   5  // International Monetary Graphite.
   6  //
   7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
   8  // They built world-class ZK infrastructure. We installed the EASY button.
   9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
  10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
  11  //
  12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
  13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
  14  // No rights reserved. No permission required. No warranty. No refunds.
  15  //
  16  // https://creativecommons.org/publicdomain/zero/1.0/
  17  // SPDX-License-Identifier: CC0-1.0
  18  
  19  use crate::helpers::{check_timestamp_for_liveness, fmt_id};
  20  use alphaos_node_bft_ledger_service::LedgerService;
  21  use alphaos_node_bft_storage_service::StorageService;
  22  use alphavm::{
  23      ledger::{
  24          block::{Block, Transaction},
  25          narwhal::{BatchCertificate, BatchHeader, Transmission, TransmissionID},
  26      },
  27      prelude::{anyhow, bail, ensure, Address, Field, Network, Result},
  28      utilities::{cfg_into_iter, cfg_iter, cfg_sorted_by, flatten_error},
  29  };
  30  
  31  use anyhow::Context;
  32  use indexmap::{map::Entry, IndexMap, IndexSet};
  33  #[cfg(feature = "locktick")]
  34  use locktick::parking_lot::RwLock;
  35  use lru::LruCache;
  36  #[cfg(not(feature = "locktick"))]
  37  use parking_lot::RwLock;
  38  #[cfg(not(feature = "serial"))]
  39  use rayon::prelude::*;
  40  use std::{
  41      collections::{HashMap, HashSet},
  42      num::NonZeroUsize,
  43      sync::{
  44          atomic::{AtomicU32, AtomicU64, Ordering},
  45          Arc,
  46      },
  47  };
  48  
  49  #[derive(Clone, Debug)]
  50  pub struct Storage<N: Network>(Arc<StorageInner<N>>);
  51  
  52  impl<N: Network> std::ops::Deref for Storage<N> {
  53      type Target = Arc<StorageInner<N>>;
  54  
  55      fn deref(&self) -> &Self::Target {
  56          &self.0
  57      }
  58  }
  59  
  60  /// The storage for the memory pool.
  61  ///
  62  /// The storage is used to store the following:
  63  /// - `current_height` tracker.
  64  /// - `current_round` tracker.
  65  /// - `round` to `(certificate ID, batch ID, author)` entries.
  66  /// - `certificate ID` to `certificate` entries.
  67  /// - `batch ID` to `round` entries.
  68  /// - `transmission ID` to `(transmission, certificate IDs)` entries.
  69  ///
  70  /// The chain of events is as follows:
  71  /// 1. A `transmission` is received.
  72  /// 2. After a `batch` is ready to be stored:
  73  ///   - The `certificate` is inserted, triggering updates to the
  74  ///     `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
  75  ///   - The missing `transmissions` from storage are inserted into the `transmissions` map.
  76  ///   - The certificate ID is inserted into the `transmissions` map.
  77  /// 3. After a `round` reaches quorum threshold:
  78  ///  - The next round is inserted into the `current_round`.
  79  #[derive(Debug)]
  80  pub struct StorageInner<N: Network> {
  81      /// The ledger service.
  82      ledger: Arc<dyn LedgerService<N>>,
  83      /* Once per block */
  84      /// The current height.
  85      current_height: AtomicU32,
  86      /* Once per round */
  87      /// The current round.
  88      ///
  89      /// Invariant: current_round > 0.
  90      /// This is established in [`Storage::new`], which sets it to at least 1 via [`Storage::update_current_round`].
  91      /// The only callers of [`Storage::update_current_round`] are
  92      /// [`Storage::increment_to_next_round`] and [`Storage::sync_round_with_block`],
  93      /// both of which set it to at least 1.
  94      current_round: AtomicU64,
  95      /// The `round` for which garbage collection has occurred **up to** (inclusive).
  96      gc_round: AtomicU64,
  97      /// The maximum number of rounds to keep in storage.
  98      max_gc_rounds: u64,
  99      /* Once per batch */
 100      /// The map of `round` to a list of `(certificate ID, author)` entries.
 101      rounds: RwLock<IndexMap<u64, IndexSet<(Field<N>, Address<N>)>>>,
 102      /// A cache of `certificate ID` to unprocessed `certificate`.
 103      unprocessed_certificates: RwLock<LruCache<Field<N>, BatchCertificate<N>>>,
 104      /// The map of `certificate ID` to `certificate`.
 105      certificates: RwLock<IndexMap<Field<N>, BatchCertificate<N>>>,
 106      /// The map of `certificate ID` to `round`.
 107      batch_ids: RwLock<IndexMap<Field<N>, u64>>,
 108      /// The map of `transmission ID` to `(transmission, certificate IDs)` entries.
 109      transmissions: Arc<dyn StorageService<N>>,
 110  }
 111  
 112  impl<N: Network> Storage<N> {
 113      /// Initializes a new instance of storage.
 114      pub fn new(
 115          ledger: Arc<dyn LedgerService<N>>,
 116          transmissions: Arc<dyn StorageService<N>>,
 117          max_gc_rounds: u64,
 118      ) -> Self {
 119          // Retrieve the latest committee bonded in the ledger
 120          // (genesis committee if the ledger contains only the genesis block).
 121          let committee = ledger.current_committee().expect("Ledger is missing a committee.");
 122          // Retrieve the round at which that committee was created, or 1 if it is the genesis committee.
 123          let current_round = committee.starting_round().max(1);
 124          // Set the unprocessed certificates cache size.
 125          let unprocessed_cache_size = NonZeroUsize::new((N::LATEST_MAX_CERTIFICATES().unwrap() * 2) as usize).unwrap();
 126  
 127          // Create the storage.
 128          let storage = Self(Arc::new(StorageInner {
 129              ledger,
 130              current_height: Default::default(),
 131              current_round: AtomicU64::new(current_round),
 132              gc_round: Default::default(),
 133              max_gc_rounds,
 134              rounds: Default::default(),
 135              unprocessed_certificates: RwLock::new(LruCache::new(unprocessed_cache_size)),
 136              certificates: Default::default(),
 137              batch_ids: Default::default(),
 138              transmissions,
 139          }));
 140          // Perform GC on the current round.
 141          // Since there are no certificates yet, this only sets `gc_round`.
 142          storage.garbage_collect_certificates(current_round);
 143          // Return the storage.
 144          storage
 145      }
 146  }
 147  
 148  impl<N: Network> Storage<N> {
 149      /// Returns the current height.
 150      pub fn current_height(&self) -> u32 {
 151          // Get the current height.
 152          self.current_height.load(Ordering::SeqCst)
 153      }
 154  }
 155  
 156  impl<N: Network> Storage<N> {
 157      /// Returns the current round.
 158      pub fn current_round(&self) -> u64 {
 159          // Get the current round.
 160          self.current_round.load(Ordering::SeqCst)
 161      }
 162  
 163      /// Returns the `round` that garbage collection has occurred **up to** (inclusive).
 164      pub fn gc_round(&self) -> u64 {
 165          // Get the GC round.
 166          self.gc_round.load(Ordering::SeqCst)
 167      }
 168  
 169      /// Returns the maximum number of rounds to keep in storage.
 170      pub fn max_gc_rounds(&self) -> u64 {
 171          self.max_gc_rounds
 172      }
 173  
 174      /// Increments storage to the next round, updating the current round.
 175      /// Note: This method is only called once per round, upon certification of the primary's batch.
 176      pub fn increment_to_next_round(&self, current_round: u64) -> Result<u64> {
 177          // Determine the next round.
 178          let next_round = current_round + 1;
 179  
 180          // Check if the next round is less than the current round in storage.
 181          {
 182              // Retrieve the storage round.
 183              let storage_round = self.current_round();
 184              // If the next round is less than the current round in storage, return early with the storage round.
 185              if next_round < storage_round {
 186                  return Ok(storage_round);
 187              }
 188  
 189              trace!("Incrementing storage from round {storage_round} to {next_round}");
 190          }
 191  
 192          // Retrieve the current committee.
 193          let current_committee = self.ledger.current_committee()?;
 194          // Retrieve the current committee's starting round.
 195          let starting_round = current_committee.starting_round();
 196          // If the primary is behind the current committee's starting round, sync with the latest block.
 197          if next_round < starting_round {
 198              // Retrieve the latest block round.
 199              let latest_block_round = self.ledger.latest_round();
 200              // Log the round sync.
 201              info!(
 202                  "Syncing primary round ({next_round}) with the current committee's starting round ({starting_round}). Syncing with the latest block round {latest_block_round}..."
 203              );
 204              // Sync the round with the latest block.
 205              self.sync_round_with_block(latest_block_round);
 206              // Return the latest block round.
 207              return Ok(latest_block_round);
 208          }
 209  
 210          // Update the storage to the next round.
 211          self.update_current_round(next_round);
 212  
 213          #[cfg(feature = "metrics")]
 214          metrics::gauge(metrics::bft::LAST_STORED_ROUND, next_round as f64);
 215  
 216          // Retrieve the storage round.
 217          let storage_round = self.current_round();
 218          // Retrieve the GC round.
 219          let gc_round = self.gc_round();
 220          // Ensure the next round matches in storage.
 221          ensure!(next_round == storage_round, "The next round {next_round} does not match in storage ({storage_round})");
 222          // Ensure the next round is greater than or equal to the GC round.
 223          ensure!(next_round >= gc_round, "The next round {next_round} is behind the GC round {gc_round}");
 224  
 225          // Log the updated round.
 226          info!("Starting round {next_round}...");
 227          Ok(next_round)
 228      }
 229  
 230      /// Updates the storage to the next round.
 231      fn update_current_round(&self, next_round: u64) {
 232          // Update the current round.
 233          self.current_round.store(next_round, Ordering::SeqCst);
 234      }
 235  
 236      /// Update the storage by performing garbage collection based on the next round.
 237      pub(crate) fn garbage_collect_certificates(&self, next_round: u64) {
 238          // Fetch the current GC round.
 239          let current_gc_round = self.gc_round();
 240          // Compute the next GC round.
 241          let next_gc_round = next_round.saturating_sub(self.max_gc_rounds);
 242          // Check if storage needs to be garbage collected.
 243          if next_gc_round > current_gc_round {
 244              // Remove the GC round(s) from storage.
 245              for gc_round in current_gc_round..=next_gc_round {
 246                  // Iterate over the certificates for the GC round.
 247                  for id in self.get_certificate_ids_for_round(gc_round).into_iter() {
 248                      trace!(
 249                          "Garbage collecting certificate {id} at round {gc_round} (cut-off is round {next_gc_round})"
 250                      );
 251                      self.remove_certificate(id);
 252                  }
 253              }
 254              // Update the GC round.
 255              self.gc_round.store(next_gc_round, Ordering::SeqCst);
 256          }
 257      }
 258  }
 259  
 260  impl<N: Network> Storage<N> {
 261      /// Returns `true` if the storage contains the specified `round`.
 262      pub fn contains_certificates_for_round(&self, round: u64) -> bool {
 263          // Check if the round exists in storage.
 264          self.rounds.read().contains_key(&round)
 265      }
 266  
 267      /// Returns `true` if the storage contains the specified `certificate ID`.
 268      pub fn contains_certificate(&self, certificate_id: Field<N>) -> bool {
 269          // Check if the certificate ID exists in storage.
 270          self.certificates.read().contains_key(&certificate_id)
 271      }
 272  
 273      /// Returns `true` if the storage contains a certificate from the specified `author` in the given `round`.
 274      pub fn contains_certificate_in_round_from(&self, round: u64, author: Address<N>) -> bool {
 275          self.rounds.read().get(&round).is_some_and(|set| set.iter().any(|(_, a)| a == &author))
 276      }
 277  
 278      /// Returns `true` if the storage contains the specified `certificate ID`.
 279      pub fn contains_unprocessed_certificate(&self, certificate_id: Field<N>) -> bool {
 280          // Check if the certificate ID exists in storage.
 281          self.unprocessed_certificates.read().contains(&certificate_id)
 282      }
 283  
 284      /// Returns `true` if the storage contains the specified `batch ID`.
 285      pub fn contains_batch(&self, batch_id: Field<N>) -> bool {
 286          // Check if the batch ID exists in storage.
 287          self.batch_ids.read().contains_key(&batch_id)
 288      }
 289  
 290      /// Returns `true` if the storage contains the specified `transmission ID`.
 291      pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
 292          self.transmissions.contains_transmission(transmission_id.into())
 293      }
 294  
 295      /// Returns the transmission for the given `transmission ID`.
 296      /// If the transmission ID does not exist in storage, `None` is returned.
 297      pub fn get_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> Option<Transmission<N>> {
 298          self.transmissions.get_transmission(transmission_id.into())
 299      }
 300  
 301      /// Returns the round for the given `certificate ID`.
 302      /// If the certificate ID does not exist in storage, `None` is returned.
 303      pub fn get_round_for_certificate(&self, certificate_id: Field<N>) -> Option<u64> {
 304          // Get the round.
 305          self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
 306      }
 307  
 308      /// Returns the round for the given `batch ID`.
 309      /// If the batch ID does not exist in storage, `None` is returned.
 310      pub fn get_round_for_batch(&self, batch_id: Field<N>) -> Option<u64> {
 311          // Get the round.
 312          self.batch_ids.read().get(&batch_id).copied()
 313      }
 314  
 315      /// Returns the certificate round for the given `certificate ID`.
 316      /// If the certificate ID does not exist in storage, `None` is returned.
 317      pub fn get_certificate_round(&self, certificate_id: Field<N>) -> Option<u64> {
 318          // Get the batch certificate and return the round.
 319          self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
 320      }
 321  
 322      /// Returns the certificate for the given `certificate ID`.
 323      /// If the certificate ID does not exist in storage, `None` is returned.
 324      pub fn get_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
 325          // Get the batch certificate.
 326          self.certificates.read().get(&certificate_id).cloned()
 327      }
 328  
 329      /// Returns the unprocessed certificate for the given `certificate ID`.
 330      /// If the certificate ID does not exist in storage, `None` is returned.
 331      pub fn get_unprocessed_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
 332          // Get the unprocessed certificate.
 333          self.unprocessed_certificates.read().peek(&certificate_id).cloned()
 334      }
 335  
 336      /// Returns the certificate for the given `round` and `author`.
 337      /// If the round does not exist in storage, `None` is returned.
 338      /// If the author for the round does not exist in storage, `None` is returned.
 339      pub fn get_certificate_for_round_with_author(&self, round: u64, author: Address<N>) -> Option<BatchCertificate<N>> {
 340          // Retrieve the certificates.
 341          if let Some(entries) = self.rounds.read().get(&round) {
 342              let certificates = self.certificates.read();
 343              entries.iter().find_map(
 344                  |(certificate_id, a)| if a == &author { certificates.get(certificate_id).cloned() } else { None },
 345              )
 346          } else {
 347              Default::default()
 348          }
 349      }
 350  
 351      /// Returns the certificates for the given `round`.
 352      /// If the round does not exist in storage, an empty set is returned.
 353      pub fn get_certificates_for_round(&self, round: u64) -> IndexSet<BatchCertificate<N>> {
 354          // The genesis round does not have batch certificates.
 355          if round == 0 {
 356              return Default::default();
 357          }
 358          // Retrieve the certificates.
 359          if let Some(entries) = self.rounds.read().get(&round) {
 360              let certificates = self.certificates.read();
 361              entries.iter().flat_map(|(certificate_id, _)| certificates.get(certificate_id).cloned()).collect()
 362          } else {
 363              Default::default()
 364          }
 365      }
 366  
 367      /// Returns the certificate IDs for the given `round`.
 368      /// If the round does not exist in storage, an empty set is returned.
 369      pub fn get_certificate_ids_for_round(&self, round: u64) -> IndexSet<Field<N>> {
 370          // The genesis round does not have batch certificates.
 371          if round == 0 {
 372              return Default::default();
 373          }
 374          // Retrieve the certificates.
 375          if let Some(entries) = self.rounds.read().get(&round) {
 376              entries.iter().map(|(certificate_id, _)| *certificate_id).collect()
 377          } else {
 378              Default::default()
 379          }
 380      }
 381  
 382      /// Returns the certificate authors for the given `round`.
 383      /// If the round does not exist in storage, an empty set is returned.
 384      pub fn get_certificate_authors_for_round(&self, round: u64) -> HashSet<Address<N>> {
 385          // The genesis round does not have batch certificates.
 386          if round == 0 {
 387              return Default::default();
 388          }
 389          // Retrieve the certificates.
 390          if let Some(entries) = self.rounds.read().get(&round) {
 391              entries.iter().map(|(_, author)| *author).collect()
 392          } else {
 393              Default::default()
 394          }
 395      }
 396  
 397      /// Returns the certificates that have not yet been included in the ledger.
 398      /// Note that the order of this set is by round and then insertion.
 399      pub(crate) fn get_pending_certificates(&self) -> IndexSet<BatchCertificate<N>> {
 400          // Obtain the read locks.
 401          let rounds = self.rounds.read();
 402          let certificates = self.certificates.read();
 403  
 404          // Iterate over the rounds.
 405          cfg_sorted_by!(rounds.clone(), |a, _, b, _| a.cmp(b))
 406              .flat_map(|(_, certificates_for_round)| {
 407                  // Iterate over the certificates for the round.
 408                  cfg_into_iter!(certificates_for_round).filter_map(|(certificate_id, _)| {
 409                      // Skip the certificate if it already exists in the ledger.
 410                      if self.ledger.contains_certificate(&certificate_id).unwrap_or(false) {
 411                          None
 412                      } else {
 413                          // Add the certificate to the pending certificates.
 414                          certificates.get(&certificate_id).cloned()
 415                      }
 416                  })
 417              })
 418              .collect()
 419      }
 420  
 421      /// Checks the given `batch_header` for validity, returning the missing transmissions from storage.
 422      ///
 423      /// # Arguments
 424      /// - `batch_header`: The batch header to check.
 425      /// - `transmissions`: All transmissions referenced by the certificate.
 426      /// - `aborted_transmissions`: The set of aborted transmissions in this certificate.
 427      ///
 428      /// # Invariants
 429      /// This method ensures the following invariants:
 430      /// - The batch ID does not already exist in storage.
 431      /// - The author is a member of the committee for the batch round.
 432      /// - The timestamp is within the allowed time range.
 433      /// - None of the transmissions are from any past rounds (up to GC).
 434      /// - All transmissions declared in the batch header are provided or exist in storage (up to GC).
 435      /// - All previous certificates declared in the certificate exist in storage (up to GC).
 436      /// - All previous certificates are for the previous round (i.e. round - 1).
 437      /// - All previous certificates contain a unique author.
 438      /// - The previous certificates reached the quorum threshold (N - f).
 439      ///
 440      /// # Returns
 441      /// - `Ok(Some(txns))` for a valid new batch, where `txns` is the set of missing transactions in the batch
 442      ///   that need to be fetched from peers.
 443      /// - `Ok(None)` if the batch already exists in storage
 444      pub fn check_batch_header(
 445          &self,
 446          batch_header: &BatchHeader<N>,
 447          transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
 448          aborted_transmissions: HashSet<TransmissionID<N>>,
 449      ) -> Result<Option<HashMap<TransmissionID<N>, Transmission<N>>>> {
 450          // Retrieve the round.
 451          let round = batch_header.round();
 452          // Retrieve the GC round.
 453          let gc_round = self.gc_round();
 454          // Construct a GC log message.
 455          let gc_log = format!("(gc = {gc_round})");
 456  
 457          // Ensure the batch ID does not already exist in storage.
 458          if self.contains_batch(batch_header.batch_id()) {
 459              debug!("Batch for round {round} already exists in storage {gc_log}");
 460              return Ok(None);
 461          }
 462  
 463          // Retrieve the committee lookback for the batch round.
 464          let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
 465              bail!("Storage failed to retrieve the committee lookback for round {round} {gc_log}")
 466          };
 467          // Ensure the author is in the committee.
 468          if !committee_lookback.is_committee_member(batch_header.author()) {
 469              bail!("Author {} is not in the committee for round {round} {gc_log}", batch_header.author())
 470          }
 471  
 472          // Check the timestamp for liveness.
 473          check_timestamp_for_liveness(batch_header.timestamp())?;
 474  
 475          // Retrieve the missing transmissions in storage from the given transmissions.
 476          let missing_transmissions = self
 477              .transmissions
 478              .find_missing_transmissions(batch_header, transmissions, aborted_transmissions)
 479              .map_err(|e| anyhow!("{e} for round {round} {gc_log}"))?;
 480  
 481          // Compute the previous round.
 482          let previous_round = round.saturating_sub(1);
 483          // Check if the previous round is within range of the GC round.
 484          if previous_round > gc_round {
 485              // Retrieve the committee lookback for the previous round.
 486              let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
 487                  bail!("Missing committee for the previous round {previous_round} in storage {gc_log}")
 488              };
 489              // Ensure the previous round certificates exists in storage.
 490              if !self.contains_certificates_for_round(previous_round) {
 491                  bail!("Missing certificates for the previous round {previous_round} in storage {gc_log}")
 492              }
 493              // Ensure the number of previous certificate IDs is at or below the number of committee members.
 494              if batch_header.previous_certificate_ids().len() > previous_committee_lookback.num_members() {
 495                  bail!("Too many previous certificates for round {round} {gc_log}")
 496              }
 497              // Initialize a set of the previous authors.
 498              let mut previous_authors = HashSet::with_capacity(batch_header.previous_certificate_ids().len());
 499              // Ensure storage contains all declared previous certificates (up to GC).
 500              for previous_certificate_id in batch_header.previous_certificate_ids() {
 501                  // Retrieve the previous certificate.
 502                  let Some(previous_certificate) = self.get_certificate(*previous_certificate_id) else {
 503                      bail!(
 504                          "Missing previous certificate '{}' for certificate in round {round} {gc_log}",
 505                          fmt_id(previous_certificate_id)
 506                      )
 507                  };
 508                  // Ensure the previous certificate is for the previous round.
 509                  if previous_certificate.round() != previous_round {
 510                      bail!("Round {round} certificate contains a round {previous_round} certificate {gc_log}")
 511                  }
 512                  // Ensure the previous author is new.
 513                  if previous_authors.contains(&previous_certificate.author()) {
 514                      bail!("Round {round} certificate contains a duplicate author {gc_log}")
 515                  }
 516                  // Insert the author of the previous certificate.
 517                  previous_authors.insert(previous_certificate.author());
 518              }
 519              // Ensure the previous certificates have reached the quorum threshold.
 520              if !previous_committee_lookback.is_quorum_threshold_reached(&previous_authors) {
 521                  bail!("Previous certificates for a batch in round {round} did not reach quorum threshold {gc_log}")
 522              }
 523          }
 524  
 525          Ok(Some(missing_transmissions))
 526      }
 527  
 528      /// Check the validity of a certificate coming from another validator.
 529      ///
 530      /// It suffices to check that the signers (author and endorsers) are members of the applicable committee
 531      /// and that they form a quorum in the committee.
 532      /// Under the fundamental fault tolerance assumption of at most `f` (stake of) faulty validators,
 533      /// the quorum check on signers guarantees that at least one correct validator
 534      /// has ensured the validity of the proposal contained in the certificate,
 535      /// either by construction (by the author) or by checking (by an endorser):
 536      /// given `N > 0` total stake, and `f` the largest integer `< N/3` (where `/` is exact rational division),
 537      /// we have `N >= 3f + 1`, which implies `N - f >= 2f + 1`, which is always `> f`;
 538      /// `N - f` is the quorum stake.
 539      pub fn check_incoming_certificate(&self, certificate: &BatchCertificate<N>) -> Result<()> {
 540          // Retrieve the certificate author and round.
 541          let certificate_author = certificate.author();
 542          let certificate_round = certificate.round();
 543  
 544          // Retrieve the committee lookback.
 545          let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
 546  
 547          // Ensure that the signers of the certificate reach the quorum threshold.
 548          // Note that certificate.signatures() only returns the endorsing signatures, not the author's signature.
 549          let mut signers: HashSet<Address<N>> =
 550              certificate.signatures().map(|signature| signature.to_address()).collect();
 551          signers.insert(certificate_author);
 552          ensure!(
 553              committee_lookback.is_quorum_threshold_reached(&signers),
 554              "Certificate '{}' for round {certificate_round} does not meet quorum requirements",
 555              certificate.id()
 556          );
 557  
 558          // Ensure that the signers of the certificate are in the committee.
 559          cfg_iter!(&signers).try_for_each(|signer| {
 560              ensure!(
 561                  committee_lookback.is_committee_member(*signer),
 562                  "Signer '{signer}' of certificate '{}' for round {certificate_round} is not in the committee",
 563                  certificate.id()
 564              );
 565              Ok(())
 566          })?;
 567  
 568          Ok(())
 569      }
 570  
 571      /// Checks the given `certificate` for validity, returning the missing transmissions from storage.
 572      ///
 573      /// # Arguments
 574      /// - `certificate`: The certificate to check.
 575      /// - `transmissions`: The transmissions contained in the certificate.
 576      /// - `aborted_transmissions`: The aborted transmission contained in the certificate.
 577      ///
 578      /// # Invariants
 579      /// This method ensures the following invariants:
 580      /// - The certificate ID does not already exist in storage.
 581      /// - The batch ID does not already exist in storage.
 582      /// - The author is a member of the committee for the batch round.
 583      /// - The author has not already created a certificate for the batch round.
 584      /// - The timestamp is within the allowed time range.
 585      /// - None of the transmissions are from any past rounds (up to GC).
 586      /// - All transmissions declared in the batch header are provided or exist in storage (up to GC).
 587      /// - All previous certificates declared in the certificate exist in storage (up to GC).
 588      /// - All previous certificates are for the previous round (i.e. round - 1).
 589      /// - The previous certificates reached the quorum threshold (N - f).
 590      /// - The timestamps from the signers are all within the allowed time range.
 591      /// - The signers have reached the quorum threshold (N - f).
 592      pub fn check_certificate(
 593          &self,
 594          certificate: &BatchCertificate<N>,
 595          transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
 596          aborted_transmissions: HashSet<TransmissionID<N>>,
 597      ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
 598          // Retrieve the round.
 599          let round = certificate.round();
 600          // Retrieve the GC round.
 601          let gc_round = self.gc_round();
 602          // Construct a GC log message.
 603          let gc_log = format!("(gc = {gc_round})");
 604  
 605          // Ensure the certificate ID does not already exist in storage.
 606          if self.contains_certificate(certificate.id()) {
 607              bail!("Certificate for round {round} already exists in storage {gc_log}")
 608          }
 609  
 610          // Ensure the storage does not already contain a certificate for this author in this round.
 611          if self.contains_certificate_in_round_from(round, certificate.author()) {
 612              bail!("Certificate with this author for round {round} already exists in storage {gc_log}")
 613          }
 614  
 615          // Ensure the batch header is well-formed.
 616          let Some(missing_transmissions) =
 617              self.check_batch_header(certificate.batch_header(), transmissions, aborted_transmissions)?
 618          else {
 619              bail!("Certificate for round {round} already exists in storage {gc_log}")
 620          };
 621  
 622          // Check the timestamp for liveness.
 623          check_timestamp_for_liveness(certificate.timestamp())?;
 624  
 625          // Retrieve the committee lookback for the batch round.
 626          let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
 627              bail!("Storage failed to retrieve the committee for round {round} {gc_log}")
 628          };
 629  
 630          // Initialize a set of the signers.
 631          let mut signers = HashSet::with_capacity(certificate.signatures().len() + 1);
 632          // Append the batch author.
 633          signers.insert(certificate.author());
 634  
 635          // Iterate over the signatures.
 636          for signature in certificate.signatures() {
 637              // Retrieve the signer.
 638              let signer = signature.to_address();
 639              // Ensure the signer is in the committee.
 640              if !committee_lookback.is_committee_member(signer) {
 641                  bail!("Signer {signer} is not in the committee for round {round} {gc_log}")
 642              }
 643              // Append the signer.
 644              signers.insert(signer);
 645          }
 646  
 647          // Ensure the signatures have reached the quorum threshold.
 648          if !committee_lookback.is_quorum_threshold_reached(&signers) {
 649              bail!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}")
 650          }
 651  
 652          Ok(missing_transmissions)
 653      }
 654  
 655      /// Inserts the given `certificate` into storage.
 656      ///
 657      /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
 658      ///
 659      /// # Arguments
 660      /// - `certificate`: The certificate to insert.
 661      /// - `transmissions`: The transmissions contained in the certificate, or the subset of the transmissions that in the certificate that do not yet exist in storage.
 662      /// - `aborted_transmissions`: The aborted transmission contained in the certificate.
 663      ///
 664      /// # Invariants
 665      /// This method ensures the following invariants:
 666      /// - The certificate ID does not already exist in storage.
 667      /// - The batch ID does not already exist in storage.
 668      /// - All transmissions declared in the certificate are provided or exist in storage (up to GC).
 669      /// - All previous certificates declared in the certificate exist in storage (up to GC).
 670      /// - All previous certificates are for the previous round (i.e. round - 1).
 671      /// - The previous certificates reached the quorum threshold (N - f).
 672      pub fn insert_certificate(
 673          &self,
 674          certificate: BatchCertificate<N>,
 675          transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
 676          aborted_transmissions: HashSet<TransmissionID<N>>,
 677      ) -> Result<()> {
 678          // Ensure the certificate round is above the GC round.
 679          ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
 680          // Ensure the certificate and its transmissions are valid.
 681          let missing_transmissions =
 682              self.check_certificate(&certificate, transmissions, aborted_transmissions.clone())?;
 683          // Insert the certificate into storage.
 684          self.insert_certificate_atomic(certificate, aborted_transmissions, missing_transmissions);
 685          Ok(())
 686      }
 687  
 688      /// Inserts the given `certificate` into storage.
 689      ///
 690      /// This method assumes **all missing** transmissions are provided in the `missing_transmissions` map.
 691      ///
 692      /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
 693      fn insert_certificate_atomic(
 694          &self,
 695          certificate: BatchCertificate<N>,
 696          aborted_transmission_ids: HashSet<TransmissionID<N>>,
 697          missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
 698      ) {
 699          // Retrieve the round.
 700          let round = certificate.round();
 701          // Retrieve the certificate ID.
 702          let certificate_id = certificate.id();
 703          // Retrieve the author of the batch.
 704          let author = certificate.author();
 705  
 706          // Insert the round to certificate ID entry.
 707          self.rounds.write().entry(round).or_default().insert((certificate_id, author));
 708          // Obtain the certificate's transmission ids.
 709          let transmission_ids = certificate.transmission_ids().clone();
 710          // Insert the certificate.
 711          self.certificates.write().insert(certificate_id, certificate);
 712          // Remove the unprocessed certificate.
 713          self.unprocessed_certificates.write().pop(&certificate_id);
 714          // Insert the batch ID.
 715          self.batch_ids.write().insert(certificate_id, round);
 716          // Insert the certificate ID for each of the transmissions into storage.
 717          self.transmissions.insert_transmissions(
 718              certificate_id,
 719              transmission_ids,
 720              aborted_transmission_ids,
 721              missing_transmissions,
 722          );
 723      }
 724  
 725      /// Inserts the given unprocessed `certificate` into storage.
 726      ///
 727      /// This is a temporary storage, which is cleared again when calling `insert_certificate_atomic`.
 728      pub fn insert_unprocessed_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
 729          // Ensure the certificate round is above the GC round.
 730          ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
 731          // Insert the certificate.
 732          self.unprocessed_certificates.write().put(certificate.id(), certificate);
 733  
 734          Ok(())
 735      }
 736  
 737      /// Removes the given `certificate ID` from storage. This method is used to garbage collect individual certificates once blocks are committed.
 738      ///
 739      /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
 740      ///
 741      /// If the certificate was successfully removed, `true` is returned.
 742      /// If the certificate did not exist in storage, `false` is returned.
 743      fn remove_certificate(&self, certificate_id: Field<N>) -> bool {
 744          // Retrieve the certificate.
 745          let Some(certificate) = self.get_certificate(certificate_id) else {
 746              warn!("Certificate {certificate_id} does not exist in storage");
 747              return false;
 748          };
 749          // Retrieve the round.
 750          let round = certificate.round();
 751          // Compute the author of the batch.
 752          let author = certificate.author();
 753  
 754          // TODO (howardwu): We may want to use `shift_remove` below, in order to align compatibility
 755          //  with tests written to for `remove_certificate`. However, this will come with performance hits.
 756          //  It will be better to write tests that compare the union of the sets.
 757  
 758          // Update the round.
 759          match self.rounds.write().entry(round) {
 760              Entry::Occupied(mut entry) => {
 761                  // Remove the round to certificate ID entry.
 762                  entry.get_mut().swap_remove(&(certificate_id, author));
 763                  // If the round is empty, remove it.
 764                  if entry.get().is_empty() {
 765                      entry.swap_remove();
 766                  }
 767              }
 768              Entry::Vacant(_) => {}
 769          }
 770          // Remove the certificate.
 771          self.certificates.write().swap_remove(&certificate_id);
 772          // Remove the unprocessed certificate.
 773          self.unprocessed_certificates.write().pop(&certificate_id);
 774          // Remove the batch ID.
 775          self.batch_ids.write().swap_remove(&certificate_id);
 776          // Remove the transmission entries in the certificate from storage.
 777          self.transmissions.remove_transmissions(&certificate_id, certificate.transmission_ids());
 778          // Return successfully.
 779          true
 780      }
 781  }
 782  
 783  impl<N: Network> Storage<N> {
 784      /// Syncs the current height with the block.
 785      pub(crate) fn sync_height_with_block(&self, next_height: u32) {
 786          // If the block height is greater than the current height in storage, sync the height.
 787          if next_height > self.current_height() {
 788              // Update the current height in storage.
 789              self.current_height.store(next_height, Ordering::SeqCst);
 790          }
 791      }
 792  
 793      /// Syncs the current round with the block.
 794      pub(crate) fn sync_round_with_block(&self, next_round: u64) {
 795          // Ensure we sync to at least round 1.
 796          let next_round = next_round.max(1);
 797          // If the round in the block is greater than the current round in storage, sync the round.
 798          if next_round > self.current_round() {
 799              // Update the current round in storage.
 800              self.update_current_round(next_round);
 801              // Log the updated round.
 802              info!("Synced to round {next_round}...");
 803          } else {
 804              trace!(
 805                  "Skipping sync to round {next_round} as it is less than the current round ({})",
 806                  self.current_round()
 807              );
 808          }
 809      }
 810  
 811      /// Syncs the batch certificate with the block.
 812      pub(crate) fn sync_certificate_with_block(
 813          &self,
 814          block: &Block<N>,
 815          certificate: BatchCertificate<N>,
 816          unconfirmed_transactions: &HashMap<N::TransactionID, Transaction<N>>,
 817      ) {
 818          // Skip if the certificate round is below the GC round.
 819          let gc_round = self.gc_round();
 820          if certificate.round() <= gc_round {
 821              trace!("Got certificate for round {} below GC round ({gc_round}). Will not store it.", certificate.round());
 822              return;
 823          }
 824  
 825          // If the certificate ID already exists in storage, skip it.
 826          if self.contains_certificate(certificate.id()) {
 827              trace!("Got certificate {} for round {} more than once.", certificate.id(), certificate.round());
 828              return;
 829          }
 830          // Retrieve the transmissions for the certificate.
 831          let mut missing_transmissions = HashMap::new();
 832  
 833          // Retrieve the aborted transmissions for the certificate.
 834          let mut aborted_transmissions = HashSet::new();
 835  
 836          // Track the block's aborted solutions and transactions.
 837          let aborted_solutions: IndexSet<_> = block.aborted_solution_ids().iter().collect();
 838          let aborted_transactions: IndexSet<_> = block.aborted_transaction_ids().iter().collect();
 839  
 840          // Iterate over the transmission IDs.
 841          for transmission_id in certificate.transmission_ids() {
 842              // If the transmission ID already exists in the map, skip it.
 843              if missing_transmissions.contains_key(transmission_id) {
 844                  continue;
 845              }
 846              // If the transmission ID exists in storage, skip it.
 847              if self.contains_transmission(*transmission_id) {
 848                  continue;
 849              }
 850              // Retrieve the transmission.
 851              match transmission_id {
 852                  TransmissionID::Ratification => (),
 853                  TransmissionID::Solution(solution_id, _) => {
 854                      // Retrieve the solution.
 855                      match block.get_solution(solution_id) {
 856                          // Insert the solution.
 857                          Some(solution) => missing_transmissions.insert(*transmission_id, (*solution).into()),
 858                          // Otherwise, try to load the solution from the ledger.
 859                          None => match self.ledger.get_solution(solution_id) {
 860                              // Insert the solution.
 861                              Ok(solution) => missing_transmissions.insert(*transmission_id, solution.into()),
 862                              // Check if the solution is in the aborted solutions.
 863                              Err(_) => {
 864                                  // Insert the aborted solution if it exists in the block or ledger.
 865                                  match aborted_solutions.contains(solution_id)
 866                                      || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
 867                                  {
 868                                      true => {
 869                                          aborted_transmissions.insert(*transmission_id);
 870                                      }
 871                                      false => error!("Missing solution {solution_id} in block {}", block.height()),
 872                                  }
 873                                  continue;
 874                              }
 875                          },
 876                      };
 877                  }
 878                  TransmissionID::Transaction(transaction_id, _) => {
 879                      // Retrieve the transaction.
 880                      match unconfirmed_transactions.get(transaction_id) {
 881                          // Insert the transaction.
 882                          Some(transaction) => missing_transmissions.insert(*transmission_id, transaction.clone().into()),
 883                          // Otherwise, try to load the unconfirmed transaction from the ledger.
 884                          None => match self.ledger.get_unconfirmed_transaction(*transaction_id) {
 885                              // Insert the transaction.
 886                              Ok(transaction) => missing_transmissions.insert(*transmission_id, transaction.into()),
 887                              // Check if the transaction is in the aborted transactions.
 888                              Err(_) => {
 889                                  // Insert the aborted transaction if it exists in the block or ledger.
 890                                  match aborted_transactions.contains(transaction_id)
 891                                      || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
 892                                  {
 893                                      true => {
 894                                          aborted_transmissions.insert(*transmission_id);
 895                                      }
 896                                      false => warn!("Missing transaction {transaction_id} in block {}", block.height()),
 897                                  }
 898                                  continue;
 899                              }
 900                          },
 901                      };
 902                  }
 903              }
 904          }
 905          // Insert the batch certificate into storage.
 906          let certificate_id = fmt_id(certificate.id());
 907          debug!(
 908              "Syncing certificate '{certificate_id}' for round {} with {} transmissions",
 909              certificate.round(),
 910              certificate.transmission_ids().len()
 911          );
 912  
 913          if let Err(error) = self
 914              .insert_certificate(certificate, missing_transmissions, aborted_transmissions)
 915              .with_context(|| format!("Failed to insert certificate '{certificate_id}' from block {}", block.height()))
 916          {
 917              error!("{}", &flatten_error(&error));
 918          }
 919      }
 920  }
 921  
 922  #[cfg(test)]
 923  impl<N: Network> Storage<N> {
 924      /// Returns the ledger service.
 925      pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
 926          &self.ledger
 927      }
 928  
 929      /// Returns an iterator over the `(round, (certificate ID, batch ID, author))` entries.
 930      pub fn rounds_iter(&self) -> impl Iterator<Item = (u64, IndexSet<(Field<N>, Address<N>)>)> {
 931          self.rounds.read().clone().into_iter()
 932      }
 933  
 934      /// Returns an iterator over the `(certificate ID, certificate)` entries.
 935      pub fn certificates_iter(&self) -> impl Iterator<Item = (Field<N>, BatchCertificate<N>)> {
 936          self.certificates.read().clone().into_iter()
 937      }
 938  
 939      /// Returns an iterator over the `(batch ID, round)` entries.
 940      pub fn batch_ids_iter(&self) -> impl Iterator<Item = (Field<N>, u64)> {
 941          self.batch_ids.read().clone().into_iter()
 942      }
 943  
 944      /// Returns an iterator over the `(transmission ID, (transmission, certificate IDs))` entries.
 945      pub fn transmissions_iter(
 946          &self,
 947      ) -> impl Iterator<Item = (TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>))> {
 948          self.transmissions.as_hashmap().into_iter()
 949      }
 950  
 951      /// Inserts the given `certificate` into storage.
 952      ///
 953      /// Note: Do NOT use this in production. This is for **testing only**.
 954      #[cfg(test)]
 955      #[doc(hidden)]
 956      pub(crate) fn testing_only_insert_certificate_testing_only(&self, certificate: BatchCertificate<N>) {
 957          // Retrieve the round.
 958          let round = certificate.round();
 959          // Retrieve the certificate ID.
 960          let certificate_id = certificate.id();
 961          // Retrieve the author of the batch.
 962          let author = certificate.author();
 963  
 964          // Insert the round to certificate ID entry.
 965          self.rounds.write().entry(round).or_default().insert((certificate_id, author));
 966          // Obtain the certificate's transmission ids.
 967          let transmission_ids = certificate.transmission_ids().clone();
 968          // Insert the certificate.
 969          self.certificates.write().insert(certificate_id, certificate);
 970          // Insert the batch ID.
 971          self.batch_ids.write().insert(certificate_id, round);
 972  
 973          // Construct the dummy missing transmissions (for testing purposes).
 974          let missing_transmissions = transmission_ids
 975              .iter()
 976              .map(|id| (*id, Transmission::Transaction(alphavm::ledger::narwhal::Data::Buffer(bytes::Bytes::new()))))
 977              .collect::<HashMap<_, _>>();
 978          // Insert the certificate ID for each of the transmissions into storage.
 979          self.transmissions.insert_transmissions(
 980              certificate_id,
 981              transmission_ids,
 982              Default::default(),
 983              missing_transmissions,
 984          );
 985      }
 986  }
 987  
 988  #[cfg(test)]
 989  pub(crate) mod tests {
 990      use super::*;
 991      use alphaos_node_bft_ledger_service::MockLedgerService;
 992      use alphaos_node_bft_storage_service::BFTMemoryService;
 993      use alphavm::{
 994          ledger::narwhal::{batch_certificate::test_helpers::sample_batch_certificate_for_round_with_committee, Data},
 995          prelude::{Rng, TestRng},
 996      };
 997  
 998      use ::bytes::Bytes;
 999      use indexmap::indexset;
1000  
1001      type CurrentNetwork = alphavm::prelude::MainnetV0;
1002  
1003      /// Asserts that the storage matches the expected layout.
1004      pub fn assert_storage<N: Network>(
1005          storage: &Storage<N>,
1006          rounds: &[(u64, IndexSet<(Field<N>, Address<N>)>)],
1007          certificates: &[(Field<N>, BatchCertificate<N>)],
1008          batch_ids: &[(Field<N>, u64)],
1009          transmissions: &HashMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
1010      ) {
1011          // Ensure the rounds are well-formed.
1012          assert_eq!(storage.rounds_iter().collect::<Vec<_>>(), *rounds);
1013          // Ensure the certificates are well-formed.
1014          assert_eq!(storage.certificates_iter().collect::<Vec<_>>(), *certificates);
1015          // Ensure the batch IDs are well-formed.
1016          assert_eq!(storage.batch_ids_iter().collect::<Vec<_>>(), *batch_ids);
1017          // Ensure the transmissions are well-formed.
1018          assert_eq!(storage.transmissions_iter().collect::<HashMap<_, _>>(), *transmissions);
1019      }
1020  
1021      /// Samples a random transmission.
1022      fn sample_transmission(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
1023          // Sample random fake solution bytes.
1024          let s = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
1025          // Sample random fake transaction bytes.
1026          let t = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..2048).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
1027          // Sample a random transmission.
1028          match rng.r#gen::<bool>() {
1029              true => Transmission::Solution(s(rng)),
1030              false => Transmission::Transaction(t(rng)),
1031          }
1032      }
1033  
1034      /// Samples the random transmissions, returning the missing transmissions and the transmissions.
1035      pub(crate) fn sample_transmissions(
1036          certificate: &BatchCertificate<CurrentNetwork>,
1037          rng: &mut TestRng,
1038      ) -> (
1039          HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>,
1040          HashMap<TransmissionID<CurrentNetwork>, (Transmission<CurrentNetwork>, IndexSet<Field<CurrentNetwork>>)>,
1041      ) {
1042          // Retrieve the certificate ID.
1043          let certificate_id = certificate.id();
1044  
1045          let mut missing_transmissions = HashMap::new();
1046          let mut transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1047          for transmission_id in certificate.transmission_ids() {
1048              // Initialize the transmission.
1049              let transmission = sample_transmission(rng);
1050              // Update the missing transmissions.
1051              missing_transmissions.insert(*transmission_id, transmission.clone());
1052              // Update the transmissions map.
1053              transmissions
1054                  .entry(*transmission_id)
1055                  .or_insert((transmission, Default::default()))
1056                  .1
1057                  .insert(certificate_id);
1058          }
1059          (missing_transmissions, transmissions)
1060      }
1061  
1062      // TODO (howardwu): Testing with 'max_gc_rounds' set to '0' should ensure everything is cleared after insertion.
1063  
1064      #[test]
1065      fn test_certificate_insert_remove() {
1066          let rng = &mut TestRng::default();
1067  
1068          // Sample a committee.
1069          let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
1070          // Initialize the ledger.
1071          let ledger = Arc::new(MockLedgerService::new(committee));
1072          // Initialize the storage.
1073          let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1074  
1075          // Ensure the storage is empty.
1076          assert_storage(&storage, &[], &[], &[], &Default::default());
1077  
1078          // Create a new certificate.
1079          let certificate = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1080          // Retrieve the certificate ID.
1081          let certificate_id = certificate.id();
1082          // Retrieve the round.
1083          let round = certificate.round();
1084          // Retrieve the author of the batch.
1085          let author = certificate.author();
1086  
1087          // Construct the sample 'transmissions'.
1088          let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1089  
1090          // Insert the certificate.
1091          storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions);
1092          // Ensure the certificate exists in storage.
1093          assert!(storage.contains_certificate(certificate_id));
1094          // Ensure the certificate is stored in the correct round.
1095          assert_eq!(storage.get_certificates_for_round(round), indexset! { certificate.clone() });
1096          // Ensure the certificate is stored for the correct round and author.
1097          assert_eq!(storage.get_certificate_for_round_with_author(round, author), Some(certificate.clone()));
1098  
1099          // Check that the underlying storage representation is correct.
1100          {
1101              // Construct the expected layout for 'rounds'.
1102              let rounds = [(round, indexset! { (certificate_id, author) })];
1103              // Construct the expected layout for 'certificates'.
1104              let certificates = [(certificate_id, certificate.clone())];
1105              // Construct the expected layout for 'batch_ids'.
1106              let batch_ids = [(certificate_id, round)];
1107              // Assert the storage is well-formed.
1108              assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1109          }
1110  
1111          // Retrieve the certificate.
1112          let candidate_certificate = storage.get_certificate(certificate_id).unwrap();
1113          // Ensure the retrieved certificate is the same as the inserted certificate.
1114          assert_eq!(certificate, candidate_certificate);
1115  
1116          // Remove the certificate.
1117          assert!(storage.remove_certificate(certificate_id));
1118          // Ensure the certificate does not exist in storage.
1119          assert!(!storage.contains_certificate(certificate_id));
1120          // Ensure the certificate is no longer stored in the round.
1121          assert!(storage.get_certificates_for_round(round).is_empty());
1122          // Ensure the certificate is no longer stored for the round and author.
1123          assert_eq!(storage.get_certificate_for_round_with_author(round, author), None);
1124          // Ensure the storage is empty.
1125          assert_storage(&storage, &[], &[], &[], &Default::default());
1126      }
1127  
1128      #[test]
1129      fn test_certificate_duplicate() {
1130          let rng = &mut TestRng::default();
1131  
1132          // Sample a committee.
1133          let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
1134          // Initialize the ledger.
1135          let ledger = Arc::new(MockLedgerService::new(committee));
1136          // Initialize the storage.
1137          let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1138  
1139          // Ensure the storage is empty.
1140          assert_storage(&storage, &[], &[], &[], &Default::default());
1141  
1142          // Create a new certificate.
1143          let certificate = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1144          // Retrieve the certificate ID.
1145          let certificate_id = certificate.id();
1146          // Retrieve the round.
1147          let round = certificate.round();
1148          // Retrieve the author of the batch.
1149          let author = certificate.author();
1150  
1151          // Construct the expected layout for 'rounds'.
1152          let rounds = [(round, indexset! { (certificate_id, author) })];
1153          // Construct the expected layout for 'certificates'.
1154          let certificates = [(certificate_id, certificate.clone())];
1155          // Construct the expected layout for 'batch_ids'.
1156          let batch_ids = [(certificate_id, round)];
1157          // Construct the sample 'transmissions'.
1158          let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1159  
1160          // Insert the certificate.
1161          storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1162          // Ensure the certificate exists in storage.
1163          assert!(storage.contains_certificate(certificate_id));
1164          // Check that the underlying storage representation is correct.
1165          assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1166  
1167          // Insert the certificate again - without any missing transmissions.
1168          storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1169          // Ensure the certificate exists in storage.
1170          assert!(storage.contains_certificate(certificate_id));
1171          // Check that the underlying storage representation remains unchanged.
1172          assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1173  
1174          // Insert the certificate again - with all of the original missing transmissions.
1175          storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1176          // Ensure the certificate exists in storage.
1177          assert!(storage.contains_certificate(certificate_id));
1178          // Check that the underlying storage representation remains unchanged.
1179          assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1180      }
1181  
1182      /// Test that `check_incoming_certificate` does not reject a valid cert.
1183      #[test]
1184      fn test_valid_incoming_certificate() {
1185          let rng = &mut TestRng::default();
1186  
1187          // Sample a committee.
1188          let (committee, private_keys) =
1189              alphavm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 5, rng);
1190          // Initialize the ledger.
1191          let ledger = Arc::new(MockLedgerService::new(committee));
1192          // Initialize the storage.
1193          let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1194  
1195          // Go through many rounds of valid certificates and ensure they're accepted.
1196          let mut previous_certs = IndexSet::default();
1197  
1198          for round in 1..=100 {
1199              let mut new_certs = IndexSet::default();
1200  
1201              // Generate one cert per validator
1202              for private_key in private_keys.iter() {
1203                  let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1204  
1205                  let certificate = sample_batch_certificate_for_round_with_committee(
1206                      round,
1207                      previous_certs.clone(),
1208                      private_key,
1209                      &other_keys,
1210                      rng,
1211                  );
1212                  storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
1213                  new_certs.insert(certificate.id());
1214  
1215                  // Construct the sample 'transmissions'.
1216                  let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
1217                  storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1218              }
1219  
1220              previous_certs = new_certs;
1221          }
1222      }
1223  
1224      /// Make sure that we reject all certificates without sufficient signatures early.
1225      #[test]
1226      fn test_invalid_incoming_certificate_missing_signature() {
1227          let rng = &mut TestRng::default();
1228  
1229          // Sample a committee.
1230          let (committee, private_keys) =
1231              alphavm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1232          // Initialize the ledger.
1233          let ledger = Arc::new(MockLedgerService::new(committee));
1234          // Initialize the storage.
1235          let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1236  
1237          // Go through many rounds of valid certificates and ensure they're accepted.
1238          let mut previous_certs = IndexSet::default();
1239  
1240          for round in 1..=5 {
1241              let mut new_certs = IndexSet::default();
1242  
1243              // Generate one cert per validator
1244              for private_key in private_keys.iter() {
1245                  if round < 5 {
1246                      let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1247  
1248                      let certificate = sample_batch_certificate_for_round_with_committee(
1249                          round,
1250                          previous_certs.clone(),
1251                          private_key,
1252                          &other_keys,
1253                          rng,
1254                      );
1255                      storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
1256                      new_certs.insert(certificate.id());
1257  
1258                      // Construct the sample 'transmissions'.
1259                      let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
1260                      storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1261                  } else {
1262                      // Pick a few signers, but not enough to form a quorum.
1263                      let other_keys: Vec<_> = private_keys[0..=3].iter().cloned().filter(|k| k != private_key).collect();
1264  
1265                      let certificate = sample_batch_certificate_for_round_with_committee(
1266                          round,
1267                          previous_certs.clone(),
1268                          private_key,
1269                          &other_keys,
1270                          rng,
1271                      );
1272                      assert!(storage.check_incoming_certificate(&certificate).is_err());
1273                  }
1274              }
1275  
1276              previous_certs = new_certs;
1277          }
1278      }
1279  
1280      /// Verify that `insert_certificate` rejects certs with less edges than required.
1281      #[test]
1282      fn test_invalid_certificate_insufficient_previous_certs() {
1283          let rng = &mut TestRng::default();
1284  
1285          // Sample a committee.
1286          let (committee, private_keys) =
1287              alphavm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1288          // Initialize the ledger.
1289          let ledger = Arc::new(MockLedgerService::new(committee));
1290          // Initialize the storage.
1291          let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1292  
1293          // Go through many rounds of valid certificates and ensure they're accepted.
1294          let mut previous_certs = IndexSet::default();
1295  
1296          for round in 1..=6 {
1297              let mut new_certs = IndexSet::default();
1298  
1299              // Generate one cert per validator
1300              for private_key in private_keys.iter() {
1301                  let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1302  
1303                  let certificate = sample_batch_certificate_for_round_with_committee(
1304                      round,
1305                      previous_certs.clone(),
1306                      private_key,
1307                      &other_keys,
1308                      rng,
1309                  );
1310  
1311                  // Construct the sample 'transmissions'.
1312                  let (_missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1313                  let transmissions = transmissions.into_iter().map(|(k, (t, _))| (k, t)).collect();
1314  
1315                  if round <= 5 {
1316                      new_certs.insert(certificate.id());
1317                      storage
1318                          .insert_certificate(certificate, transmissions, Default::default())
1319                          .expect("Valid certificate rejected");
1320                  } else {
1321                      assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
1322                  }
1323              }
1324  
1325              if round < 5 {
1326                  previous_certs = new_certs;
1327              } else {
1328                  // Remove more than half of the previous certs.
1329                  previous_certs = new_certs.into_iter().skip(6).collect();
1330              }
1331          }
1332      }
1333  
1334      /// Verify that `insert_certificate` rejects certs that do not increment the round number.
1335      #[test]
1336      fn test_invalid_certificate_wrong_round_number() {
1337          let rng = &mut TestRng::default();
1338  
1339          // Sample a committee.
1340          let (committee, private_keys) =
1341              alphavm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1342          // Initialize the ledger.
1343          let ledger = Arc::new(MockLedgerService::new(committee));
1344          // Initialize the storage.
1345          let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1346  
1347          // Go through many rounds of valid certificates and ensure they're accepted.
1348          let mut previous_certs = IndexSet::default();
1349  
1350          for round in 1..=6 {
1351              let mut new_certs = IndexSet::default();
1352  
1353              // Generate one cert per validator
1354              for private_key in private_keys.iter() {
1355                  let cert_round = round.min(5); // In the sixth round, do not increment
1356                  let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1357  
1358                  let certificate = sample_batch_certificate_for_round_with_committee(
1359                      cert_round,
1360                      previous_certs.clone(),
1361                      private_key,
1362                      &other_keys,
1363                      rng,
1364                  );
1365  
1366                  // Construct the sample 'transmissions'.
1367                  let (_missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1368                  let transmissions = transmissions.into_iter().map(|(k, (t, _))| (k, t)).collect();
1369  
1370                  if round <= 5 {
1371                      new_certs.insert(certificate.id());
1372                      storage
1373                          .insert_certificate(certificate, transmissions, Default::default())
1374                          .expect("Valid certificate rejected");
1375                  } else {
1376                      assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
1377                  }
1378              }
1379  
1380              if round < 5 {
1381                  previous_certs = new_certs;
1382              } else {
1383                  // Remove more than half of the previous certs.
1384                  previous_certs = new_certs.into_iter().skip(6).collect();
1385              }
1386          }
1387      }
1388  }
1389  
1390  #[cfg(test)]
1391  pub mod prop_tests {
1392      use super::*;
1393      use crate::helpers::{now, storage::tests::assert_storage};
1394      use alphaos_node_bft_ledger_service::MockLedgerService;
1395      use alphaos_node_bft_storage_service::BFTMemoryService;
1396      use alphavm::{
1397          ledger::{
1398              committee::prop_tests::{CommitteeContext, ValidatorSet},
1399              narwhal::{BatchHeader, Data},
1400              puzzle::SolutionID,
1401          },
1402          prelude::{Signature, Uniform},
1403      };
1404  
1405      use ::bytes::Bytes;
1406      use indexmap::indexset;
1407      use proptest::{
1408          collection,
1409          prelude::{any, Arbitrary, BoxedStrategy, Just, Strategy},
1410          prop_oneof,
1411          sample::{size_range, Selector},
1412          test_runner::TestRng,
1413      };
1414      use rand::{CryptoRng, Error, Rng, RngCore};
1415      use std::fmt::Debug;
1416      use test_strategy::proptest;
1417  
1418      type CurrentNetwork = alphavm::prelude::MainnetV0;
1419  
1420      impl Arbitrary for Storage<CurrentNetwork> {
1421          type Parameters = CommitteeContext;
1422          type Strategy = BoxedStrategy<Storage<CurrentNetwork>>;
1423  
1424          fn arbitrary() -> Self::Strategy {
1425              (any::<CommitteeContext>(), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1426                  .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1427                      let ledger = Arc::new(MockLedgerService::new(committee));
1428                      Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
1429                  })
1430                  .boxed()
1431          }
1432  
1433          fn arbitrary_with(context: Self::Parameters) -> Self::Strategy {
1434              (Just(context), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1435                  .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1436                      let ledger = Arc::new(MockLedgerService::new(committee));
1437                      Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
1438                  })
1439                  .boxed()
1440          }
1441      }
1442  
1443      // The `proptest::TestRng` doesn't implement `rand_core::CryptoRng` trait which is required in alphavm, so we use a wrapper
1444      #[derive(Debug)]
1445      pub struct CryptoTestRng(TestRng);
1446  
1447      impl Arbitrary for CryptoTestRng {
1448          type Parameters = ();
1449          type Strategy = BoxedStrategy<CryptoTestRng>;
1450  
1451          fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1452              Just(0).prop_perturb(|_, rng| CryptoTestRng(rng)).boxed()
1453          }
1454      }
1455      impl RngCore for CryptoTestRng {
1456          fn next_u32(&mut self) -> u32 {
1457              self.0.next_u32()
1458          }
1459  
1460          fn next_u64(&mut self) -> u64 {
1461              self.0.next_u64()
1462          }
1463  
1464          fn fill_bytes(&mut self, dest: &mut [u8]) {
1465              self.0.fill_bytes(dest);
1466          }
1467  
1468          fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), Error> {
1469              self.0.try_fill_bytes(dest)
1470          }
1471      }
1472  
1473      impl CryptoRng for CryptoTestRng {}
1474  
1475      #[derive(Debug, Clone)]
1476      pub struct AnyTransmission(pub Transmission<CurrentNetwork>);
1477  
1478      impl Arbitrary for AnyTransmission {
1479          type Parameters = ();
1480          type Strategy = BoxedStrategy<AnyTransmission>;
1481  
1482          fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1483              any_transmission().prop_map(AnyTransmission).boxed()
1484          }
1485      }
1486  
1487      #[derive(Debug, Clone)]
1488      pub struct AnyTransmissionID(pub TransmissionID<CurrentNetwork>);
1489  
1490      impl Arbitrary for AnyTransmissionID {
1491          type Parameters = ();
1492          type Strategy = BoxedStrategy<AnyTransmissionID>;
1493  
1494          fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1495              any_transmission_id().prop_map(AnyTransmissionID).boxed()
1496          }
1497      }
1498  
1499      fn any_transmission() -> BoxedStrategy<Transmission<CurrentNetwork>> {
1500          prop_oneof![
1501              (collection::vec(any::<u8>(), 512..=512))
1502                  .prop_map(|bytes| Transmission::Solution(Data::Buffer(Bytes::from(bytes)))),
1503              (collection::vec(any::<u8>(), 2048..=2048))
1504                  .prop_map(|bytes| Transmission::Transaction(Data::Buffer(Bytes::from(bytes)))),
1505          ]
1506          .boxed()
1507      }
1508  
1509      pub fn any_solution_id() -> BoxedStrategy<SolutionID<CurrentNetwork>> {
1510          Just(0).prop_perturb(|_, rng| CryptoTestRng(rng).r#gen::<u64>().into()).boxed()
1511      }
1512  
1513      pub fn any_transaction_id() -> BoxedStrategy<<CurrentNetwork as Network>::TransactionID> {
1514          Just(0)
1515              .prop_perturb(|_, rng| {
1516                  <CurrentNetwork as Network>::TransactionID::from(Field::rand(&mut CryptoTestRng(rng)))
1517              })
1518              .boxed()
1519      }
1520  
1521      pub fn any_transmission_id() -> BoxedStrategy<TransmissionID<CurrentNetwork>> {
1522          prop_oneof![
1523              any_transaction_id().prop_perturb(|id, mut rng| TransmissionID::Transaction(
1524                  id,
1525                  rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>()
1526              )),
1527              any_solution_id().prop_perturb(|id, mut rng| TransmissionID::Solution(
1528                  id,
1529                  rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>()
1530              )),
1531          ]
1532          .boxed()
1533      }
1534  
1535      pub fn sign_batch_header<R: Rng + CryptoRng>(
1536          validator_set: &ValidatorSet,
1537          batch_header: &BatchHeader<CurrentNetwork>,
1538          rng: &mut R,
1539      ) -> IndexSet<Signature<CurrentNetwork>> {
1540          let mut signatures = IndexSet::with_capacity(validator_set.0.len());
1541          for validator in validator_set.0.iter() {
1542              let private_key = validator.private_key;
1543              signatures.insert(private_key.sign(&[batch_header.batch_id()], rng).unwrap());
1544          }
1545          signatures
1546      }
1547  
1548      #[proptest]
1549      fn test_certificate_duplicate(
1550          context: CommitteeContext,
1551          #[any(size_range(1..16).lift())] transmissions: Vec<(AnyTransmissionID, AnyTransmission)>,
1552          mut rng: CryptoTestRng,
1553          selector: Selector,
1554      ) {
1555          let CommitteeContext(committee, ValidatorSet(validators)) = context;
1556          let committee_id = committee.id();
1557  
1558          // Initialize the storage.
1559          let ledger = Arc::new(MockLedgerService::new(committee));
1560          let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1561  
1562          // Ensure the storage is empty.
1563          assert_storage(&storage, &[], &[], &[], &Default::default());
1564  
1565          // Create a new certificate.
1566          let signer = selector.select(&validators);
1567  
1568          let mut transmission_map = IndexMap::new();
1569  
1570          for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter() {
1571              transmission_map.insert(*id, t.clone());
1572          }
1573  
1574          let batch_header = BatchHeader::new(
1575              &signer.private_key,
1576              0,
1577              now(),
1578              committee_id,
1579              transmission_map.keys().cloned().collect(),
1580              Default::default(),
1581              &mut rng,
1582          )
1583          .unwrap();
1584  
1585          // Remove the author from the validator set passed to create the batch
1586          // certificate, the author should not sign their own batch.
1587          let mut validators = validators.clone();
1588          validators.remove(signer);
1589  
1590          let certificate = BatchCertificate::from(
1591              batch_header.clone(),
1592              sign_batch_header(&ValidatorSet(validators), &batch_header, &mut rng),
1593          )
1594          .unwrap();
1595  
1596          // Retrieve the certificate ID.
1597          let certificate_id = certificate.id();
1598          let mut internal_transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1599          for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter().cloned() {
1600              internal_transmissions.entry(id).or_insert((t, Default::default())).1.insert(certificate_id);
1601          }
1602  
1603          // Retrieve the round.
1604          let round = certificate.round();
1605          // Retrieve the author of the batch.
1606          let author = certificate.author();
1607  
1608          // Construct the expected layout for 'rounds'.
1609          let rounds = [(round, indexset! { (certificate_id, author) })];
1610          // Construct the expected layout for 'certificates'.
1611          let certificates = [(certificate_id, certificate.clone())];
1612          // Construct the expected layout for 'batch_ids'.
1613          let batch_ids = [(certificate_id, round)];
1614  
1615          // Insert the certificate.
1616          let missing_transmissions: HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>> =
1617              transmission_map.into_iter().collect();
1618          storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1619          // Ensure the certificate exists in storage.
1620          assert!(storage.contains_certificate(certificate_id));
1621          // Check that the underlying storage representation is correct.
1622          assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1623  
1624          // Insert the certificate again - without any missing transmissions.
1625          storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1626          // Ensure the certificate exists in storage.
1627          assert!(storage.contains_certificate(certificate_id));
1628          // Check that the underlying storage representation remains unchanged.
1629          assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1630  
1631          // Insert the certificate again - with all of the original missing transmissions.
1632          storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1633          // Ensure the certificate exists in storage.
1634          assert!(storage.contains_certificate(certificate_id));
1635          // Check that the underlying storage representation remains unchanged.
1636          assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1637      }
1638  }