mod.rs
// Copyright (c) 2025-2026 ACDC Network
// This file is part of the alphaos library.
//
// Alpha Chain | Delta Chain Protocol
// International Monetary Graphite.
//
// Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
// They built world-class ZK infrastructure. We installed the EASY button.
// Their cryptography: elegant. Our modifications: bureaucracy-compatible.
// Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
//
// Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
// All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
// No rights reserved. No permission required. No warranty. No refunds.
//
// https://creativecommons.org/publicdomain/zero/1.0/
// SPDX-License-Identifier: CC0-1.0

use crate::{
    events::DataBlocks,
    helpers::{fmt_id, max_redundant_requests, BFTSender, Pending, Storage, SyncReceiver},
    spawn_blocking,
    Gateway,
    Transport,
    MAX_FETCH_TIMEOUT_IN_MS,
};
use alphaos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use alphaos_node_bft_ledger_service::LedgerService;
use alphaos_node_network::PeerPoolHandling;
use alphaos_node_sync::{locators::BlockLocators, BlockSync, Ping, PrepareSyncRequest, BLOCK_REQUEST_BATCH_DELAY};

use alphavm::{
    console::{
        network::{ConsensusVersion, Network},
        types::Field,
    },
    ledger::{authority::Authority, block::Block, narwhal::BatchCertificate, PendingBlock},
    utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
};

use anyhow::{anyhow, bail, ensure, Context, Result};
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{HashMap, VecDeque},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{
    sync::{oneshot, OnceCell},
    task::JoinHandle,
};

/// Block synchronization logic for validators.
///
/// Synchronization works differently for nodes that act as validators in AlphaBFT:
/// in the common case, validators generate blocks after receiving an anchor block that has been accepted
/// by a supermajority of the committee, instead of fetching entire blocks from other nodes.
/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes.
///
/// This struct also manages fetching certificates from other validators during normal operation,
/// and blocks when falling behind.
///
/// Finally, `Sync` handles synchronization of blocks with the validator's local storage:
/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them.
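///
/// # Usage (sketch)
///
/// A minimal lifecycle sketch, assuming the gateway, storage, ledger service,
/// block sync, BFT sender, ping, and sync receiver are constructed elsewhere:
///
/// ```ignore
/// // Wire up the sync module and start its background tasks.
/// let sync = Sync::new(gateway, storage, ledger, block_sync);
/// sync.initialize(Some(bft_sender)).await?;
/// sync.run(Some(ping), sync_receiver).await?;
/// // ... on node shutdown:
/// sync.shut_down().await;
/// ```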
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway enables communication with other validators.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block synchronization logic.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificates queue.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// Handles to the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The response lock.
    response_lock: Arc<TMutex<()>>,
    /// The sync lock. Ensures that only one task syncs the ledger at a time.
    sync_lock: Arc<TMutex<()>>,
    /// The latest block responses.
    ///
    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks
    /// whose addition to the ledger is deferred until certain checks pass.
    /// Blocks need to be processed in order, hence a FIFO queue.
    ///
    /// Whenever a new block is added to this queue, `BlockSync::set_sync_height` needs to be called.
    pending_blocks: Arc<TMutex<VecDeque<PendingBlock<N>>>>,
}

impl<N: Network> Sync<N> {
    /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
    /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Initializes a new sync instance.
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        // Return the sync instance.
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            bft_sender: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            pending_blocks: Default::default(),
        }
    }

    /// Initializes the sync module and syncs the storage with the ledger at bootup.
    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
        // If a BFT sender was provided, set it.
        if let Some(bft_sender) = bft_sender {
            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
        }

        info!("Syncing storage with the ledger...");

        // Sync the storage with the ledger.
        self.sync_storage_with_ledger_at_bootup()
            .await
            .with_context(|| "Syncing storage with the ledger at bootup failed")?;

        debug!("Finished initial block synchronization at startup");
        Ok(())
    }

    /// Sends the given batch of block requests to peers.
    ///
    /// Responses to block requests will eventually be processed by `Self::try_advancing_block_synchronization`.
    #[inline]
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());

        // Sends the block requests to the sync peers.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Starts the sync module.
    ///
    /// When this function returns successfully, the sync module will have spawned background tasks
    /// that fetch blocks from other validators.
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Start the block request generation loop (outgoing).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Wait for peer updates or timeout.
                let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;

                // Issue block requests to peers.
                self_.try_issuing_block_requests().await;

                // Rate limiting happens in [`Self::send_block_requests`] and no additional sleeps are needed here.
            }
        });

        // Start the block response processing loop (incoming).
        let self_ = self.clone();
        let ping = ping.clone();
        self.spawn(async move {
            loop {
                // Wait until there is something to do or until the timeout.
                let _ =
                    tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;

                let ping = ping.clone();
                let self_ = self_.clone();
                let hdl = tokio::spawn(async move {
                    self_.try_advancing_block_synchronization(&ping).await;
                });

                if let Err(err) = hdl.await {
                    if let Ok(panic) = err.try_into_panic() {
                        error!("Sync block advancement panicked: {panic:?}");
                    }
                }

                // We perform no additional rate limiting here as
                // requests are already rate-limited.
            }
        });

        // Start the pending queue expiration loop.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                // Remove the expired pending transmission requests.
                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        /* Set up callbacks for events from the Gateway */

        // Retrieve the sync receiver.
        let SyncReceiver {
            mut rx_block_sync_insert_block_response,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process the block sync request to advance with sync blocks.
        // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which forwards it via `tx_block_sync_insert_block_response.send()`,
        // which causes the `rx_block_sync_insert_block_response.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, latest_consensus_version, callback)) =
                rx_block_sync_insert_block_response.recv().await
            {
                let result = self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await;
                // TODO: remove this once channels are gone.
                if let Err(err) = &result {
                    warn!("Failed to insert block response: {err:?}");
                }

                callback.send(result).ok();
            }
        });

        // Process the block sync request to remove the peer.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });

        // Process each block sync request to update peer locators.
        // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::update_peer_locators()`],
        // which calls [`tx_block_sync_update_peer_locators.send()`],
        // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Process each certificate request.
        // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_request.send()`],
        // which causes the `rx_certificate_request.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process each certificate response.
        // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_response.send()`],
        // which causes the `rx_certificate_response.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }

    /// BFT-specific version of `Client::try_issuing_block_requests()`.
    ///
    /// This method handles timeout removal, checks if block sync is possible,
    /// and issues block requests to peers.
    async fn try_issuing_block_requests(&self) {
        // Update the sync height to the latest ledger height.
        // (If the ledger height is lower than or equal to the current sync height, this is a no-op.)
        self.block_sync.set_sync_height(self.ledger.latest_block_height());

        // Check if any existing requests can be removed.
        // We should do this even if we cannot block sync, to ensure
        // there are no dangling block requests.
        match self.block_sync.handle_block_request_timeouts(&self.gateway) {
            Ok(Some((requests, sync_peers))) => {
                // Re-request blocks instead of performing regular block sync.
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                // Abort and retry later.
                error!("{}", &flatten_error(err));
                return;
            }
        }

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and performing unnecessary computation.
        if !self.block_sync.can_block_sync() {
            return;
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (requests, sync_peers) = self.block_sync.prepare_block_requests();

        // If there are no block requests, return early.
        if requests.is_empty() {
            return;
        }

        // Send the block requests to peers.
        self.send_block_requests(requests, sync_peers).await;
    }

    /// Test-only method to manually trigger block synchronization.
    /// This combines both request generation and response processing for testing purposes.
    #[cfg(test)]
    pub(crate) async fn testing_only_try_block_sync_testing_only(&self) {
        // First try issuing block requests.
        self.try_issuing_block_requests().await;

        // Then try advancing with any available responses.
        self.try_advancing_block_synchronization(&None).await;
    }
}

// Callbacks used when receiving messages from the Gateway
impl<N: Network> Sync<N> {
    /// We received a block response and can (possibly) advance synchronization.
    async fn insert_block_response(
        &self,
        peer_ip: SocketAddr,
        blocks: Vec<Block<N>>,
        latest_consensus_version: Option<ConsensusVersion>,
    ) -> Result<()> {
        // Verify that the response is valid and add it to block sync.
        self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)

        // No need to advance block sync here, as the new response will
        // notify the incoming task.
    }

    /// We received new peer locators during a Ping.
    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.block_sync.update_peer_locators(peer_ip, &locators)
    }

    /// A peer disconnected.
    fn remove_peer(&self, peer_ip: SocketAddr) {
        self.block_sync.remove_peer(&peer_ip);
    }

    #[cfg(test)]
    pub fn testing_only_update_peer_locators_testing_only(
        &self,
        peer_ip: SocketAddr,
        locators: BlockLocators<N>,
    ) -> Result<()> {
        self.update_peer_locators(peer_ip, locators)
    }
}

// Methods to manage storage.
impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup.
    ///
    /// This is called when starting the validator and after finishing a sync without BFT.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Retrieve the block height.
        let block_height = latest_block.height();
        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
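        // Worked example (illustrative numbers, not from this codebase): with
        // `max_gc_rounds = 100`, at most `100 / 2 = 50` blocks correspond to
        // rounds still kept in storage, so for a ledger at height 1000 we
        // would reload blocks 950..=1000 below.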
        // Retrieve the blocks.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {gc_height} to {block_height}...");

        /* Sync storage */

        // Sync the height with the block.
        self.storage.sync_height_with_block(latest_block.height());
        // Sync the round with the block.
        self.storage.sync_round_with_block(latest_block.round());
        // Perform GC on the latest block round.
        self.storage.garbage_collect_certificates(latest_block.round());
        // Iterate over the blocks.
        for block in &blocks {
            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
            // Note that the block authority is always a sub-DAG in production;
            // beacon signatures are only used for testing,
            // and as placeholder (irrelevant) block authority in the genesis block.
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                // Iterate over the certificates.
                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                // Update the validator telemetry.
                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        /* Sync the BFT DAG */

        // Construct a list of the certificates.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    // If the block authority is a beacon, then skip the block.
                    Authority::Beacon(_) => None,
                    // If the block authority is a subdag, then retrieve the certificates.
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender was provided, send the certificates to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            bft_sender
                .tx_sync_bft_dag_at_bootup
                .send(certificates)
                .await
                .with_context(|| "Failed to update the BFT DAG from sync")?;
        }

        self.block_sync.set_sync_height(block_height);

        Ok(())
    }

    /// Returns which height we are synchronized to.
    /// If there are queued block responses, this might be higher than the latest block in the ledger.
    async fn compute_sync_height(&self) -> u32 {
        let ledger_height = self.ledger.latest_block_height();
        let mut pending_blocks = self.pending_blocks.lock().await;

        // Remove any old responses.
        while let Some(b) = pending_blocks.front() {
            if b.height() <= ledger_height {
                pending_blocks.pop_front();
            } else {
                break;
            }
        }

        // Ensure the returned value is always greater than or equal to the ledger height.
        pending_blocks.back().map(|b| b.height()).unwrap_or(0).max(ledger_height)
    }

    /// BFT version of [`alphaos_node_client::Client::try_advancing_block_synchronization`].
    async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
        // Process block responses and advance the ledger.
        let new_blocks = match self
            .try_advancing_block_synchronization_inner()
            .await
            .with_context(|| "Block synchronization failed")
        {
            Ok(new_blocks) => new_blocks,
            Err(err) => {
                error!("{}", &flatten_error(err));
                false
            }
        };

        if let Some(ping) = &ping {
            if new_blocks {
                match self.get_block_locators() {
                    Ok(locators) => ping.update_block_locators(locators),
                    Err(err) => error!("Failed to update block locators: {err}"),
                }
            }
        }
    }

    /// Aims to advance synchronization using any recent block responses received from peers.
    ///
    /// This is the validator's version of `BlockSync::try_advancing_block_synchronization`
    /// and is called periodically at runtime.
    ///
    /// This returns Ok(true) if we successfully advanced the ledger by at least one new block.
    ///
    /// A key difference from `BlockSync`'s version is that it will only add blocks to the ledger once they have been confirmed by the network.
    /// If blocks are not confirmed yet, they will be kept in [`Self::pending_blocks`].
    /// It will also pass certificates from synced blocks to the BFT module so that consensus can progress as expected
    /// (see [`Self::sync_storage_with_block`] for more details).
    ///
    /// If the node falls behind by more than the GC range, this function uses [`Self::sync_ledger_with_block_without_bft`] instead,
    /// which syncs without updating the BFT state.
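    ///
    /// # Decision sketch (illustrative numbers)
    ///
    /// With `max_gc_rounds = 100` (so `max_gc_blocks = 50`) and a peer tip of 300:
    /// at ledger height 280, `281 > 300 - 50`, so we advance with BFT; at ledger
    /// height 200, `201 <= 250`, so we first catch up without BFT.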
    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;

        // For sanity, set the sync height again.
        // (If the sync height is already larger or equal, this is a no-op.)
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Retrieve the maximum block height of the peers.
        let tip = self
            .block_sync
            .find_sync_peers()
            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
            .unwrap_or(0);

        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // Updates the sync state and returns the error (if any).
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            // Make the underlying `BlockSync` instance aware of the new sync height.
            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error {
                Err(err)
            } else {
                Ok(new_blocks)
            }
        };

        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Retrieve the current height, based on the ledger height and the
            // (unconfirmed) blocks that are already queued up.
            let start_height = self.compute_sync_height().await;

            // The height is incremented as blocks are added.
            let mut current_height = start_height;
            trace!(
                "Try advancing block responses with BFT (starting at block {current_height}, current sync speed is {})",
                self.block_sync.get_sync_speed()
            );

            // If we were already within GC or successfully caught up with GC, try to advance the BFT normally again.
            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                // Sync the storage with the block.
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            // For non-BFT sync we need to start at the current height of the ledger, as blocks are immediately
            // added to it and do not queue up in `pending_blocks`.
            let start_height = ledger_height;
            let mut current_height = start_height;

            trace!("Try advancing block responses without BFT (starting at block {current_height})");

            // Try to advance the ledger *to tip* without updating the BFT.
            // TODO(kaimast): why to tip and not to tip-GC?
            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                // Sync the ledger with the block without BFT.
                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                        self.block_sync.count_request_completed();
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // Sync the storage with the ledger if we should transition to the BFT sync.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                self.sync_storage_with_ledger_at_bootup()
                    .await
                    .with_context(|| "BFT sync (with bootup routine) failed")?;
            }

            cleanup(start_height, current_height, None)
        }
    }

    /// Syncs the ledger with the given block without updating the BFT.
    ///
    /// This is only used by [`Self::try_advancing_block_synchronization`].
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        let self_ = self.clone();
        spawn_blocking!({
            // Check the next block.
            self_.ledger.check_next_block(&block)?;
            // Attempt to advance to the next block.
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the height with the block.
            self_.storage.sync_height_with_block(block.height());
            // Sync the round with the block.
            self_.storage.sync_round_with_block(block.round());
            // Mark the block height as processed in block_sync.
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
    }

    /// Helper function for [`Self::sync_storage_with_block`].
    /// It syncs the batch certificates with the BFT, if the block's authority is a sub-DAG.
    ///
    /// Note that the block authority is always a sub-DAG in production; beacon signatures are only used for testing,
    /// and as placeholder (irrelevant) block authority in the genesis block.
    async fn add_block_subdag_to_bft(&self, block: &Block<N>) -> Result<()> {
        // Nothing to do if this is a beacon block.
        let Authority::Quorum(subdag) = block.authority() else {
            return Ok(());
        };

        // Reconstruct the unconfirmed transactions.
        let unconfirmed_transactions = cfg_iter!(block.transactions())
            .filter_map(|tx| tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok())
            .collect::<HashMap<_, _>>();

        // Iterate over the certificates.
        for certificates in subdag.values().cloned() {
            cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                // Sync the batch certificate with the block.
                self.storage.sync_certificate_with_block(block, certificate.clone(), &unconfirmed_transactions);
            });

            // Sync the BFT DAG with the certificates.
            for certificate in certificates {
                // If a BFT sender was provided, send the certificate to the BFT.
                // For validators, BFT spawns a receiver task in `BFT::start_handlers`.
                if let Some(bft_sender) = self.bft_sender.get() {
                    let (callback_tx, callback_rx) = oneshot::channel();
                    bft_sender
                        .tx_sync_bft
                        .send((certificate, callback_tx))
                        .await
                        .with_context(|| "Failed to sync certificate")?;
                    callback_rx.await?.with_context(|| "Failed to sync certificate")?;
                }
            }
        }
        Ok(())
    }

    /// Helper function for [`Self::sync_storage_with_block`].
    ///
    /// It checks whether the successor of a given block contains enough votes to commit it.
    /// This can only return `Ok(true)` if the certificates of the block's successor were added to the storage.
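    ///
    /// # Worked example (illustrative)
    ///
    /// Assume a 4-validator committee with equal stake, so f = 1 and the
    /// availability threshold requires authors holding more than f voting stake.
    /// If the leader certificate of round `r` appears in the
    /// `previous_certificate_ids` of round `r + 1` certificates from any 2 of
    /// the 4 validators, the threshold is reached and the block can be committed.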
    fn is_block_availability_threshold_reached(&self, block: &PendingBlock<N>) -> Result<bool> {
        // Fetch the leader certificate and the relevant rounds.
        let leader_certificate = match block.authority() {
            Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
            _ => bail!("Received a block with an unexpected authority type."),
        };
        let commit_round = leader_certificate.round();
        let certificate_round =
            commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;

        // Get the committee lookback for the round just after the leader.
        let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
        // Retrieve all of the certificates for the round just after the leader.
        let certificates = self.storage.get_certificates_for_round(certificate_round);
        // Construct a set over the authors, at the round just after the leader,
        // who included the leader's certificate in their previous certificate IDs.
        let authors = certificates
            .iter()
            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                true => Some(c.author()),
                false => None,
            })
            .collect();

        // Check if the leader is ready to be committed.
        if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
            trace!(
                "Block {hash} at height {height} has reached availability threshold",
                hash = block.hash(),
                height = block.height()
            );
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Advances the ledger by the given block and updates the storage accordingly.
    ///
    /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
    /// meets the voter availability threshold (i.e. > f voting stake)
    /// or is reachable via a DAG path from a later leader certificate that does.
    /// Since performing this check requires DAG certificates from later blocks,
    /// the block is stored in `Sync::pending_blocks`,
    /// and its addition to the ledger is deferred until the check passes.
    /// Several blocks may be stored in `Sync::pending_blocks`
    /// before they can all be checked and added to the ledger.
    ///
    /// # Usage
    /// This function assumes that blocks are passed in order, i.e.,
    /// that the given block is a direct successor of the block that was last passed to this function.
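    ///
    /// # Example (mirrors `tests::test_commit_chain` below)
    /// If blocks 1 and 2 arrive before their successor rounds reach the
    /// availability threshold, they wait in `Sync::pending_blocks`. Once block 3
    /// arrives and its threshold check passes, blocks 1, 2, and 3 are committed
    /// in one sweep, since commits are transitive.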
    async fn sync_storage_with_block(&self, new_block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;
        let new_block_height = new_block.height();

        // If this block has already been processed, return early.
        // TODO(kaimast): Should we remove the response here?
        if self.ledger.contains_block_height(new_block.height()) {
            debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.");
            return Ok(());
        }

        // Acquire the pending blocks lock.
        let mut pending_blocks = self.pending_blocks.lock().await;

        // Append the certificates to the storage.
        self.add_block_subdag_to_bft(&new_block).await?;

        // Fetch the latest block height.
        let ledger_block_height = self.ledger.latest_block_height();

        // First, clear any pending blocks that have become obsolete.
        // TODO(kaimast): ensure there are no dangling block requests
        while let Some(pending_block) = pending_blocks.front() {
            if pending_block.height() > ledger_block_height {
                break;
            }

            trace!(
                "Pending block {hash} at height {height} became obsolete",
                hash = pending_block.hash(),
                height = pending_block.height()
            );
            pending_blocks.pop_front();
        }

        if let Some(tail) = pending_blocks.back() {
            if tail.height() >= new_block.height() {
                debug!(
                    "An unconfirmed block is already queued for height {height}. Will not sync.",
                    height = new_block.height()
                );
                return Ok(());
            }

            ensure_equals!(tail.height() + 1, new_block.height(), "Got an out-of-order block");
        }

        // Check the block against the chain of pending blocks and append it on success.
        let new_block = match self.ledger.check_block_subdag(new_block, pending_blocks.make_contiguous()) {
            Ok(new_block) => new_block,
            Err(err) => {
                // TODO(kaimast): this should not return an error on the alphavm side.
                if err.to_string().contains("already in the ledger") {
                    debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.");

                    return Ok(());
                } else {
                    return Err(err);
                }
            }
        };

        trace!(
            "Adding new pending block {hash} at height {height}",
            hash = new_block.hash(),
            height = new_block.height()
        );
        pending_blocks.push_back(new_block);

        // Now, figure out if and which pending block we can commit.
        // To do this effectively and because commits are transitive,
        // we iterate in reverse so that we can stop at the first successful check.
        //
        // Note that if the storage already contains certificates for the round after the new block,
        // the availability threshold for the new block could also be reached.
        let mut commit_height = None;
        for block in pending_blocks.iter().rev() {
            // This check assumes that the pending blocks are properly linked together, based on the fact that,
            // to generate the sequence of `PendingBlock`s, each block needs to successfully be processed by `Ledger::check_block_subdag`.
            // As a result, the safety of this piece of code relies on the correctness of `Ledger::check_block_subdag`,
            // which is tested in `alphavm/ledger/tests/pending_block.rs`.
            if self
                .is_block_availability_threshold_reached(block)
                .with_context(|| "Availability threshold check failed")?
            {
                commit_height = Some(block.height());
                break;
            }
        }

        if let Some(commit_height) = commit_height {
            let start_height = ledger_block_height + 1;
            ensure!(commit_height >= start_height, "Invalid commit height");
            let num_blocks = (commit_height - start_height + 1) as usize;

            // Create a more detailed log message if we are committing more than one block at a time.
            if num_blocks > 1 {
                trace!("Attempting to commit {num_blocks} pending block(s) starting at height {start_height}.");
            }

            for pending_block in pending_blocks.drain(0..num_blocks) {
                let hash = pending_block.hash();
                let height = pending_block.height();
                let ledger = self.ledger.clone();
                let storage = self.storage.clone();

                spawn_blocking!({
                    let block = ledger.check_block_content(pending_block).with_context(|| {
                        format!("Failed to check contents of pending block {hash} at height {height}")
                    })?;

                    trace!("Adding pending block {hash} at height {height} to the ledger");
                    ledger.advance_to_next_block(&block)?;
                    // Sync the height with the block.
                    storage.sync_height_with_block(block.height());
                    // Sync the round with the block.
                    storage.sync_round_with_block(block.round());

                    Ok(())
                })?
            }
        } else {
            trace!("No pending blocks are ready to be committed ({} block(s) are pending)", pending_blocks.len());
        }

        Ok(())
    }
}

// Methods to assist with the block sync module.
impl<N: Network> Sync<N> {
    /// Returns `true` if the node is synced and has connected peers.
    pub fn is_synced(&self) -> bool {
        // Ensure the validator is connected to other validators,
        // not just clients.
        if self.gateway.number_of_connected_peers() == 0 {
            return false;
        }

        self.block_sync.is_block_synced()
    }

    /// Returns the number of blocks the node is behind the greatest peer height.
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.block_sync.num_blocks_behind()
    }

    /// Returns the current block locators of the node.
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        self.block_sync.get_block_locators()
    }
}

// Methods to assist with fetching batch certificates from peers.
impl<N: Network> Sync<N> {
    /// Sends a certificate request to the specified peer.
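    ///
    /// A minimal call sketch (assuming a `Sync` handle and a known peer and certificate ID):
    ///
    /// ```ignore
    /// // Fetch a missing certificate; the pending queue deduplicates requests
    /// // and caps the number of redundant in-flight requests per certificate.
    /// let certificate = sync.send_certificate_request(peer_ip, certificate_id).await?;
    /// ```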
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        // Initialize a oneshot channel.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Determine how many sent requests are pending.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        // Determine if we've already sent a request to the peer.
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        // Determine the maximum number of redundant requests.
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
        // Determine if we should send a certificate request to the peer.
        // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;

        // Insert the certificate ID into the pending queue.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));

        // If the number of requests is below the redundancy limit, send the certificate request to the peer.
        if should_send_request {
            // Send the certificate request to the peer.
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} (failed to send request)")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Wait for the certificate to be fetched.
        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
        tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
            .await
            .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))?
            .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id)))
    }

    /// Handles the incoming certificate request.
    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
        // Attempt to retrieve the certificate.
        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
            // Send the certificate response to the peer.
            let self_ = self.clone();
            tokio::spawn(async move {
                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
            });
        }
    }

    /// Handles the incoming certificate response.
    /// This method ensures the certificate response is well-formed and matches the certificate ID.
    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
        let certificate = response.certificate;
        // Check if the peer IP exists in the pending queue for the given certificate ID.
        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
        // If the peer IP exists, finish the pending request.
        if exists {
            // TODO: Validate the certificate.
            // Remove the certificate ID from the pending queue.
            self.pending.remove(certificate.id(), Some(certificate));
        }
    }
}

impl<N: Network> Sync<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the sync module.
    pub async fn shut_down(&self) {
        info!("Shutting down the sync module...");
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};

    use alphaos_account::Account;
    use alphaos_node_sync::BlockSync;
    use alphaos_utilities::SimpleStoppable;
    use alphavm::{
        console::{
            account::{Address, PrivateKey},
            network::MainnetV0,
        },
        ledger::{
            narwhal::{BatchCertificate, BatchHeader, Subdag},
            store::{helpers::memory::ConsensusMemory, ConsensusStore},
        },
        prelude::{Ledger, VM},
        utilities::TestRng,
    };

    use alphastd::StorageMode;
    use indexmap::IndexSet;
    use rand::Rng;
    use std::collections::BTreeMap;

    type CurrentNetwork = MainnetV0;
    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;

    /// Tests that commits work as expected when some anchors are not committed immediately.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_chain() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;

        // The first round of the first block.
        let first_round = 1;
        // The total number of blocks we test.
        let num_blocks = 3;
        // The number of certificate rounds needed.
        // There is one additional round to provide availability for the final block.
        let num_rounds = first_round + num_blocks * 2 + 1;
        // The first round that has at least N-f certificates referencing the anchor from the previous round.
        // This is also the last round we use in the test.
        let first_committed_round = num_rounds - 1;
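        // With these parameters: num_rounds = 1 + 3 * 2 + 1 = 8, so
        // first_committed_round = 7, and the leader rounds of the three
        // blocks are 2, 4, and 6.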
        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let vm = VM::from(store).unwrap();
        let genesis_pk = *account.private_key();
        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger with the genesis block.
        let genesis_clone = genesis.clone();
        let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap();
        // Initialize the ledger.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));

        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            let addresses = [
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in first_round..=first_committed_round {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };

                let committee_id = committee.id();
                let prev_leader = committee.get_leader(round - 1).unwrap();

                // For the first two blocks, non-leaders will not reference the leader certificate.
                // This means that, while there is an anchor, it isn't committed
                // until later.
                for (i, private_key) in private_keys.iter().enumerate() {
                    let leader_index = addresses.iter().position(|&address| address == prev_leader).unwrap();
                    let is_certificate_round = round % 2 == 1;
                    let is_leader = i == leader_index;

                    let previous_certs = if round < first_committed_round && is_certificate_round && !is_leader {
                        previous_certificate_ids
                            .iter()
                            .cloned()
                            .enumerate()
                            .filter(|(idx, _)| *idx != leader_index)
                            .map(|(_, id)| id)
                            .collect()
                    } else {
                        previous_certificate_ids.clone()
                    };

                    let batch_header = BatchHeader::new(
                        private_key,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certs,
                        rng,
                    )
                    .unwrap();

                    // Sign the batch header.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates;
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert all certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in first_round..=first_committed_round {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Create the blocks
        let mut previous_leader_cert = None;
        let mut blocks = vec![];

        for block_height in 1..=num_blocks {
            let leader_round = block_height * 2;

            let leader = committee.get_leader(leader_round).unwrap();
            let leader_certificate = storage.get_certificate_for_round_with_author(leader_round, leader).unwrap();

            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());

            let previous_cert_map = storage.get_certificates_for_round(leader_round - 1);

            subdag_map.insert(leader_round, leader_cert_map.clone());
            subdag_map.insert(leader_round - 1, previous_cert_map.clone());

            if leader_round > 2 {
                let previous_commit_cert_map: IndexSet<_> = storage
                    .get_certificates_for_round(leader_round - 2)
                    .into_iter()
                    .filter(|cert| {
                        if let Some(previous_leader_cert) = &previous_leader_cert {
                            cert != previous_leader_cert
                        } else {
                            true
                        }
                    })
                    .collect();
                subdag_map.insert(leader_round - 2, previous_commit_cert_map);
            }

            let subdag = Subdag::from(subdag_map.clone())?;
            previous_leader_cert = Some(leader_certificate);

            let core_ledger = core_ledger.clone();
            let block = spawn_blocking!({
                let block = core_ledger.clone().prepare_advance_to_next_quorum_block(subdag, Default::default())?;
                core_ledger.advance_to_next_block(&block)?;
                Ok(block)
            })?;

            blocks.push(block);
        }

        // ### Test that sync works as expected ###
        let storage_mode = StorageMode::new_test(None);

        // Create a new ledger to test with, but use the existing storage
        // so that the certificates exist.
        let syncing_ledger = {
            let storage_mode = storage_mode.clone();
            Arc::new(CoreLedgerService::new(
                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
                SimpleStoppable::new(),
            ))
        };

        // Set up sync and its dependencies.
        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            syncing_ledger.clone(),
            None,
            &[],
            false,
            StorageMode::new_test(None),
            None,
        )?;
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);

        let mut block_iter = blocks.into_iter();

        // Insert the blocks into the new sync module
        for _ in 0..num_blocks - 1 {
            let block = block_iter.next().unwrap();
            sync.sync_storage_with_block(block).await?;

            // Availability threshold is not met, so we should not advance yet.
            assert_eq!(syncing_ledger.latest_block_height(), 0);
        }

        // Only for the final block, the availability threshold is met,
        // because certificates for the subsequent round are already in storage.
        sync.sync_storage_with_block(block_iter.next().unwrap()).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);

        // Ensure blocks 1 and 2 were added to the ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_pending_certificates() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let vm = VM::from(store).unwrap();
        let genesis_pk = *account.private_key();
        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];
        // Initialize the ledger with the genesis block.
        let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap();
        // Initialize the ledger.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            // Initialize the committee.
            let committee = core_ledger.current_committee().unwrap();
            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();
                // Create a certificate for each validator.
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = BatchHeader::new(
                        private_key_1,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Sign the batch header.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Create block 1.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_1 = {
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            let ledger = core_ledger.clone();
            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag, Default::default()))?
        };
        // Insert block 1.
        let ledger = core_ledger.clone();
        let block = block_1.clone();
        spawn_blocking!(ledger.advance_to_next_block(&block))?;

        // Create block 2.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_2 = {
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            let ledger = core_ledger.clone();
            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default()))?
        };
        // Insert block 2.
        let ledger = core_ledger.clone();
        let block = block_2.clone();
        spawn_blocking!(ledger.advance_to_next_block(&block))?;

        // Create block 3.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_3 = {
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            let ledger = core_ledger.clone();
            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default()))?
        };
        // Insert block 3.
        let ledger = core_ledger.clone();
        let block = block_3.clone();
        spawn_blocking!(ledger.advance_to_next_block(&block))?;

        /*
            Check that the pending certificates are computed correctly.
        */

        // Retrieve the pending certificates.
        let pending_certificates = storage.get_pending_certificates();
        // Check that all of the pending certificates are not contained in the ledger.
        for certificate in pending_certificates.clone() {
            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
        }
        // Initialize an empty set to be populated with the committed certificates in the block subdags.
        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        {
            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
            for subdag in subdag_maps.iter() {
                for subdag_certificates in subdag.values() {
                    committed_certificates.extend(subdag_certificates.iter().cloned());
                }
            }
        };
        // Create the set of candidate pending certificates as the set of all certificates minus the set of the committed certificates.
        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        for certificate in certificates.clone() {
            if !committed_certificates.contains(&certificate) {
                candidate_pending_certificates.insert(certificate);
            }
        }
        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
        assert_eq!(pending_certificates, candidate_pending_certificates);

        Ok(())
    }
}