mod.rs
// Copyright (c) 2025 ADnet Contributors
// This file is part of the AlphaOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{
    Gateway,
    MAX_FETCH_TIMEOUT_IN_MS,
    Transport,
    events::DataBlocks,
    helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
    spawn_blocking,
};
use alphaos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use alphaos_node_bft_ledger_service::LedgerService;
use alphaos_node_network::PeerPoolHandling;
use alphaos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};

use alphavm::{
    console::{
        network::{ConsensusVersion, Network},
        types::Field,
    },
    ledger::{PendingBlock, authority::Authority, block::Block, narwhal::BatchCertificate},
    utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
};

use anyhow::{Context, Result, anyhow, bail, ensure};
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex};
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{HashMap, VecDeque},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{
    sync::{OnceCell, oneshot},
    task::JoinHandle,
};

/// Block synchronization logic for validators.
///
/// Synchronization works differently for nodes that act as validators in AleoBFT:
/// in the common case, validators generate blocks after receiving an anchor block that has been accepted
/// by a supermajority of the committee, instead of fetching entire blocks from other nodes.
/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes.
///
/// This struct also manages fetching certificates from other validators during normal operation,
/// and blocks when falling behind.
///
/// Finally, `Sync` handles synchronization of blocks with the validator's local storage:
/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them.
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway enables communication with other validators.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block synchronization logic.
    block_sync: Arc<BlockSync<N>>,
    /// The pending certificates queue.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// Handles to the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The response lock.
    response_lock: Arc<TMutex<()>>,
    /// The sync lock. Ensures that only one task syncs the ledger at a time.
    sync_lock: Arc<TMutex<()>>,
    /// The latest block responses.
    ///
    /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks
    /// whose addition to the ledger is deferred until certain checks pass.
    /// Blocks need to be processed in order, hence an ordered queue.
    ///
    /// Whenever a new block is added to this queue, `BlockSync::set_sync_height` needs to be called.
    pending_blocks: Arc<TMutex<VecDeque<PendingBlock<N>>>>,
}
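
// A minimal lifecycle sketch (illustrative only; `gateway`, `storage`, `ledger`,
// `block_sync`, `bft_sender`, `ping`, and `sync_receiver` are assumed to come
// from the surrounding node setup):
//
//     let sync = Sync::new(gateway, storage, ledger, block_sync);
//     sync.initialize(Some(bft_sender)).await?;
//     sync.run(Some(ping), sync_receiver).await?;
//     /* ... the node operates ... */
//     sync.shut_down().await;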
impl<N: Network> Sync<N> {
    /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
    /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Initializes a new sync instance.
    pub fn new(
        gateway: Gateway<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Self {
        // Return the sync instance.
        Self {
            gateway,
            storage,
            ledger,
            block_sync,
            pending: Default::default(),
            bft_sender: Default::default(),
            handles: Default::default(),
            response_lock: Default::default(),
            sync_lock: Default::default(),
            pending_blocks: Default::default(),
        }
    }

    /// Initializes the sync module and syncs the storage with the ledger at bootup.
    pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
        // If a BFT sender was provided, set it.
        if let Some(bft_sender) = bft_sender {
            self.bft_sender.set(bft_sender).expect("BFT sender already set in gateway");
        }

        info!("Syncing storage with the ledger...");

        // Sync the storage with the ledger.
        self.sync_storage_with_ledger_at_bootup()
            .await
            .with_context(|| "Syncing storage with the ledger at bootup failed")?;

        debug!("Finished initial block synchronization at startup");
        Ok(())
    }

    /// Sends the given batch of block requests to peers.
    ///
    /// Responses to block requests will eventually be processed by `Self::try_advancing_block_synchronization`.
    #[inline]
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        trace!("Prepared {num_requests} block requests", num_requests = block_requests.len());

        // Send the block requests to the sync peers.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }
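
    // Illustrative arithmetic for the chunking above (not normative values): if
    // `DataBlocks::MAXIMUM_NUMBER_OF_BLOCKS` were 50 and 120 requests are prepared,
    // they would go out as batches of 50, 50, and 20, separated by
    // `BLOCK_REQUEST_BATCH_DELAY`.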
    /// Starts the sync module.
    ///
    /// When this function returns successfully, the sync module will have spawned background tasks
    /// that fetch blocks from other validators.
    pub async fn run(&self, ping: Option<Arc<Ping<N>>>, sync_receiver: SyncReceiver<N>) -> Result<()> {
        info!("Starting the sync module...");

        // Start the block request generation loop (outgoing).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Wait for peer updates or timeout.
                let _ = tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_peer_update()).await;

                // Issue block requests to peers.
                self_.try_issuing_block_requests().await;

                // Rate limiting happens in [`Self::send_block_requests`] and no additional sleeps are needed here.
            }
        });

        // Start the block response processing loop (incoming).
        let self_ = self.clone();
        let ping = ping.clone();
        self.spawn(async move {
            loop {
                // Wait until there is something to do or until the timeout.
                let _ =
                    tokio::time::timeout(Self::MAX_SYNC_INTERVAL, self_.block_sync.wait_for_block_responses()).await;

                let ping = ping.clone();
                let self_ = self_.clone();
                let hdl = tokio::spawn(async move {
                    self_.try_advancing_block_synchronization(&ping).await;
                });

                if let Err(err) = hdl.await
                    && let Ok(panic) = err.try_into_panic()
                {
                    error!("Sync block advancement panicked: {panic:?}");
                }

                // We perform no additional rate limiting here as
                // requests are already rate-limited.
            }
        });

        // Start the pending queue expiration loop.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                // Remove the expired pending transmission requests.
                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        /* Set up callbacks for events from the Gateway */

        // Retrieve the sync receiver.
        let SyncReceiver {
            mut rx_block_sync_insert_block_response,
            mut rx_block_sync_remove_peer,
            mut rx_block_sync_update_peer_locators,
            mut rx_certificate_request,
            mut rx_certificate_response,
        } = sync_receiver;

        // Process the block sync request to advance with sync blocks.
        // Each iteration of this loop is triggered by an incoming [`BlockResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::advance_with_sync_blocks()`],
        // which calls [`tx_block_sync_insert_block_response.send()`],
        // which causes the `rx_block_sync_insert_block_response.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, blocks, latest_consensus_version, callback)) =
                rx_block_sync_insert_block_response.recv().await
            {
                let result = self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await;
                // TODO: remove this once channels are gone.
                if let Err(err) = &result {
                    warn!("Failed to insert block response: {err:?}");
                }

                callback.send(result).ok();
            }
        });

        // Process the block sync request to remove the peer.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
                self_.remove_peer(peer_ip);
            }
        });
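
        // Several of these Gateway handlers follow the same request/callback shape
        // (a simplified sketch; `handle` stands in for the respective method):
        //
        //     while let Some((args, callback)) = rx.recv().await {
        //         callback.send(self_.handle(args)).ok();
        //     }
        //
        // where `callback` is a oneshot sender that the requesting side can await;
        // handlers without a result (e.g. `remove_peer`) simply omit it.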
        // Process each block sync request to update peer locators.
        // Each iteration of this loop is triggered by an incoming [`PrimaryPing`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`SyncSender::update_peer_locators()`],
        // which calls [`tx_block_sync_update_peer_locators.send()`],
        // which causes the `rx_block_sync_update_peer_locators.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
                let self_clone = self_.clone();
                tokio::spawn(async move {
                    callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok();
                });
            }
        });

        // Process each certificate request.
        // Each iteration of this loop is triggered by an incoming [`CertificateRequest`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_request.send()`],
        // which causes the `rx_certificate_request.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
                self_.send_certificate_response(peer_ip, certificate_request);
            }
        });

        // Process each certificate response.
        // Each iteration of this loop is triggered by an incoming [`CertificateResponse`],
        // which is initially handled by [`Gateway::inbound()`],
        // which calls [`tx_certificate_response.send()`],
        // which causes the `rx_certificate_response.recv()` call below to return.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
                self_.finish_certificate_request(peer_ip, certificate_response);
            }
        });

        Ok(())
    }

    /// BFT-specific version of `Client::try_issuing_block_requests()`.
    ///
    /// This method handles timeout removal, checks if block sync is possible,
    /// and issues block requests to peers.
    async fn try_issuing_block_requests(&self) {
        // Update the sync height to the latest ledger height.
        // (If the ledger height is lower than or equal to the current sync height, this is a noop.)
        self.block_sync.set_sync_height(self.ledger.latest_block_height());

        // Check if any existing requests can be removed.
        // We should do this even if we cannot block sync, to ensure
        // there are no dangling block requests.
        match self.block_sync.handle_block_request_timeouts(&self.gateway) {
            Ok(Some((requests, sync_peers))) => {
                // Re-request blocks instead of performing regular block sync.
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                // Abort and retry later.
                error!("{}", &flatten_error(err));
                return;
            }
        }

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and performing unnecessary computation.
        if !self.block_sync.can_block_sync() {
            return;
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (requests, sync_peers) = self.block_sync.prepare_block_requests();

        // If there are no block requests, return early.
        if requests.is_empty() {
            return;
        }

        // Send the block requests to peers.
        self.send_block_requests(requests, sync_peers).await;
    }

    /// Test-only method to manually trigger block synchronization.
    /// This combines both request generation and response processing for testing purposes.
    #[cfg(test)]
    pub(crate) async fn testing_only_try_block_sync_testing_only(&self) {
        // First, try issuing block requests.
        self.try_issuing_block_requests().await;

        // Then, try advancing with any available responses.
        self.try_advancing_block_synchronization(&None).await;
    }
}

// Callbacks used when receiving messages from the Gateway
impl<N: Network> Sync<N> {
    /// We received a block response and can (possibly) advance synchronization.
    async fn insert_block_response(
        &self,
        peer_ip: SocketAddr,
        blocks: Vec<Block<N>>,
        latest_consensus_version: Option<ConsensusVersion>,
    ) -> Result<()> {
        // Verify that the response is valid and add it to block sync.
        self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)

        // No need to advance block sync here, as the new response will
        // notify the incoming task.
    }

    /// We received new peer locators during a Ping.
    fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators<N>) -> Result<()> {
        self.block_sync.update_peer_locators(peer_ip, &locators)
    }

    /// A peer disconnected.
    fn remove_peer(&self, peer_ip: SocketAddr) {
        self.block_sync.remove_peer(&peer_ip);
    }

    #[cfg(test)]
    pub fn testing_only_update_peer_locators_testing_only(
        &self,
        peer_ip: SocketAddr,
        locators: BlockLocators<N>,
    ) -> Result<()> {
        self.update_peer_locators(peer_ip, locators)
    }
}

// Methods to manage storage.
impl<N: Network> Sync<N> {
    /// Syncs the storage with the ledger at bootup.
    ///
    /// This is called when starting the validator and after finishing a sync without BFT.
    async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
        // Retrieve the latest block in the ledger.
        let latest_block = self.ledger.latest_block();

        // Retrieve the block height.
        let block_height = latest_block.height();
        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let gc_height = block_height.saturating_sub(max_gc_blocks);
        // Retrieve the blocks.
        let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
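
        // Worked example (illustrative numbers only): with `max_gc_rounds` = 100 and
        // `block_height` = 1_000, `max_gc_blocks` = 100 / 2 = 50 and
        // `gc_height` = 1_000 - 50 = 950, so blocks 950..=1_000 are loaded above.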
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

        /* Sync storage */

        // Sync the height with the block.
        self.storage.sync_height_with_block(latest_block.height());
        // Sync the round with the block.
        self.storage.sync_round_with_block(latest_block.round());
        // Perform GC on the latest block round.
        self.storage.garbage_collect_certificates(latest_block.round());
        // Iterate over the blocks.
        for block in &blocks {
            // If the block authority is a sub-DAG, then sync the batch certificates with the block.
            // Note that the block authority is always a sub-DAG in production;
            // beacon signatures are only used for testing,
            // and as placeholder (irrelevant) block authority in the genesis block.
            if let Authority::Quorum(subdag) = block.authority() {
                // Reconstruct the unconfirmed transactions.
                let unconfirmed_transactions = cfg_iter!(block.transactions())
                    .filter_map(|tx| {
                        tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                    })
                    .collect::<HashMap<_, _>>();

                // Iterate over the certificates.
                for certificates in subdag.values().cloned() {
                    cfg_into_iter!(certificates).for_each(|certificate| {
                        self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                    });
                }

                // Update the validator telemetry.
                #[cfg(feature = "telemetry")]
                self.gateway.validator_telemetry().insert_subdag(subdag);
            }
        }

        /* Sync the BFT DAG */

        // Construct a list of the certificates.
        let certificates = blocks
            .iter()
            .flat_map(|block| {
                match block.authority() {
                    // If the block authority is a beacon, then skip the block.
                    Authority::Beacon(_) => None,
                    // If the block authority is a subdag, then retrieve the certificates.
                    Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
                }
            })
            .flatten()
            .collect::<Vec<_>>();

        // If a BFT sender was provided, send the certificates to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            bft_sender
                .tx_sync_bft_dag_at_bootup
                .send(certificates)
                .await
                .with_context(|| "Failed to update the BFT DAG from sync")?;
        }

        self.block_sync.set_sync_height(block_height);

        Ok(())
    }

    /// Returns which height we are synchronized to.
    /// If there are queued block responses, this might be higher than the latest block in the ledger.
    async fn compute_sync_height(&self) -> u32 {
        let ledger_height = self.ledger.latest_block_height();
        let mut pending_blocks = self.pending_blocks.lock().await;

        // Remove any old responses.
        while let Some(b) = pending_blocks.front()
            && b.height() <= ledger_height
        {
            pending_blocks.pop_front();
        }

        // Ensure the returned value is always greater than or equal to the ledger height.
        pending_blocks.back().map(|b| b.height()).unwrap_or(0).max(ledger_height)
    }

    /// BFT version of [`alphaos_node_client::Client::try_advancing_block_synchronization`].
    async fn try_advancing_block_synchronization(&self, ping: &Option<Arc<Ping<N>>>) {
        // Process block responses and advance the ledger.
        let new_blocks = match self
            .try_advancing_block_synchronization_inner()
            .await
            .with_context(|| "Block synchronization failed")
        {
            Ok(new_blocks) => new_blocks,
            Err(err) => {
                error!("{}", &flatten_error(err));
                false
            }
        };

        if let Some(ping) = &ping
            && new_blocks
        {
            match self.get_block_locators() {
                Ok(locators) => ping.update_block_locators(locators),
                Err(err) => error!("Failed to update block locators: {err}"),
            }
        }
    }
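
    // GC-range decision, with illustrative numbers only: if the best peer tip is
    // 1_000 and `max_gc_blocks` is 50, then `max_gc_height` = 950. A ledger at
    // height 960 satisfies `(960 + 1) > 950` and syncs via the BFT path below,
    // while a ledger at height 900 falls back to the non-BFT path until it catches up.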
    /// Aims to advance synchronization using any recent block responses received from peers.
    ///
    /// This is the validator's version of `BlockSync::try_advancing_block_synchronization`
    /// and is called periodically at runtime.
    ///
    /// This returns `Ok(true)` if we successfully advanced the ledger by at least one new block.
    ///
    /// A key difference from `BlockSync`'s version is that it only adds blocks to the ledger once they have been confirmed by the network.
    /// If blocks are not confirmed yet, they are kept in [`Self::pending_blocks`].
    /// It also passes certificates from synced blocks to the BFT module so that consensus can progress as expected
    /// (see [`Self::sync_storage_with_block`] for more details).
    ///
    /// If the node falls behind more than GC rounds, this function calls [`Self::sync_ledger_with_block_without_bft`] instead,
    /// which syncs without updating the BFT state.
    async fn try_advancing_block_synchronization_inner(&self) -> Result<bool> {
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;

        // For sanity, set the sync height again.
        // (If the sync height is already larger or equal, this is a noop.)
        let ledger_height = self.ledger.latest_block_height();
        self.block_sync.set_sync_height(ledger_height);

        // Retrieve the maximum block height of the peers.
        let tip = self
            .block_sync
            .find_sync_peers()
            .map(|(sync_peers, _)| *sync_peers.values().max().unwrap_or(&0))
            .unwrap_or(0);

        // Determine the maximum number of blocks corresponding to rounds
        // that would not have been garbage collected, i.e. that would be kept in storage.
        // Since at most one block is created every two rounds,
        // this is half of the maximum number of rounds kept in storage.
        let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);

        // Updates the sync state and returns the error (if any).
        let cleanup = |start_height, current_height, error| {
            let new_blocks = current_height > start_height;

            // Make the underlying `BlockSync` instance aware of the new sync height.
            if new_blocks {
                self.block_sync.set_sync_height(current_height);
            }

            if let Some(err) = error { Err(err) } else { Ok(new_blocks) }
        };

        // Determine the earliest height of blocks corresponding to rounds kept in storage,
        // conservatively set to the block height minus the maximum number of blocks calculated above.
        // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
        let max_gc_height = tip.saturating_sub(max_gc_blocks);
        let within_gc = (ledger_height + 1) > max_gc_height;

        if within_gc {
            // Retrieve the current height, based on the ledger height and the
            // (unconfirmed) blocks that are already queued up.
            let start_height = self.compute_sync_height().await;

            // The height is incremented as blocks are added.
            let mut current_height = start_height;
            trace!(
                "Try advancing block responses with BFT (starting at block {current_height}, current sync speed is {})",
                self.block_sync.get_sync_speed()
            );

            // If we already were within GC or successfully caught up with GC, try to advance the BFT normally again.
            loop {
                let next_height = current_height + 1;
                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the BFT to block {}...", block.height());
                // Sync the storage with the block.
                match self.sync_storage_with_block(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            cleanup(start_height, current_height, None)
        } else {
            // For non-BFT sync, we need to start at the current height of the ledger, as blocks are immediately
            // added to it and not queued up in `latest_block_responses`.
            let start_height = ledger_height;
            let mut current_height = start_height;

            trace!("Try advancing block responses without BFT (starting at block {current_height})");

            // Try to advance the ledger *to tip* without updating the BFT.
            // TODO(kaimast): why to tip and not to tip-GC?
            loop {
                let next_height = current_height + 1;

                let Some(block) = self.block_sync.peek_next_block(next_height) else {
                    break;
                };
                info!("Syncing the ledger to block {}...", block.height());

                // Sync the ledger with the block without BFT.
                match self.sync_ledger_with_block_without_bft(block).await {
                    Ok(_) => {
                        // Update the current height if sync succeeds.
                        current_height = next_height;
                        self.block_sync.count_request_completed();
                    }
                    Err(err) => {
                        // Mark the current height as processed in block_sync.
                        self.block_sync.remove_block_response(next_height);
                        return cleanup(start_height, current_height, Some(err));
                    }
                }
            }

            // Sync the storage with the ledger if we should transition to the BFT sync.
            let within_gc = (current_height + 1) > max_gc_height;
            if within_gc {
                info!("Finished catching up with the network. Switching back to BFT sync.");
                self.sync_storage_with_ledger_at_bootup()
                    .await
                    .with_context(|| "BFT sync (with bootup routine) failed")?;
            }

            cleanup(start_height, current_height, None)
        }
    }

    /// Syncs the ledger with the given block without updating the BFT.
    ///
    /// This is only used by [`Self::try_advancing_block_synchronization`].
    async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;

        let self_ = self.clone();
        spawn_blocking!({
            // Check the next block.
            self_.ledger.check_next_block(&block)?;
            // Attempt to advance to the next block.
            self_.ledger.advance_to_next_block(&block)?;

            // Sync the height with the block.
            self_.storage.sync_height_with_block(block.height());
            // Sync the round with the block.
            self_.storage.sync_round_with_block(block.round());
            // Mark the block height as processed in block_sync.
            self_.block_sync.remove_block_response(block.height());

            Ok(())
        })
    }

    /// Helper function for [`Self::sync_storage_with_block`].
    /// It syncs the batch certificates with the BFT, if the block's authority is a sub-DAG.
    ///
    /// Note that the block authority is always a sub-DAG in production; beacon signatures are only used for testing,
    /// and as placeholder (irrelevant) block authority in the genesis block.
    async fn add_block_subdag_to_bft(&self, block: &Block<N>) -> Result<()> {
        // Nothing to do if this is a beacon block.
        let Authority::Quorum(subdag) = block.authority() else {
            return Ok(());
        };

        // Reconstruct the unconfirmed transactions.
        let unconfirmed_transactions = cfg_iter!(block.transactions())
            .filter_map(|tx| tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok())
            .collect::<HashMap<_, _>>();

        // Iterate over the certificates.
        for certificates in subdag.values().cloned() {
            cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                // Sync the batch certificate with the block.
                self.storage.sync_certificate_with_block(block, certificate.clone(), &unconfirmed_transactions);
            });

            // Sync the BFT DAG with the certificates.
            for certificate in certificates {
                // If a BFT sender was provided, send the certificate to the BFT.
                // For validators, the BFT spawns a receiver task in `BFT::start_handlers`.
                if let Some(bft_sender) = self.bft_sender.get() {
                    let (callback_tx, callback_rx) = oneshot::channel();
                    bft_sender
                        .tx_sync_bft
                        .send((certificate, callback_tx))
                        .await
                        .with_context(|| "Failed to sync certificate")?;
                    callback_rx.await?.with_context(|| "Failed to sync certificate")?;
                }
            }
        }
        Ok(())
    }

    /// Helper function for [`Self::sync_storage_with_block`].
    ///
    /// It checks that the successor of a given block contains enough votes to commit it.
    /// This can only return `Ok(true)` if the certificates of the block's successor were added to the storage.
    fn is_block_availability_threshold_reached(&self, block: &PendingBlock<N>) -> Result<bool> {
        // Fetch the leader certificate and the relevant rounds.
        let leader_certificate = match block.authority() {
            Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
            _ => bail!("Received a block with an unexpected authority type."),
        };
        let commit_round = leader_certificate.round();
        let certificate_round =
            commit_round.checked_add(1).ok_or_else(|| anyhow!("Integer overflow on round number"))?;

        // Get the committee lookback for the round just after the leader.
        let certificate_committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
        // Retrieve all of the certificates for the round just after the leader.
        let certificates = self.storage.get_certificates_for_round(certificate_round);
        // Construct a set over the authors, at the round just after the leader,
        // who included the leader's certificate in their previous certificate IDs.
        let authors = certificates
            .iter()
            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                true => Some(c.author()),
                false => None,
            })
            .collect();

        // Check if the leader is ready to be committed.
        if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
            trace!(
                "Block {hash} at height {height} has reached availability threshold",
                hash = block.hash(),
                height = block.height()
            );
            Ok(true)
        } else {
            Ok(false)
        }
    }
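
    // Illustrative example (assumes four equally weighted validators, so f = 1):
    // the availability threshold requires more than f voting stake, i.e. at least
    // two of the four authors in the round after the leader must reference the
    // leader's certificate for the check above to pass.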
    /// Advances the ledger by the given block and updates the storage accordingly.
    ///
    /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
    /// meets the voter availability threshold (i.e. > f voting stake)
    /// or is reachable via a DAG path from a later leader certificate that does.
    /// Since performing this check requires DAG certificates from later blocks,
    /// the block is stored in `Sync::pending_blocks`,
    /// and its addition to the ledger is deferred until the check passes.
    /// Several blocks may be stored in `Sync::pending_blocks`
    /// before they can all be checked and added to the ledger.
    ///
    /// # Usage
    /// This function assumes that blocks are passed in order, i.e.,
    /// that the given block is a direct successor of the block that was last passed to this function.
    async fn sync_storage_with_block(&self, new_block: Block<N>) -> Result<()> {
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;
        let new_block_height = new_block.height();

        // If this block has already been processed, return early.
        // TODO(kaimast): Should we remove the response here?
        if self.ledger.contains_block_height(new_block_height) {
            debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.");
            return Ok(());
        }

        // Acquire the pending blocks lock.
        let mut pending_blocks = self.pending_blocks.lock().await;

        // Append the certificates to the storage.
        self.add_block_subdag_to_bft(&new_block).await?;

        // Fetch the latest block height.
        let ledger_block_height = self.ledger.latest_block_height();

        // First, clear any older pending blocks.
        // TODO(kaimast): ensure there are no dangling block requests
        while let Some(pending_block) = pending_blocks.front() {
            if pending_block.height() > ledger_block_height {
                break;
            }

            trace!(
                "Pending block {hash} at height {height} became obsolete",
                hash = pending_block.hash(),
                height = pending_block.height()
            );
            pending_blocks.pop_front();
        }

        if let Some(tail) = pending_blocks.back() {
            if tail.height() >= new_block.height() {
                debug!(
                    "An unconfirmed block is already queued for height {height}. \
                     Will not sync.",
                    height = new_block.height()
                );
                return Ok(());
            }

            ensure_equals!(tail.height() + 1, new_block.height(), "Got an out-of-order block");
        }

        // Check the block against the chain of pending blocks and append it on success.
        let new_block = match self.ledger.check_block_subdag(new_block, pending_blocks.make_contiguous()) {
            Ok(new_block) => new_block,
            Err(err) => {
                // TODO(kaimast): this should not return an error on the snarkVM side.
                if err.to_string().contains("already in the ledger") {
                    debug!("Ledger is already synced with block at height {new_block_height}. Will not sync.");

                    return Ok(());
                } else {
                    return Err(err);
                }
            }
        };

        trace!(
            "Adding new pending block {hash} at height {height}",
            hash = new_block.hash(),
            height = new_block.height()
        );
        pending_blocks.push_back(new_block);

        // Now, figure out if and which pending block we can commit.
        // To do this effectively, and because commits are transitive,
        // we iterate in reverse so that we can stop at the first successful check.
        //
        // Note that if the storage already contains certificates for the round after the new block,
        // the availability threshold for the new block could also be reached.
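        //
        // Illustrative scenario: if blocks at heights 10, 11, and 12 are pending and
        // the availability check first passes for block 12, then blocks 10 and 11 are
        // committed transitively along with it, since commits carry over to ancestors.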
        let mut commit_height = None;
        for block in pending_blocks.iter().rev() {
            // This check assumes that the pending blocks are properly linked together, based on the fact that,
            // to generate the sequence of `PendingBlock`s, each block needs to be successfully processed by `Ledger::check_block_subdag`.
            // As a result, the safety of this piece of code relies on the correctness of `Ledger::check_block_subdag`,
            // which is tested in `snarkvm/ledger/tests/pending_block.rs`.
            if self
                .is_block_availability_threshold_reached(block)
                .with_context(|| "Availability threshold check failed")?
            {
                commit_height = Some(block.height());
                break;
            }
        }

        if let Some(commit_height) = commit_height {
            let start_height = ledger_block_height + 1;
            ensure!(commit_height >= start_height, "Invalid commit height");
            let num_blocks = (commit_height - start_height + 1) as usize;

            // Create a more detailed log message if we are committing more than one block at a time.
            if num_blocks > 1 {
                trace!("Attempting to commit {num_blocks} pending block(s) starting at height {start_height}.");
            }

            for pending_block in pending_blocks.drain(0..num_blocks) {
                let hash = pending_block.hash();
                let height = pending_block.height();
                let ledger = self.ledger.clone();
                let storage = self.storage.clone();

                spawn_blocking!({
                    let block = ledger.check_block_content(pending_block).with_context(|| {
                        format!("Failed to check contents of pending block {hash} at height {height}")
                    })?;

                    trace!("Adding pending block {hash} at height {height} to the ledger");
                    ledger.advance_to_next_block(&block)?;
                    // Sync the height with the block.
                    storage.sync_height_with_block(block.height());
                    // Sync the round with the block.
                    storage.sync_round_with_block(block.round());

                    Ok(())
                })?
            }
        } else {
            trace!("No pending blocks are ready to be committed ({} block(s) are pending)", pending_blocks.len());
        }

        Ok(())
    }
}

// Methods to assist with the block sync module.
impl<N: Network> Sync<N> {
    /// Returns `true` if the node is synced and has connected peers.
    pub fn is_synced(&self) -> bool {
        // Ensure the validator is connected to other validators,
        // not just clients.
        if self.gateway.number_of_connected_peers() == 0 {
            return false;
        }

        self.block_sync.is_block_synced()
    }

    /// Returns the number of blocks the node is behind the greatest peer height.
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.block_sync.num_blocks_behind()
    }

    /// Returns the current block locators of the node.
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        self.block_sync.get_block_locators()
    }
}

// Methods to assist with fetching batch certificates from peers.
impl<N: Network> Sync<N> {
    /// Sends a certificate request to the specified peer.
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        // Initialize a oneshot channel.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Determine how many sent requests are pending.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        // Determine if we've already sent a request to the peer.
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        // Determine the maximum number of redundant requests.
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
        // Determine if we should send a certificate request to the peer.
        // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
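
        // Illustrative: if `num_redundant_requests` evaluates to 3, at most three
        // distinct peers are asked for the same certificate concurrently; further
        // callers are enqueued below without sending another request.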
        // Insert the certificate ID into the pending queue.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));

        // If the number of requests is less than or equal to the redundancy factor, send the certificate request to the peer.
        if should_send_request {
            // Send the certificate request to the peer.
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} (failed to send request)")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Wait for the certificate to be fetched.
        // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators.
        tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
            .await
            .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))?
            .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id)))
    }

    /// Handles the incoming certificate request.
    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
        // Attempt to retrieve the certificate.
        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
            // Send the certificate response to the peer.
            let self_ = self.clone();
            tokio::spawn(async move {
                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
            });
        }
    }

    /// Handles the incoming certificate response.
    /// This method ensures the certificate response is well-formed and matches the certificate ID.
    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
        let certificate = response.certificate;
        // Check if the peer IP exists in the pending queue for the given certificate ID.
        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
        // If the peer IP exists, finish the pending request.
        if exists {
            // TODO: Validate the certificate.
            // Remove the certificate ID from the pending queue.
            self.pending.remove(certificate.id(), Some(certificate));
        }
    }
}

impl<N: Network> Sync<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the sync module.
    pub async fn shut_down(&self) {
        info!("Shutting down the sync module...");
        // Acquire the response lock.
        let _lock = self.response_lock.lock().await;
        // Acquire the sync lock.
        let _lock = self.sync_lock.lock().await;
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};

    use alphaos_account::Account;
    use alphaos_node_sync::BlockSync;
    use alphaos_utilities::SimpleStoppable;
    use alphavm::{
        console::{
            account::{Address, PrivateKey},
            network::MainnetV0,
        },
        ledger::{
            narwhal::{BatchCertificate, BatchHeader, Subdag},
            store::{ConsensusStore, helpers::memory::ConsensusMemory},
        },
        prelude::{Ledger, VM},
        utilities::TestRng,
    };

    use alpha_std::StorageMode;
    use indexmap::IndexSet;
    use rand::Rng;
    use std::collections::BTreeMap;

    type CurrentNetwork = MainnetV0;
    type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
    type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;

    /// Tests that commits work as expected when some anchors are not committed immediately.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_commit_chain() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;

        // The first round of the first block.
        let first_round = 1;
        // The total number of blocks we test.
        let num_blocks = 3;
        // The number of certificate rounds needed.
        // There is one additional round to provide availability for the final block.
        let num_rounds = first_round + num_blocks * 2 + 1;
        // The first round that has at least N-f certificates referencing the anchor from the previous round.
        // This is also the last round we use in the test.
        let first_committed_round = num_rounds - 1;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let vm = VM::from(store).unwrap();
        let genesis_pk = *account.private_key();
        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];

        // Initialize the ledger with the genesis block.
        let genesis_clone = genesis.clone();
        let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap();
        // Initialize the ledger service.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
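
        // With the parameters above: num_rounds = 1 + 3 * 2 + 1 = 8, so certificate
        // rounds 1..=7 are generated below, and the certificates of round 7 provide
        // availability for the third (final) block.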
        // Sample rounds of batch certificates, starting after the genesis round, from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];

            let committee = ledger.latest_committee().unwrap();

            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in first_round..=first_committed_round {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };

                let committee_id = committee.id();
                let prev_leader = committee.get_leader(round - 1).unwrap();

                // For the first two blocks, non-leaders will not reference the leader certificate.
                // This means that, while there is an anchor, it isn't committed
                // until later.
                for (i, private_key) in private_keys.iter().enumerate() {
                    let leader_index = addresses.iter().position(|&address| address == prev_leader).unwrap();
                    let is_certificate_round = round % 2 == 1;
                    let is_leader = i == leader_index;

                    let previous_certs = if round < first_committed_round && is_certificate_round && !is_leader {
                        previous_certificate_ids
                            .iter()
                            .cloned()
                            .enumerate()
                            .filter(|(idx, _)| *idx != leader_index)
                            .map(|(_, id)| id)
                            .collect()
                    } else {
                        previous_certificate_ids.clone()
                    };

                    let batch_header = BatchHeader::new(
                        private_key,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certs,
                        rng,
                    )
                    .unwrap();

                    // Sign the batch header.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates;
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert all certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in first_round..=first_committed_round {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Create the blocks.
        let mut previous_leader_cert = None;
        let mut blocks = vec![];

        for block_height in 1..=num_blocks {
            let leader_round = block_height * 2;

            let leader = committee.get_leader(leader_round).unwrap();
            let leader_certificate = storage.get_certificate_for_round_with_author(leader_round, leader).unwrap();

            let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());

            let previous_cert_map = storage.get_certificates_for_round(leader_round - 1);

            subdag_map.insert(leader_round, leader_cert_map.clone());
            subdag_map.insert(leader_round - 1, previous_cert_map.clone());

            if leader_round > 2 {
                let previous_commit_cert_map: IndexSet<_> = storage
                    .get_certificates_for_round(leader_round - 2)
                    .into_iter()
                    .filter(|cert| {
                        if let Some(previous_leader_cert) = &previous_leader_cert {
                            cert != previous_leader_cert
                        } else {
                            true
                        }
                    })
                    .collect();
                subdag_map.insert(leader_round - 2, previous_commit_cert_map);
            }

            let subdag = Subdag::from(subdag_map.clone())?;
            previous_leader_cert = Some(leader_certificate);

            let core_ledger = core_ledger.clone();
            let block = spawn_blocking!({
                let block = core_ledger.clone().prepare_advance_to_next_quorum_block(subdag, Default::default())?;
                core_ledger.advance_to_next_block(&block)?;
                Ok(block)
            })?;

            blocks.push(block);
        }

        // ### Test that sync works as expected ###
        let storage_mode = StorageMode::new_test(None);

        // Create a new ledger to test with, but use the existing storage
        // so that the certificates exist.
        let syncing_ledger = {
            let storage_mode = storage_mode.clone();
            Arc::new(CoreLedgerService::new(
                spawn_blocking!(CurrentLedger::load(genesis, storage_mode)).unwrap(),
                SimpleStoppable::new(),
            ))
        };

        // Set up sync and its dependencies.
        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            syncing_ledger.clone(),
            None,
            &[],
            false,
            StorageMode::new_test(None),
            None,
        )?;
        let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
        let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync);

        let mut block_iter = blocks.into_iter();

        // Insert the blocks into the new sync module.
        for _ in 0..num_blocks - 1 {
            let block = block_iter.next().unwrap();
            sync.sync_storage_with_block(block).await?;

            // The availability threshold is not met, so we should not advance yet.
            assert_eq!(syncing_ledger.latest_block_height(), 0);
        }

        // Only for the final block is the availability threshold met,
        // because certificates for the subsequent round are already in storage.
        sync.sync_storage_with_block(block_iter.next().unwrap()).await?;
        assert_eq!(syncing_ledger.latest_block_height(), 3);

        // Ensure blocks 1 and 2 were added to the ledger.
        assert!(syncing_ledger.contains_block_height(1));
        assert!(syncing_ledger.contains_block_height(2));

        Ok(())
    }

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_pending_certificates() -> anyhow::Result<()> {
        let rng = &mut TestRng::default();
        // Initialize the round parameters.
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let commit_round = 2;

        // Initialize the store.
        let store = CurrentConsensusStore::open(StorageMode::new_test(None)).unwrap();
        let account: Account<CurrentNetwork> = Account::new(rng)?;

        // Create a genesis block with a seeded RNG to reproduce the same genesis private keys.
        let seed: u64 = rng.r#gen();
        let vm = VM::from(store).unwrap();
        let genesis_pk = *account.private_key();
        let genesis = spawn_blocking!(vm.genesis_beacon(&genesis_pk, &mut TestRng::from_seed(seed))).unwrap();

        // Extract the private keys from the genesis committee by using the same RNG to sample private keys.
        let genesis_rng = &mut TestRng::from_seed(seed);
        let private_keys = [
            *account.private_key(),
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
            PrivateKey::new(genesis_rng)?,
        ];
        // Initialize the ledger with the genesis block.
        let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap();
        // Initialize the ledger service.
        let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new()));
        // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            // Initialize the committee.
            let committee = core_ledger.current_committee().unwrap();
            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
                HashMap::new();
            let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);

            for round in 0..=commit_round + 8 {
                let mut current_certificates = IndexSet::new();
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                let committee_id = committee.id();
                // Create a certificate for each validator.
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = BatchHeader::new(
                        private_key_1,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Sign the batch header.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }

                // Update the map of certificates.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the storage.
        let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Insert certificates into storage.
        let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round + 8 {
            let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
            certificates.extend(c);
        }
        for certificate in certificates.clone().iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Create block 1.
        let leader_round_1 = commit_round;
        let leader_1 = committee.get_leader(leader_round_1).unwrap();
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_1 = {
            let mut leader_cert_map = IndexSet::new();
            leader_cert_map.insert(leader_certificate.clone());
            let mut previous_cert_map = IndexSet::new();
            for cert in storage.get_certificates_for_round(commit_round - 1) {
                previous_cert_map.insert(cert);
            }
            subdag_map.insert(commit_round, leader_cert_map.clone());
            subdag_map.insert(commit_round - 1, previous_cert_map.clone());
            let subdag = Subdag::from(subdag_map.clone())?;
            let ledger = core_ledger.clone();
            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag, Default::default()))?
        };
        // Insert block 1.
        let ledger = core_ledger.clone();
        let block = block_1.clone();
        spawn_blocking!(ledger.advance_to_next_block(&block))?;

        // Create block 2.
        let leader_round_2 = commit_round + 2;
        let leader_2 = committee.get_leader(leader_round_2).unwrap();
        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_2 = {
            let mut leader_cert_map_2 = IndexSet::new();
            leader_cert_map_2.insert(leader_certificate_2.clone());
            let mut previous_cert_map_2 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
                previous_cert_map_2.insert(cert);
            }
            subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
            subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
            let subdag_2 = Subdag::from(subdag_map_2.clone())?;
            let ledger = core_ledger.clone();
            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default()))?
        };
        // Insert block 2.
        let ledger = core_ledger.clone();
        let block = block_2.clone();
        spawn_blocking!(ledger.advance_to_next_block(&block))?;

        // Create block 3.
        let leader_round_3 = commit_round + 4;
        let leader_3 = committee.get_leader(leader_round_3).unwrap();
        let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let block_3 = {
            let mut leader_cert_map_3 = IndexSet::new();
            leader_cert_map_3.insert(leader_certificate_3.clone());
            let mut previous_cert_map_3 = IndexSet::new();
            for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
                previous_cert_map_3.insert(cert);
            }
            subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
            subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
            let subdag_3 = Subdag::from(subdag_map_3.clone())?;
            let ledger = core_ledger.clone();
            spawn_blocking!(ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default()))?
        };
        // Insert block 3.
        let ledger = core_ledger.clone();
        let block = block_3.clone();
        spawn_blocking!(ledger.advance_to_next_block(&block))?;

        /* Check that the pending certificates are computed correctly. */

        // Retrieve the pending certificates.
        let pending_certificates = storage.get_pending_certificates();
        // Check that all of the pending certificates are not contained in the ledger.
        for certificate in pending_certificates.clone() {
            assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
        }
        // Initialize an empty set to be populated with the committed certificates in the block subdags.
        let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        {
            let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
            for subdag in subdag_maps.iter() {
                for subdag_certificates in subdag.values() {
                    committed_certificates.extend(subdag_certificates.iter().cloned());
                }
            }
        };
        // Create the set of candidate pending certificates as the set of all certificates minus the set of committed certificates.
        let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
        for certificate in certificates.clone() {
            if !committed_certificates.contains(&certificate) {
                candidate_pending_certificates.insert(certificate);
            }
        }
        // Check that the set of pending certificates is equal to the set of candidate pending certificates.
        assert_eq!(pending_certificates, candidate_pending_certificates);

        Ok(())
    }
}