storage.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use crate::helpers::{check_timestamp_for_liveness, fmt_id}; 20 use alphaos_node_bft_ledger_service::LedgerService; 21 use alphaos_node_bft_storage_service::StorageService; 22 use alphavm::{ 23 ledger::{ 24 block::{Block, Transaction}, 25 narwhal::{BatchCertificate, BatchHeader, Transmission, TransmissionID}, 26 }, 27 prelude::{anyhow, bail, ensure, Address, Field, Network, Result}, 28 utilities::{cfg_into_iter, cfg_iter, cfg_sorted_by, flatten_error}, 29 }; 30 31 use anyhow::Context; 32 use indexmap::{map::Entry, IndexMap, IndexSet}; 33 #[cfg(feature = "locktick")] 34 use locktick::parking_lot::RwLock; 35 use lru::LruCache; 36 #[cfg(not(feature = "locktick"))] 37 use parking_lot::RwLock; 38 #[cfg(not(feature = "serial"))] 39 use rayon::prelude::*; 40 use std::{ 41 collections::{HashMap, HashSet}, 42 num::NonZeroUsize, 43 sync::{ 44 atomic::{AtomicU32, AtomicU64, Ordering}, 45 Arc, 46 }, 47 }; 48 49 #[derive(Clone, Debug)] 50 pub struct Storage<N: Network>(Arc<StorageInner<N>>); 51 52 impl<N: Network> std::ops::Deref for Storage<N> { 53 type Target = Arc<StorageInner<N>>; 54 55 fn deref(&self) -> &Self::Target { 56 
&self.0 57 } 58 } 59 60 /// The storage for the memory pool. 61 /// 62 /// The storage is used to store the following: 63 /// - `current_height` tracker. 64 /// - `current_round` tracker. 65 /// - `round` to `(certificate ID, batch ID, author)` entries. 66 /// - `certificate ID` to `certificate` entries. 67 /// - `batch ID` to `round` entries. 68 /// - `transmission ID` to `(transmission, certificate IDs)` entries. 69 /// 70 /// The chain of events is as follows: 71 /// 1. A `transmission` is received. 72 /// 2. After a `batch` is ready to be stored: 73 /// - The `certificate` is inserted, triggering updates to the 74 /// `rounds`, `certificates`, `batch_ids`, and `transmissions` maps. 75 /// - The missing `transmissions` from storage are inserted into the `transmissions` map. 76 /// - The certificate ID is inserted into the `transmissions` map. 77 /// 3. After a `round` reaches quorum threshold: 78 /// - The next round is inserted into the `current_round`. 79 #[derive(Debug)] 80 pub struct StorageInner<N: Network> { 81 /// The ledger service. 82 ledger: Arc<dyn LedgerService<N>>, 83 /* Once per block */ 84 /// The current height. 85 current_height: AtomicU32, 86 /* Once per round */ 87 /// The current round. 88 /// 89 /// Invariant: current_round > 0. 90 /// This is established in [`Storage::new`], which sets it to at least 1 via [`Storage::update_current_round`]. 91 /// The only callers of [`Storage::update_current_round`] are 92 /// [`Storage::increment_to_next_round`] and [`Storage::sync_round_with_block`], 93 /// both of which set it to at least 1. 94 current_round: AtomicU64, 95 /// The `round` for which garbage collection has occurred **up to** (inclusive). 96 gc_round: AtomicU64, 97 /// The maximum number of rounds to keep in storage. 98 max_gc_rounds: u64, 99 /* Once per batch */ 100 /// The map of `round` to a list of `(certificate ID, author)` entries. 
101 rounds: RwLock<IndexMap<u64, IndexSet<(Field<N>, Address<N>)>>>, 102 /// A cache of `certificate ID` to unprocessed `certificate`. 103 unprocessed_certificates: RwLock<LruCache<Field<N>, BatchCertificate<N>>>, 104 /// The map of `certificate ID` to `certificate`. 105 certificates: RwLock<IndexMap<Field<N>, BatchCertificate<N>>>, 106 /// The map of `certificate ID` to `round`. 107 batch_ids: RwLock<IndexMap<Field<N>, u64>>, 108 /// The map of `transmission ID` to `(transmission, certificate IDs)` entries. 109 transmissions: Arc<dyn StorageService<N>>, 110 } 111 112 impl<N: Network> Storage<N> { 113 /// Initializes a new instance of storage. 114 pub fn new( 115 ledger: Arc<dyn LedgerService<N>>, 116 transmissions: Arc<dyn StorageService<N>>, 117 max_gc_rounds: u64, 118 ) -> Self { 119 // Retrieve the latest committee bonded in the ledger 120 // (genesis committee if the ledger contains only the genesis block). 121 let committee = ledger.current_committee().expect("Ledger is missing a committee."); 122 // Retrieve the round at which that committee was created, or 1 if it is the genesis committee. 123 let current_round = committee.starting_round().max(1); 124 // Set the unprocessed certificates cache size. 125 let unprocessed_cache_size = NonZeroUsize::new((N::LATEST_MAX_CERTIFICATES().unwrap() * 2) as usize).unwrap(); 126 127 // Create the storage. 128 let storage = Self(Arc::new(StorageInner { 129 ledger, 130 current_height: Default::default(), 131 current_round: AtomicU64::new(current_round), 132 gc_round: Default::default(), 133 max_gc_rounds, 134 rounds: Default::default(), 135 unprocessed_certificates: RwLock::new(LruCache::new(unprocessed_cache_size)), 136 certificates: Default::default(), 137 batch_ids: Default::default(), 138 transmissions, 139 })); 140 // Perform GC on the current round. 141 // Since there are no certificates yet, this only sets `gc_round`. 142 storage.garbage_collect_certificates(current_round); 143 // Return the storage. 
144 storage 145 } 146 } 147 148 impl<N: Network> Storage<N> { 149 /// Returns the current height. 150 pub fn current_height(&self) -> u32 { 151 // Get the current height. 152 self.current_height.load(Ordering::SeqCst) 153 } 154 } 155 156 impl<N: Network> Storage<N> { 157 /// Returns the current round. 158 pub fn current_round(&self) -> u64 { 159 // Get the current round. 160 self.current_round.load(Ordering::SeqCst) 161 } 162 163 /// Returns the `round` that garbage collection has occurred **up to** (inclusive). 164 pub fn gc_round(&self) -> u64 { 165 // Get the GC round. 166 self.gc_round.load(Ordering::SeqCst) 167 } 168 169 /// Returns the maximum number of rounds to keep in storage. 170 pub fn max_gc_rounds(&self) -> u64 { 171 self.max_gc_rounds 172 } 173 174 /// Increments storage to the next round, updating the current round. 175 /// Note: This method is only called once per round, upon certification of the primary's batch. 176 pub fn increment_to_next_round(&self, current_round: u64) -> Result<u64> { 177 // Determine the next round. 178 let next_round = current_round + 1; 179 180 // Check if the next round is less than the current round in storage. 181 { 182 // Retrieve the storage round. 183 let storage_round = self.current_round(); 184 // If the next round is less than the current round in storage, return early with the storage round. 185 if next_round < storage_round { 186 return Ok(storage_round); 187 } 188 189 trace!("Incrementing storage from round {storage_round} to {next_round}"); 190 } 191 192 // Retrieve the current committee. 193 let current_committee = self.ledger.current_committee()?; 194 // Retrieve the current committee's starting round. 195 let starting_round = current_committee.starting_round(); 196 // If the primary is behind the current committee's starting round, sync with the latest block. 197 if next_round < starting_round { 198 // Retrieve the latest block round. 
199 let latest_block_round = self.ledger.latest_round(); 200 // Log the round sync. 201 info!( 202 "Syncing primary round ({next_round}) with the current committee's starting round ({starting_round}). Syncing with the latest block round {latest_block_round}..." 203 ); 204 // Sync the round with the latest block. 205 self.sync_round_with_block(latest_block_round); 206 // Return the latest block round. 207 return Ok(latest_block_round); 208 } 209 210 // Update the storage to the next round. 211 self.update_current_round(next_round); 212 213 #[cfg(feature = "metrics")] 214 metrics::gauge(metrics::bft::LAST_STORED_ROUND, next_round as f64); 215 216 // Retrieve the storage round. 217 let storage_round = self.current_round(); 218 // Retrieve the GC round. 219 let gc_round = self.gc_round(); 220 // Ensure the next round matches in storage. 221 ensure!(next_round == storage_round, "The next round {next_round} does not match in storage ({storage_round})"); 222 // Ensure the next round is greater than or equal to the GC round. 223 ensure!(next_round >= gc_round, "The next round {next_round} is behind the GC round {gc_round}"); 224 225 // Log the updated round. 226 info!("Starting round {next_round}..."); 227 Ok(next_round) 228 } 229 230 /// Updates the storage to the next round. 231 fn update_current_round(&self, next_round: u64) { 232 // Update the current round. 233 self.current_round.store(next_round, Ordering::SeqCst); 234 } 235 236 /// Update the storage by performing garbage collection based on the next round. 237 pub(crate) fn garbage_collect_certificates(&self, next_round: u64) { 238 // Fetch the current GC round. 239 let current_gc_round = self.gc_round(); 240 // Compute the next GC round. 241 let next_gc_round = next_round.saturating_sub(self.max_gc_rounds); 242 // Check if storage needs to be garbage collected. 243 if next_gc_round > current_gc_round { 244 // Remove the GC round(s) from storage. 
245 for gc_round in current_gc_round..=next_gc_round { 246 // Iterate over the certificates for the GC round. 247 for id in self.get_certificate_ids_for_round(gc_round).into_iter() { 248 trace!( 249 "Garbage collecting certificate {id} at round {gc_round} (cut-off is round {next_gc_round})" 250 ); 251 self.remove_certificate(id); 252 } 253 } 254 // Update the GC round. 255 self.gc_round.store(next_gc_round, Ordering::SeqCst); 256 } 257 } 258 } 259 260 impl<N: Network> Storage<N> { 261 /// Returns `true` if the storage contains the specified `round`. 262 pub fn contains_certificates_for_round(&self, round: u64) -> bool { 263 // Check if the round exists in storage. 264 self.rounds.read().contains_key(&round) 265 } 266 267 /// Returns `true` if the storage contains the specified `certificate ID`. 268 pub fn contains_certificate(&self, certificate_id: Field<N>) -> bool { 269 // Check if the certificate ID exists in storage. 270 self.certificates.read().contains_key(&certificate_id) 271 } 272 273 /// Returns `true` if the storage contains a certificate from the specified `author` in the given `round`. 274 pub fn contains_certificate_in_round_from(&self, round: u64, author: Address<N>) -> bool { 275 self.rounds.read().get(&round).is_some_and(|set| set.iter().any(|(_, a)| a == &author)) 276 } 277 278 /// Returns `true` if the storage contains the specified `certificate ID`. 279 pub fn contains_unprocessed_certificate(&self, certificate_id: Field<N>) -> bool { 280 // Check if the certificate ID exists in storage. 281 self.unprocessed_certificates.read().contains(&certificate_id) 282 } 283 284 /// Returns `true` if the storage contains the specified `batch ID`. 285 pub fn contains_batch(&self, batch_id: Field<N>) -> bool { 286 // Check if the batch ID exists in storage. 287 self.batch_ids.read().contains_key(&batch_id) 288 } 289 290 /// Returns `true` if the storage contains the specified `transmission ID`. 
291 pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool { 292 self.transmissions.contains_transmission(transmission_id.into()) 293 } 294 295 /// Returns the transmission for the given `transmission ID`. 296 /// If the transmission ID does not exist in storage, `None` is returned. 297 pub fn get_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> Option<Transmission<N>> { 298 self.transmissions.get_transmission(transmission_id.into()) 299 } 300 301 /// Returns the round for the given `certificate ID`. 302 /// If the certificate ID does not exist in storage, `None` is returned. 303 pub fn get_round_for_certificate(&self, certificate_id: Field<N>) -> Option<u64> { 304 // Get the round. 305 self.certificates.read().get(&certificate_id).map(|certificate| certificate.round()) 306 } 307 308 /// Returns the round for the given `batch ID`. 309 /// If the batch ID does not exist in storage, `None` is returned. 310 pub fn get_round_for_batch(&self, batch_id: Field<N>) -> Option<u64> { 311 // Get the round. 312 self.batch_ids.read().get(&batch_id).copied() 313 } 314 315 /// Returns the certificate round for the given `certificate ID`. 316 /// If the certificate ID does not exist in storage, `None` is returned. 317 pub fn get_certificate_round(&self, certificate_id: Field<N>) -> Option<u64> { 318 // Get the batch certificate and return the round. 319 self.certificates.read().get(&certificate_id).map(|certificate| certificate.round()) 320 } 321 322 /// Returns the certificate for the given `certificate ID`. 323 /// If the certificate ID does not exist in storage, `None` is returned. 324 pub fn get_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> { 325 // Get the batch certificate. 326 self.certificates.read().get(&certificate_id).cloned() 327 } 328 329 /// Returns the unprocessed certificate for the given `certificate ID`. 
330 /// If the certificate ID does not exist in storage, `None` is returned. 331 pub fn get_unprocessed_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> { 332 // Get the unprocessed certificate. 333 self.unprocessed_certificates.read().peek(&certificate_id).cloned() 334 } 335 336 /// Returns the certificate for the given `round` and `author`. 337 /// If the round does not exist in storage, `None` is returned. 338 /// If the author for the round does not exist in storage, `None` is returned. 339 pub fn get_certificate_for_round_with_author(&self, round: u64, author: Address<N>) -> Option<BatchCertificate<N>> { 340 // Retrieve the certificates. 341 if let Some(entries) = self.rounds.read().get(&round) { 342 let certificates = self.certificates.read(); 343 entries.iter().find_map( 344 |(certificate_id, a)| if a == &author { certificates.get(certificate_id).cloned() } else { None }, 345 ) 346 } else { 347 Default::default() 348 } 349 } 350 351 /// Returns the certificates for the given `round`. 352 /// If the round does not exist in storage, an empty set is returned. 353 pub fn get_certificates_for_round(&self, round: u64) -> IndexSet<BatchCertificate<N>> { 354 // The genesis round does not have batch certificates. 355 if round == 0 { 356 return Default::default(); 357 } 358 // Retrieve the certificates. 359 if let Some(entries) = self.rounds.read().get(&round) { 360 let certificates = self.certificates.read(); 361 entries.iter().flat_map(|(certificate_id, _)| certificates.get(certificate_id).cloned()).collect() 362 } else { 363 Default::default() 364 } 365 } 366 367 /// Returns the certificate IDs for the given `round`. 368 /// If the round does not exist in storage, an empty set is returned. 369 pub fn get_certificate_ids_for_round(&self, round: u64) -> IndexSet<Field<N>> { 370 // The genesis round does not have batch certificates. 371 if round == 0 { 372 return Default::default(); 373 } 374 // Retrieve the certificates. 
375 if let Some(entries) = self.rounds.read().get(&round) { 376 entries.iter().map(|(certificate_id, _)| *certificate_id).collect() 377 } else { 378 Default::default() 379 } 380 } 381 382 /// Returns the certificate authors for the given `round`. 383 /// If the round does not exist in storage, an empty set is returned. 384 pub fn get_certificate_authors_for_round(&self, round: u64) -> HashSet<Address<N>> { 385 // The genesis round does not have batch certificates. 386 if round == 0 { 387 return Default::default(); 388 } 389 // Retrieve the certificates. 390 if let Some(entries) = self.rounds.read().get(&round) { 391 entries.iter().map(|(_, author)| *author).collect() 392 } else { 393 Default::default() 394 } 395 } 396 397 /// Returns the certificates that have not yet been included in the ledger. 398 /// Note that the order of this set is by round and then insertion. 399 pub(crate) fn get_pending_certificates(&self) -> IndexSet<BatchCertificate<N>> { 400 // Obtain the read locks. 401 let rounds = self.rounds.read(); 402 let certificates = self.certificates.read(); 403 404 // Iterate over the rounds. 405 cfg_sorted_by!(rounds.clone(), |a, _, b, _| a.cmp(b)) 406 .flat_map(|(_, certificates_for_round)| { 407 // Iterate over the certificates for the round. 408 cfg_into_iter!(certificates_for_round).filter_map(|(certificate_id, _)| { 409 // Skip the certificate if it already exists in the ledger. 410 if self.ledger.contains_certificate(&certificate_id).unwrap_or(false) { 411 None 412 } else { 413 // Add the certificate to the pending certificates. 414 certificates.get(&certificate_id).cloned() 415 } 416 }) 417 }) 418 .collect() 419 } 420 421 /// Checks the given `batch_header` for validity, returning the missing transmissions from storage. 422 /// 423 /// # Arguments 424 /// - `batch_header`: The batch header to check. 425 /// - `transmissions`: All transmissions referenced by the certificate. 
426 /// - `aborted_transmissions`: The set of aborted transmissions in this certificate. 427 /// 428 /// # Invariants 429 /// This method ensures the following invariants: 430 /// - The batch ID does not already exist in storage. 431 /// - The author is a member of the committee for the batch round. 432 /// - The timestamp is within the allowed time range. 433 /// - None of the transmissions are from any past rounds (up to GC). 434 /// - All transmissions declared in the batch header are provided or exist in storage (up to GC). 435 /// - All previous certificates declared in the certificate exist in storage (up to GC). 436 /// - All previous certificates are for the previous round (i.e. round - 1). 437 /// - All previous certificates contain a unique author. 438 /// - The previous certificates reached the quorum threshold (N - f). 439 /// 440 /// # Returns 441 /// - `Ok(Some(txns))` for a valid new batch, where `txns` is the set of missing transactions in the batch 442 /// that need to be fetched from peers. 443 /// - `Ok(None)` if the batch already exists in storage 444 pub fn check_batch_header( 445 &self, 446 batch_header: &BatchHeader<N>, 447 transmissions: HashMap<TransmissionID<N>, Transmission<N>>, 448 aborted_transmissions: HashSet<TransmissionID<N>>, 449 ) -> Result<Option<HashMap<TransmissionID<N>, Transmission<N>>>> { 450 // Retrieve the round. 451 let round = batch_header.round(); 452 // Retrieve the GC round. 453 let gc_round = self.gc_round(); 454 // Construct a GC log message. 455 let gc_log = format!("(gc = {gc_round})"); 456 457 // Ensure the batch ID does not already exist in storage. 458 if self.contains_batch(batch_header.batch_id()) { 459 debug!("Batch for round {round} already exists in storage {gc_log}"); 460 return Ok(None); 461 } 462 463 // Retrieve the committee lookback for the batch round. 
464 let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else { 465 bail!("Storage failed to retrieve the committee lookback for round {round} {gc_log}") 466 }; 467 // Ensure the author is in the committee. 468 if !committee_lookback.is_committee_member(batch_header.author()) { 469 bail!("Author {} is not in the committee for round {round} {gc_log}", batch_header.author()) 470 } 471 472 // Check the timestamp for liveness. 473 check_timestamp_for_liveness(batch_header.timestamp())?; 474 475 // Retrieve the missing transmissions in storage from the given transmissions. 476 let missing_transmissions = self 477 .transmissions 478 .find_missing_transmissions(batch_header, transmissions, aborted_transmissions) 479 .map_err(|e| anyhow!("{e} for round {round} {gc_log}"))?; 480 481 // Compute the previous round. 482 let previous_round = round.saturating_sub(1); 483 // Check if the previous round is within range of the GC round. 484 if previous_round > gc_round { 485 // Retrieve the committee lookback for the previous round. 486 let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else { 487 bail!("Missing committee for the previous round {previous_round} in storage {gc_log}") 488 }; 489 // Ensure the previous round certificates exists in storage. 490 if !self.contains_certificates_for_round(previous_round) { 491 bail!("Missing certificates for the previous round {previous_round} in storage {gc_log}") 492 } 493 // Ensure the number of previous certificate IDs is at or below the number of committee members. 494 if batch_header.previous_certificate_ids().len() > previous_committee_lookback.num_members() { 495 bail!("Too many previous certificates for round {round} {gc_log}") 496 } 497 // Initialize a set of the previous authors. 
498 let mut previous_authors = HashSet::with_capacity(batch_header.previous_certificate_ids().len()); 499 // Ensure storage contains all declared previous certificates (up to GC). 500 for previous_certificate_id in batch_header.previous_certificate_ids() { 501 // Retrieve the previous certificate. 502 let Some(previous_certificate) = self.get_certificate(*previous_certificate_id) else { 503 bail!( 504 "Missing previous certificate '{}' for certificate in round {round} {gc_log}", 505 fmt_id(previous_certificate_id) 506 ) 507 }; 508 // Ensure the previous certificate is for the previous round. 509 if previous_certificate.round() != previous_round { 510 bail!("Round {round} certificate contains a round {previous_round} certificate {gc_log}") 511 } 512 // Ensure the previous author is new. 513 if previous_authors.contains(&previous_certificate.author()) { 514 bail!("Round {round} certificate contains a duplicate author {gc_log}") 515 } 516 // Insert the author of the previous certificate. 517 previous_authors.insert(previous_certificate.author()); 518 } 519 // Ensure the previous certificates have reached the quorum threshold. 520 if !previous_committee_lookback.is_quorum_threshold_reached(&previous_authors) { 521 bail!("Previous certificates for a batch in round {round} did not reach quorum threshold {gc_log}") 522 } 523 } 524 525 Ok(Some(missing_transmissions)) 526 } 527 528 /// Check the validity of a certificate coming from another validator. 529 /// 530 /// It suffices to check that the signers (author and endorsers) are members of the applicable committee 531 /// and that they form a quorum in the committee. 
532 /// Under the fundamental fault tolerance assumption of at most `f` (stake of) faulty validators, 533 /// the quorum check on signers guarantees that at least one correct validator 534 /// has ensured the validity of the proposal contained in the certificate, 535 /// either by construction (by the author) or by checking (by an endorser): 536 /// given `N > 0` total stake, and `f` the largest integer `< N/3` (where `/` is exact rational division), 537 /// we have `N >= 3f + 1`, which implies `N - f >= 2f + 1`, which is always `> f`; 538 /// `N - f` is the quorum stake. 539 pub fn check_incoming_certificate(&self, certificate: &BatchCertificate<N>) -> Result<()> { 540 // Retrieve the certificate author and round. 541 let certificate_author = certificate.author(); 542 let certificate_round = certificate.round(); 543 544 // Retrieve the committee lookback. 545 let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?; 546 547 // Ensure that the signers of the certificate reach the quorum threshold. 548 // Note that certificate.signatures() only returns the endorsing signatures, not the author's signature. 549 let mut signers: HashSet<Address<N>> = 550 certificate.signatures().map(|signature| signature.to_address()).collect(); 551 signers.insert(certificate_author); 552 ensure!( 553 committee_lookback.is_quorum_threshold_reached(&signers), 554 "Certificate '{}' for round {certificate_round} does not meet quorum requirements", 555 certificate.id() 556 ); 557 558 // Ensure that the signers of the certificate are in the committee. 559 cfg_iter!(&signers).try_for_each(|signer| { 560 ensure!( 561 committee_lookback.is_committee_member(*signer), 562 "Signer '{signer}' of certificate '{}' for round {certificate_round} is not in the committee", 563 certificate.id() 564 ); 565 Ok(()) 566 })?; 567 568 Ok(()) 569 } 570 571 /// Checks the given `certificate` for validity, returning the missing transmissions from storage. 
572 /// 573 /// # Arguments 574 /// - `certificate`: The certificate to check. 575 /// - `transmissions`: The transmissions contained in the certificate. 576 /// - `aborted_transmissions`: The aborted transmission contained in the certificate. 577 /// 578 /// # Invariants 579 /// This method ensures the following invariants: 580 /// - The certificate ID does not already exist in storage. 581 /// - The batch ID does not already exist in storage. 582 /// - The author is a member of the committee for the batch round. 583 /// - The author has not already created a certificate for the batch round. 584 /// - The timestamp is within the allowed time range. 585 /// - None of the transmissions are from any past rounds (up to GC). 586 /// - All transmissions declared in the batch header are provided or exist in storage (up to GC). 587 /// - All previous certificates declared in the certificate exist in storage (up to GC). 588 /// - All previous certificates are for the previous round (i.e. round - 1). 589 /// - The previous certificates reached the quorum threshold (N - f). 590 /// - The timestamps from the signers are all within the allowed time range. 591 /// - The signers have reached the quorum threshold (N - f). 592 pub fn check_certificate( 593 &self, 594 certificate: &BatchCertificate<N>, 595 transmissions: HashMap<TransmissionID<N>, Transmission<N>>, 596 aborted_transmissions: HashSet<TransmissionID<N>>, 597 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> { 598 // Retrieve the round. 599 let round = certificate.round(); 600 // Retrieve the GC round. 601 let gc_round = self.gc_round(); 602 // Construct a GC log message. 603 let gc_log = format!("(gc = {gc_round})"); 604 605 // Ensure the certificate ID does not already exist in storage. 
606 if self.contains_certificate(certificate.id()) { 607 bail!("Certificate for round {round} already exists in storage {gc_log}") 608 } 609 610 // Ensure the storage does not already contain a certificate for this author in this round. 611 if self.contains_certificate_in_round_from(round, certificate.author()) { 612 bail!("Certificate with this author for round {round} already exists in storage {gc_log}") 613 } 614 615 // Ensure the batch header is well-formed. 616 let Some(missing_transmissions) = 617 self.check_batch_header(certificate.batch_header(), transmissions, aborted_transmissions)? 618 else { 619 bail!("Certificate for round {round} already exists in storage {gc_log}") 620 }; 621 622 // Check the timestamp for liveness. 623 check_timestamp_for_liveness(certificate.timestamp())?; 624 625 // Retrieve the committee lookback for the batch round. 626 let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else { 627 bail!("Storage failed to retrieve the committee for round {round} {gc_log}") 628 }; 629 630 // Initialize a set of the signers. 631 let mut signers = HashSet::with_capacity(certificate.signatures().len() + 1); 632 // Append the batch author. 633 signers.insert(certificate.author()); 634 635 // Iterate over the signatures. 636 for signature in certificate.signatures() { 637 // Retrieve the signer. 638 let signer = signature.to_address(); 639 // Ensure the signer is in the committee. 640 if !committee_lookback.is_committee_member(signer) { 641 bail!("Signer {signer} is not in the committee for round {round} {gc_log}") 642 } 643 // Append the signer. 644 signers.insert(signer); 645 } 646 647 // Ensure the signatures have reached the quorum threshold. 648 if !committee_lookback.is_quorum_threshold_reached(&signers) { 649 bail!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}") 650 } 651 652 Ok(missing_transmissions) 653 } 654 655 /// Inserts the given `certificate` into storage. 
656 /// 657 /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps. 658 /// 659 /// # Arguments 660 /// - `certificate`: The certificate to insert. 661 /// - `transmissions`: The transmissions contained in the certificate, or the subset of the transmissions that in the certificate that do not yet exist in storage. 662 /// - `aborted_transmissions`: The aborted transmission contained in the certificate. 663 /// 664 /// # Invariants 665 /// This method ensures the following invariants: 666 /// - The certificate ID does not already exist in storage. 667 /// - The batch ID does not already exist in storage. 668 /// - All transmissions declared in the certificate are provided or exist in storage (up to GC). 669 /// - All previous certificates declared in the certificate exist in storage (up to GC). 670 /// - All previous certificates are for the previous round (i.e. round - 1). 671 /// - The previous certificates reached the quorum threshold (N - f). 672 pub fn insert_certificate( 673 &self, 674 certificate: BatchCertificate<N>, 675 transmissions: HashMap<TransmissionID<N>, Transmission<N>>, 676 aborted_transmissions: HashSet<TransmissionID<N>>, 677 ) -> Result<()> { 678 // Ensure the certificate round is above the GC round. 679 ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round"); 680 // Ensure the certificate and its transmissions are valid. 681 let missing_transmissions = 682 self.check_certificate(&certificate, transmissions, aborted_transmissions.clone())?; 683 // Insert the certificate into storage. 684 self.insert_certificate_atomic(certificate, aborted_transmissions, missing_transmissions); 685 Ok(()) 686 } 687 688 /// Inserts the given `certificate` into storage. 689 /// 690 /// This method assumes **all missing** transmissions are provided in the `missing_transmissions` map. 
691 /// 692 /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps. 693 fn insert_certificate_atomic( 694 &self, 695 certificate: BatchCertificate<N>, 696 aborted_transmission_ids: HashSet<TransmissionID<N>>, 697 missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>, 698 ) { 699 // Retrieve the round. 700 let round = certificate.round(); 701 // Retrieve the certificate ID. 702 let certificate_id = certificate.id(); 703 // Retrieve the author of the batch. 704 let author = certificate.author(); 705 706 // Insert the round to certificate ID entry. 707 self.rounds.write().entry(round).or_default().insert((certificate_id, author)); 708 // Obtain the certificate's transmission ids. 709 let transmission_ids = certificate.transmission_ids().clone(); 710 // Insert the certificate. 711 self.certificates.write().insert(certificate_id, certificate); 712 // Remove the unprocessed certificate. 713 self.unprocessed_certificates.write().pop(&certificate_id); 714 // Insert the batch ID. 715 self.batch_ids.write().insert(certificate_id, round); 716 // Insert the certificate ID for each of the transmissions into storage. 717 self.transmissions.insert_transmissions( 718 certificate_id, 719 transmission_ids, 720 aborted_transmission_ids, 721 missing_transmissions, 722 ); 723 } 724 725 /// Inserts the given unprocessed `certificate` into storage. 726 /// 727 /// This is a temporary storage, which is cleared again when calling `insert_certificate_atomic`. 728 pub fn insert_unprocessed_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> { 729 // Ensure the certificate round is above the GC round. 730 ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round"); 731 // Insert the certificate. 732 self.unprocessed_certificates.write().put(certificate.id(), certificate); 733 734 Ok(()) 735 } 736 737 /// Removes the given `certificate ID` from storage. 
This method is used to garbage collect individual certificates once blocks are committed. 738 /// 739 /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps. 740 /// 741 /// If the certificate was successfully removed, `true` is returned. 742 /// If the certificate did not exist in storage, `false` is returned. 743 fn remove_certificate(&self, certificate_id: Field<N>) -> bool { 744 // Retrieve the certificate. 745 let Some(certificate) = self.get_certificate(certificate_id) else { 746 warn!("Certificate {certificate_id} does not exist in storage"); 747 return false; 748 }; 749 // Retrieve the round. 750 let round = certificate.round(); 751 // Compute the author of the batch. 752 let author = certificate.author(); 753 754 // TODO (howardwu): We may want to use `shift_remove` below, in order to align compatibility 755 // with tests written to for `remove_certificate`. However, this will come with performance hits. 756 // It will be better to write tests that compare the union of the sets. 757 758 // Update the round. 759 match self.rounds.write().entry(round) { 760 Entry::Occupied(mut entry) => { 761 // Remove the round to certificate ID entry. 762 entry.get_mut().swap_remove(&(certificate_id, author)); 763 // If the round is empty, remove it. 764 if entry.get().is_empty() { 765 entry.swap_remove(); 766 } 767 } 768 Entry::Vacant(_) => {} 769 } 770 // Remove the certificate. 771 self.certificates.write().swap_remove(&certificate_id); 772 // Remove the unprocessed certificate. 773 self.unprocessed_certificates.write().pop(&certificate_id); 774 // Remove the batch ID. 775 self.batch_ids.write().swap_remove(&certificate_id); 776 // Remove the transmission entries in the certificate from storage. 777 self.transmissions.remove_transmissions(&certificate_id, certificate.transmission_ids()); 778 // Return successfully. 779 true 780 } 781 } 782 783 impl<N: Network> Storage<N> { 784 /// Syncs the current height with the block. 
785 pub(crate) fn sync_height_with_block(&self, next_height: u32) { 786 // If the block height is greater than the current height in storage, sync the height. 787 if next_height > self.current_height() { 788 // Update the current height in storage. 789 self.current_height.store(next_height, Ordering::SeqCst); 790 } 791 } 792 793 /// Syncs the current round with the block. 794 pub(crate) fn sync_round_with_block(&self, next_round: u64) { 795 // Ensure we sync to at least round 1. 796 let next_round = next_round.max(1); 797 // If the round in the block is greater than the current round in storage, sync the round. 798 if next_round > self.current_round() { 799 // Update the current round in storage. 800 self.update_current_round(next_round); 801 // Log the updated round. 802 info!("Synced to round {next_round}..."); 803 } else { 804 trace!( 805 "Skipping sync to round {next_round} as it is less than the current round ({})", 806 self.current_round() 807 ); 808 } 809 } 810 811 /// Syncs the batch certificate with the block. 812 pub(crate) fn sync_certificate_with_block( 813 &self, 814 block: &Block<N>, 815 certificate: BatchCertificate<N>, 816 unconfirmed_transactions: &HashMap<N::TransactionID, Transaction<N>>, 817 ) { 818 // Skip if the certificate round is below the GC round. 819 let gc_round = self.gc_round(); 820 if certificate.round() <= gc_round { 821 trace!("Got certificate for round {} below GC round ({gc_round}). Will not store it.", certificate.round()); 822 return; 823 } 824 825 // If the certificate ID already exists in storage, skip it. 826 if self.contains_certificate(certificate.id()) { 827 trace!("Got certificate {} for round {} more than once.", certificate.id(), certificate.round()); 828 return; 829 } 830 // Retrieve the transmissions for the certificate. 831 let mut missing_transmissions = HashMap::new(); 832 833 // Retrieve the aborted transmissions for the certificate. 
834 let mut aborted_transmissions = HashSet::new(); 835 836 // Track the block's aborted solutions and transactions. 837 let aborted_solutions: IndexSet<_> = block.aborted_solution_ids().iter().collect(); 838 let aborted_transactions: IndexSet<_> = block.aborted_transaction_ids().iter().collect(); 839 840 // Iterate over the transmission IDs. 841 for transmission_id in certificate.transmission_ids() { 842 // If the transmission ID already exists in the map, skip it. 843 if missing_transmissions.contains_key(transmission_id) { 844 continue; 845 } 846 // If the transmission ID exists in storage, skip it. 847 if self.contains_transmission(*transmission_id) { 848 continue; 849 } 850 // Retrieve the transmission. 851 match transmission_id { 852 TransmissionID::Ratification => (), 853 TransmissionID::Solution(solution_id, _) => { 854 // Retrieve the solution. 855 match block.get_solution(solution_id) { 856 // Insert the solution. 857 Some(solution) => missing_transmissions.insert(*transmission_id, (*solution).into()), 858 // Otherwise, try to load the solution from the ledger. 859 None => match self.ledger.get_solution(solution_id) { 860 // Insert the solution. 861 Ok(solution) => missing_transmissions.insert(*transmission_id, solution.into()), 862 // Check if the solution is in the aborted solutions. 863 Err(_) => { 864 // Insert the aborted solution if it exists in the block or ledger. 865 match aborted_solutions.contains(solution_id) 866 || self.ledger.contains_transmission(transmission_id).unwrap_or(false) 867 { 868 true => { 869 aborted_transmissions.insert(*transmission_id); 870 } 871 false => error!("Missing solution {solution_id} in block {}", block.height()), 872 } 873 continue; 874 } 875 }, 876 }; 877 } 878 TransmissionID::Transaction(transaction_id, _) => { 879 // Retrieve the transaction. 880 match unconfirmed_transactions.get(transaction_id) { 881 // Insert the transaction. 
882 Some(transaction) => missing_transmissions.insert(*transmission_id, transaction.clone().into()), 883 // Otherwise, try to load the unconfirmed transaction from the ledger. 884 None => match self.ledger.get_unconfirmed_transaction(*transaction_id) { 885 // Insert the transaction. 886 Ok(transaction) => missing_transmissions.insert(*transmission_id, transaction.into()), 887 // Check if the transaction is in the aborted transactions. 888 Err(_) => { 889 // Insert the aborted transaction if it exists in the block or ledger. 890 match aborted_transactions.contains(transaction_id) 891 || self.ledger.contains_transmission(transmission_id).unwrap_or(false) 892 { 893 true => { 894 aborted_transmissions.insert(*transmission_id); 895 } 896 false => warn!("Missing transaction {transaction_id} in block {}", block.height()), 897 } 898 continue; 899 } 900 }, 901 }; 902 } 903 } 904 } 905 // Insert the batch certificate into storage. 906 let certificate_id = fmt_id(certificate.id()); 907 debug!( 908 "Syncing certificate '{certificate_id}' for round {} with {} transmissions", 909 certificate.round(), 910 certificate.transmission_ids().len() 911 ); 912 913 if let Err(error) = self 914 .insert_certificate(certificate, missing_transmissions, aborted_transmissions) 915 .with_context(|| format!("Failed to insert certificate '{certificate_id}' from block {}", block.height())) 916 { 917 error!("{}", &flatten_error(&error)); 918 } 919 } 920 } 921 922 #[cfg(test)] 923 impl<N: Network> Storage<N> { 924 /// Returns the ledger service. 925 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> { 926 &self.ledger 927 } 928 929 /// Returns an iterator over the `(round, (certificate ID, batch ID, author))` entries. 930 pub fn rounds_iter(&self) -> impl Iterator<Item = (u64, IndexSet<(Field<N>, Address<N>)>)> { 931 self.rounds.read().clone().into_iter() 932 } 933 934 /// Returns an iterator over the `(certificate ID, certificate)` entries. 
pub fn certificates_iter(&self) -> impl Iterator<Item = (Field<N>, BatchCertificate<N>)> {
        // Iterate over a snapshot, so the read lock is released before iteration starts.
        let snapshot = self.certificates.read().clone();
        snapshot.into_iter()
    }

    /// Returns an iterator over the `(batch ID, round)` entries.
    pub fn batch_ids_iter(&self) -> impl Iterator<Item = (Field<N>, u64)> {
        // Iterate over a snapshot, so the read lock is released before iteration starts.
        let snapshot = self.batch_ids.read().clone();
        snapshot.into_iter()
    }

    /// Returns an iterator over the `(transmission ID, (transmission, certificate IDs))` entries.
    pub fn transmissions_iter(
        &self,
    ) -> impl Iterator<Item = (TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>))> {
        self.transmissions.as_hashmap().into_iter()
    }

    /// Inserts the given `certificate` into storage, fabricating an empty transaction
    /// transmission for each of its transmission IDs.
    ///
    /// Note: Do NOT use this in production. This is for **testing only**.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) fn testing_only_insert_certificate_testing_only(&self, certificate: BatchCertificate<N>) {
        // Read everything that is needed from the certificate before it is moved into the map.
        let id = certificate.id();
        let round = certificate.round();
        let author = certificate.author();
        let transmission_ids = certificate.transmission_ids().clone();

        // Index the certificate under its round.
        self.rounds.write().entry(round).or_default().insert((id, author));
        // Store the certificate itself.
        self.certificates.write().insert(id, certificate);
        // Record the round for this ID.
        self.batch_ids.write().insert(id, round);

        // Fabricate one dummy (empty) transaction per transmission ID (for testing purposes).
        let dummy_transmissions: HashMap<_, _> = transmission_ids
            .iter()
            .map(|id| (*id, Transmission::Transaction(alphavm::ledger::narwhal::Data::Buffer(bytes::Bytes::new()))))
            .collect();
        // Register the certificate ID against each of its transmissions.
        self.transmissions.insert_transmissions(id, transmission_ids, Default::default(), dummy_transmissions);
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use alphaos_node_bft_ledger_service::MockLedgerService;
    use alphaos_node_bft_storage_service::BFTMemoryService;
    use alphavm::{
        ledger::narwhal::{batch_certificate::test_helpers::sample_batch_certificate_for_round_with_committee, Data},
        prelude::{Rng, TestRng},
    };

    use ::bytes::Bytes;
    use indexmap::indexset;

    type CurrentNetwork = alphavm::prelude::MainnetV0;

    /// Asserts that the storage matches the expected layout.
    ///
    /// `rounds`, `certificates` and `batch_ids` are compared in iteration order;
    /// `transmissions` is compared as a map.
    pub fn assert_storage<N: Network>(
        storage: &Storage<N>,
        rounds: &[(u64, IndexSet<(Field<N>, Address<N>)>)],
        certificates: &[(Field<N>, BatchCertificate<N>)],
        batch_ids: &[(Field<N>, u64)],
        transmissions: &HashMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
    ) {
        // Check the rounds.
        let actual_rounds: Vec<_> = storage.rounds_iter().collect();
        assert_eq!(actual_rounds, *rounds);
        // Check the certificates.
        let actual_certificates: Vec<_> = storage.certificates_iter().collect();
        assert_eq!(actual_certificates, *certificates);
        // Check the batch IDs.
        let actual_batch_ids: Vec<_> = storage.batch_ids_iter().collect();
        assert_eq!(actual_batch_ids, *batch_ids);
        // Check the transmissions.
        let actual_transmissions: HashMap<_, _> = storage.transmissions_iter().collect();
        assert_eq!(actual_transmissions, *transmissions);
    }

    /// Samples a random transmission: either 512 random bytes posing as a solution,
    /// or 2048 random bytes posing as a transaction.
    fn sample_transmission(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
        /// Draws `len` random bytes from `rng`.
        fn random_bytes(len: usize, rng: &mut TestRng) -> Bytes {
            Bytes::from((0..len).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>())
        }

        // Pick the variant first, then draw the fake payload for it.
        if rng.r#gen::<bool>() {
            Transmission::Solution(Data::Buffer(random_bytes(512, rng)))
        } else {
            Transmission::Transaction(Data::Buffer(random_bytes(2048, rng)))
        }
    }

    /// Samples the random transmissions, returning the missing transmissions and the transmissions.
    ///
    /// The first map is keyed by transmission ID and holds the bare transmission; the second
    /// additionally holds the set of certificate IDs referencing the transmission.
    pub(crate) fn sample_transmissions(
        certificate: &BatchCertificate<CurrentNetwork>,
        rng: &mut TestRng,
    ) -> (
        HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>,
        HashMap<TransmissionID<CurrentNetwork>, (Transmission<CurrentNetwork>, IndexSet<Field<CurrentNetwork>>)>,
    ) {
        let certificate_id = certificate.id();

        let mut missing = HashMap::new();
        let mut stored = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
        for id in certificate.transmission_ids() {
            // Draw a fresh transmission for this ID.
            let sampled = sample_transmission(rng);
            // Track it as a missing transmission.
            missing.insert(*id, sampled.clone());
            // Track it in the expected storage layout, tagged with the certificate ID.
            let (_, certificate_ids) = stored.entry(*id).or_insert((sampled, Default::default()));
            certificate_ids.insert(certificate_id);
        }
        (missing, stored)
    }

    // TODO (howardwu): Testing with 'max_gc_rounds' set to '0' should ensure everything is cleared after insertion.

    #[test]
    fn test_certificate_insert_remove() {
        let rng = &mut TestRng::default();

        // Build a storage instance on top of a mock ledger with a sampled committee.
        let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
        let ledger = Arc::new(MockLedgerService::new(committee));
        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

        // A fresh storage must be empty.
        assert_storage(&storage, &[], &[], &[], &Default::default());

        // Sample a certificate and note its ID, round, and author.
        let certificate = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
        let (certificate_id, round, author) = (certificate.id(), certificate.round(), certificate.author());

        // Sample the transmissions of the certificate.
        let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);

        // Insert the certificate, and check that every lookup finds it.
        storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions);
        assert!(storage.contains_certificate(certificate_id));
        assert_eq!(storage.get_certificates_for_round(round), indexset! { certificate.clone() });
        assert_eq!(storage.get_certificate_for_round_with_author(round, author), Some(certificate.clone()));

        // Check the underlying storage representation.
        {
            let expected_rounds = [(round, indexset! { (certificate_id, author) })];
            let expected_certificates = [(certificate_id, certificate.clone())];
            let expected_batch_ids = [(certificate_id, round)];
            assert_storage(&storage, &expected_rounds, &expected_certificates, &expected_batch_ids, &transmissions);
        }

        // The stored certificate must equal the inserted one.
        let candidate_certificate = storage.get_certificate(certificate_id).unwrap();
        assert_eq!(certificate, candidate_certificate);

        // Remove the certificate, and check that no lookup finds it anymore.
        assert!(storage.remove_certificate(certificate_id));
        assert!(!storage.contains_certificate(certificate_id));
        assert!(storage.get_certificates_for_round(round).is_empty());
        assert_eq!(storage.get_certificate_for_round_with_author(round, author), None);
        // The storage must be empty again.
        assert_storage(&storage, &[], &[], &[], &Default::default());
    }

    #[test]
    fn test_certificate_duplicate() {
        let rng = &mut TestRng::default();

        // Build a storage instance on top of a mock ledger with a sampled committee.
        let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
        let ledger = Arc::new(MockLedgerService::new(committee));
        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

        // A fresh storage must be empty.
        assert_storage(&storage, &[], &[], &[], &Default::default());

        // Sample a certificate and note its ID, round, and author.
        let certificate = alphavm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
        let (certificate_id, round, author) = (certificate.id(), certificate.round(), certificate.author());

        // The layout that is expected after every insertion below.
        let rounds = [(round, indexset! { (certificate_id, author) })];
        let certificates = [(certificate_id, certificate.clone())];
        let batch_ids = [(certificate_id, round)];
        // Sample the transmissions of the certificate.
        let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);

        // First insertion.
        storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
        assert!(storage.contains_certificate(certificate_id));
        assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);

        // Second insertion, without any missing transmissions: the layout must not change.
        storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
        assert!(storage.contains_certificate(certificate_id));
        assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);

        // Third insertion, with all of the original missing transmissions: the layout must not change.
        storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
        assert!(storage.contains_certificate(certificate_id));
        assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
    }

    /// Test that `check_incoming_certificate` does not reject a valid cert.
    #[test]
    fn test_valid_incoming_certificate() {
        let rng = &mut TestRng::default();

        // Build a storage instance for a committee of five validators.
        let (committee, private_keys) =
            alphavm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 5, rng);
        let ledger = Arc::new(MockLedgerService::new(committee));
        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

        // Build many rounds of valid certificates, each round referencing the previous one.
        let mut previous_certs = IndexSet::default();

        for round in 1..=100 {
            let mut round_certs = IndexSet::default();

            // One certificate per validator, signed by all of the other validators.
            for author_key in private_keys.iter() {
                let signer_keys: Vec<_> = private_keys.iter().filter(|key| *key != author_key).cloned().collect();

                let certificate = sample_batch_certificate_for_round_with_committee(
                    round,
                    previous_certs.clone(),
                    author_key,
                    &signer_keys,
                    rng,
                );
                storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
                round_certs.insert(certificate.id());

                // Store the certificate, so the next round can build on it.
                let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
                storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
            }

            previous_certs = round_certs;
        }
    }

    /// Make sure that we reject all certificates without sufficient signatures early.
    #[test]
    fn test_invalid_incoming_certificate_missing_signature() {
        let rng = &mut TestRng::default();

        // Build a storage instance for a committee of ten validators.
        let (committee, private_keys) =
            alphavm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
        let ledger = Arc::new(MockLedgerService::new(committee));
        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

        // Rounds 1 to 4 are valid and accepted; round 5 is under-signed and must be rejected.
        let mut previous_certs = IndexSet::default();

        for round in 1..=5 {
            let mut round_certs = IndexSet::default();

            // One certificate per validator.
            for author_key in private_keys.iter() {
                if round < 5 {
                    // Signed by all of the other validators.
                    let signer_keys: Vec<_> = private_keys.iter().filter(|key| *key != author_key).cloned().collect();

                    let certificate = sample_batch_certificate_for_round_with_committee(
                        round,
                        previous_certs.clone(),
                        author_key,
                        &signer_keys,
                        rng,
                    );
                    storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
                    round_certs.insert(certificate.id());

                    // Store the certificate, so the next round can build on it.
                    let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
                    storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
                } else {
                    // Pick a few signers, but not enough to form a quorum.
                    let signer_keys: Vec<_> =
                        private_keys[0..=3].iter().filter(|key| *key != author_key).cloned().collect();

                    let certificate = sample_batch_certificate_for_round_with_committee(
                        round,
                        previous_certs.clone(),
                        author_key,
                        &signer_keys,
                        rng,
                    );
                    assert!(storage.check_incoming_certificate(&certificate).is_err());
                }
            }

            previous_certs = round_certs;
        }
    }

    /// Verify that `insert_certificate` rejects certs with less edges than required.
    #[test]
    fn test_invalid_certificate_insufficient_previous_certs() {
        let rng = &mut TestRng::default();

        // Build a storage instance for a committee of ten validators.
        let (committee, private_keys) =
            alphavm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
        let ledger = Arc::new(MockLedgerService::new(committee));
        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

        // Rounds 1 to 5 are valid and accepted; round 6 references too few certificates.
        let mut previous_certs = IndexSet::default();

        for round in 1..=6 {
            let mut round_certs = IndexSet::default();

            // One certificate per validator, signed by all of the other validators.
            for author_key in private_keys.iter() {
                let signer_keys: Vec<_> = private_keys.iter().filter(|key| *key != author_key).cloned().collect();

                let certificate = sample_batch_certificate_for_round_with_committee(
                    round,
                    previous_certs.clone(),
                    author_key,
                    &signer_keys,
                    rng,
                );

                // Sample the transmissions, and strip the certificate IDs off them.
                let (_missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
                let transmissions = transmissions.into_iter().map(|(id, (transmission, _))| (id, transmission)).collect();

                if round <= 5 {
                    round_certs.insert(certificate.id());
                    storage
                        .insert_certificate(certificate, transmissions, Default::default())
                        .expect("Valid certificate rejected");
                } else {
                    assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
                }
            }

            previous_certs = if round < 5 {
                round_certs
            } else {
                // Remove more than half of the previous certs.
                round_certs.into_iter().skip(6).collect()
            };
        }
    }

    /// Verify that `insert_certificate` rejects certs that do not increment the round number.
    #[test]
    fn test_invalid_certificate_wrong_round_number() {
        let rng = &mut TestRng::default();

        // Build a storage instance for a committee of ten validators.
        let (committee, private_keys) =
            alphavm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
        let ledger = Arc::new(MockLedgerService::new(committee));
        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

        // Rounds 1 to 5 are valid and accepted; the sixth iteration reuses round number 5.
        let mut previous_certs = IndexSet::default();

        for round in 1..=6 {
            let mut round_certs = IndexSet::default();

            // One certificate per validator, signed by all of the other validators.
            for author_key in private_keys.iter() {
                // In the sixth round, do not increment.
                let cert_round = round.min(5);
                let signer_keys: Vec<_> = private_keys.iter().filter(|key| *key != author_key).cloned().collect();

                let certificate = sample_batch_certificate_for_round_with_committee(
                    cert_round,
                    previous_certs.clone(),
                    author_key,
                    &signer_keys,
                    rng,
                );

                // Sample the transmissions, and strip the certificate IDs off them.
                let (_missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
                let transmissions = transmissions.into_iter().map(|(id, (transmission, _))| (id, transmission)).collect();

                if round <= 5 {
                    round_certs.insert(certificate.id());
                    storage
                        .insert_certificate(certificate, transmissions, Default::default())
                        .expect("Valid certificate rejected");
                } else {
                    assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
                }
            }

            previous_certs = if round < 5 {
                round_certs
            } else {
                // Remove more than half of the previous certs.
                round_certs.into_iter().skip(6).collect()
            };
        }
    }
}

#[cfg(test)]
pub mod prop_tests {
    use super::*;
    use crate::helpers::{now, storage::tests::assert_storage};
    use alphaos_node_bft_ledger_service::MockLedgerService;
    use alphaos_node_bft_storage_service::BFTMemoryService;
    use alphavm::{
        ledger::{
            committee::prop_tests::{CommitteeContext, ValidatorSet},
            narwhal::{BatchHeader, Data},
            puzzle::SolutionID,
        },
        prelude::{Signature, Uniform},
    };

    use ::bytes::Bytes;
    use indexmap::indexset;
    use proptest::{
        collection,
        prelude::{any, Arbitrary, BoxedStrategy, Just, Strategy},
        prop_oneof,
        sample::{size_range, Selector},
        test_runner::TestRng,
    };
    use rand::{CryptoRng, Error, Rng, RngCore};
    use std::fmt::Debug;
    use test_strategy::proptest;

    type CurrentNetwork = alphavm::prelude::MainnetV0;

    /// Builds a storage instance on top of a mock ledger for the committee of the given
    /// context, using the given number of GC rounds.
    fn storage_from_context(
        (CommitteeContext(committee, _), gc_rounds): (CommitteeContext, u64),
    ) -> Storage<CurrentNetwork> {
        let ledger = Arc::new(MockLedgerService::new(committee));
        Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
    }

    impl Arbitrary for Storage<CurrentNetwork> {
        type Parameters = CommitteeContext;
        type Strategy = BoxedStrategy<Storage<CurrentNetwork>>;

        /// Generates a storage instance for an arbitrary committee context.
        fn arbitrary() -> Self::Strategy {
            (any::<CommitteeContext>(), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
                .prop_map(storage_from_context)
                .boxed()
        }

        /// Generates a storage instance for the given committee context.
        fn arbitrary_with(context: Self::Parameters) -> Self::Strategy {
            (Just(context), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
                .prop_map(storage_from_context)
                .boxed()
        }
    }

    // The `proptest::TestRng` doesn't implement `rand_core::CryptoRng` trait
which is required in alphavm, so we use a wrapper 1444 #[derive(Debug)] 1445 pub struct CryptoTestRng(TestRng); 1446 1447 impl Arbitrary for CryptoTestRng { 1448 type Parameters = (); 1449 type Strategy = BoxedStrategy<CryptoTestRng>; 1450 1451 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { 1452 Just(0).prop_perturb(|_, rng| CryptoTestRng(rng)).boxed() 1453 } 1454 } 1455 impl RngCore for CryptoTestRng { 1456 fn next_u32(&mut self) -> u32 { 1457 self.0.next_u32() 1458 } 1459 1460 fn next_u64(&mut self) -> u64 { 1461 self.0.next_u64() 1462 } 1463 1464 fn fill_bytes(&mut self, dest: &mut [u8]) { 1465 self.0.fill_bytes(dest); 1466 } 1467 1468 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), Error> { 1469 self.0.try_fill_bytes(dest) 1470 } 1471 } 1472 1473 impl CryptoRng for CryptoTestRng {} 1474 1475 #[derive(Debug, Clone)] 1476 pub struct AnyTransmission(pub Transmission<CurrentNetwork>); 1477 1478 impl Arbitrary for AnyTransmission { 1479 type Parameters = (); 1480 type Strategy = BoxedStrategy<AnyTransmission>; 1481 1482 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { 1483 any_transmission().prop_map(AnyTransmission).boxed() 1484 } 1485 } 1486 1487 #[derive(Debug, Clone)] 1488 pub struct AnyTransmissionID(pub TransmissionID<CurrentNetwork>); 1489 1490 impl Arbitrary for AnyTransmissionID { 1491 type Parameters = (); 1492 type Strategy = BoxedStrategy<AnyTransmissionID>; 1493 1494 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { 1495 any_transmission_id().prop_map(AnyTransmissionID).boxed() 1496 } 1497 } 1498 1499 fn any_transmission() -> BoxedStrategy<Transmission<CurrentNetwork>> { 1500 prop_oneof![ 1501 (collection::vec(any::<u8>(), 512..=512)) 1502 .prop_map(|bytes| Transmission::Solution(Data::Buffer(Bytes::from(bytes)))), 1503 (collection::vec(any::<u8>(), 2048..=2048)) 1504 .prop_map(|bytes| Transmission::Transaction(Data::Buffer(Bytes::from(bytes)))), 1505 ] 1506 .boxed() 1507 } 1508 1509 pub fn 
any_solution_id() -> BoxedStrategy<SolutionID<CurrentNetwork>> { 1510 Just(0).prop_perturb(|_, rng| CryptoTestRng(rng).r#gen::<u64>().into()).boxed() 1511 } 1512 1513 pub fn any_transaction_id() -> BoxedStrategy<<CurrentNetwork as Network>::TransactionID> { 1514 Just(0) 1515 .prop_perturb(|_, rng| { 1516 <CurrentNetwork as Network>::TransactionID::from(Field::rand(&mut CryptoTestRng(rng))) 1517 }) 1518 .boxed() 1519 } 1520 1521 pub fn any_transmission_id() -> BoxedStrategy<TransmissionID<CurrentNetwork>> { 1522 prop_oneof![ 1523 any_transaction_id().prop_perturb(|id, mut rng| TransmissionID::Transaction( 1524 id, 1525 rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>() 1526 )), 1527 any_solution_id().prop_perturb(|id, mut rng| TransmissionID::Solution( 1528 id, 1529 rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>() 1530 )), 1531 ] 1532 .boxed() 1533 } 1534 1535 pub fn sign_batch_header<R: Rng + CryptoRng>( 1536 validator_set: &ValidatorSet, 1537 batch_header: &BatchHeader<CurrentNetwork>, 1538 rng: &mut R, 1539 ) -> IndexSet<Signature<CurrentNetwork>> { 1540 let mut signatures = IndexSet::with_capacity(validator_set.0.len()); 1541 for validator in validator_set.0.iter() { 1542 let private_key = validator.private_key; 1543 signatures.insert(private_key.sign(&[batch_header.batch_id()], rng).unwrap()); 1544 } 1545 signatures 1546 } 1547 1548 #[proptest] 1549 fn test_certificate_duplicate( 1550 context: CommitteeContext, 1551 #[any(size_range(1..16).lift())] transmissions: Vec<(AnyTransmissionID, AnyTransmission)>, 1552 mut rng: CryptoTestRng, 1553 selector: Selector, 1554 ) { 1555 let CommitteeContext(committee, ValidatorSet(validators)) = context; 1556 let committee_id = committee.id(); 1557 1558 // Initialize the storage. 1559 let ledger = Arc::new(MockLedgerService::new(committee)); 1560 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1); 1561 1562 // Ensure the storage is empty. 
1563 assert_storage(&storage, &[], &[], &[], &Default::default()); 1564 1565 // Create a new certificate. 1566 let signer = selector.select(&validators); 1567 1568 let mut transmission_map = IndexMap::new(); 1569 1570 for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter() { 1571 transmission_map.insert(*id, t.clone()); 1572 } 1573 1574 let batch_header = BatchHeader::new( 1575 &signer.private_key, 1576 0, 1577 now(), 1578 committee_id, 1579 transmission_map.keys().cloned().collect(), 1580 Default::default(), 1581 &mut rng, 1582 ) 1583 .unwrap(); 1584 1585 // Remove the author from the validator set passed to create the batch 1586 // certificate, the author should not sign their own batch. 1587 let mut validators = validators.clone(); 1588 validators.remove(signer); 1589 1590 let certificate = BatchCertificate::from( 1591 batch_header.clone(), 1592 sign_batch_header(&ValidatorSet(validators), &batch_header, &mut rng), 1593 ) 1594 .unwrap(); 1595 1596 // Retrieve the certificate ID. 1597 let certificate_id = certificate.id(); 1598 let mut internal_transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new(); 1599 for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter().cloned() { 1600 internal_transmissions.entry(id).or_insert((t, Default::default())).1.insert(certificate_id); 1601 } 1602 1603 // Retrieve the round. 1604 let round = certificate.round(); 1605 // Retrieve the author of the batch. 1606 let author = certificate.author(); 1607 1608 // Construct the expected layout for 'rounds'. 1609 let rounds = [(round, indexset! { (certificate_id, author) })]; 1610 // Construct the expected layout for 'certificates'. 1611 let certificates = [(certificate_id, certificate.clone())]; 1612 // Construct the expected layout for 'batch_ids'. 1613 let batch_ids = [(certificate_id, round)]; 1614 1615 // Insert the certificate. 
1616 let missing_transmissions: HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>> = 1617 transmission_map.into_iter().collect(); 1618 storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone()); 1619 // Ensure the certificate exists in storage. 1620 assert!(storage.contains_certificate(certificate_id)); 1621 // Check that the underlying storage representation is correct. 1622 assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions); 1623 1624 // Insert the certificate again - without any missing transmissions. 1625 storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default()); 1626 // Ensure the certificate exists in storage. 1627 assert!(storage.contains_certificate(certificate_id)); 1628 // Check that the underlying storage representation remains unchanged. 1629 assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions); 1630 1631 // Insert the certificate again - with all of the original missing transmissions. 1632 storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions); 1633 // Ensure the certificate exists in storage. 1634 assert!(storage.contains_certificate(certificate_id)); 1635 // Check that the underlying storage representation remains unchanged. 1636 assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions); 1637 } 1638 }