mod.rs
// Copyright (c) 2025 ADnet Contributors
// This file is part of the AlphaOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod router;

use crate::{
    bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking},
    cdn::CdnBlockSync,
    traits::NodeInterface,
};

use alphaos_account::Account;
use alphaos_node_network::NodeType;
use alphaos_node_rest::Rest;
use alphaos_node_router::{
    Heartbeat,
    Inbound,
    Outbound,
    Router,
    Routing,
    messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
};
use alphaos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use alphaos_node_tcp::{
    P2P,
    protocols::{Disconnect, Handshake, OnConnect, Reading},
};
use alphaos_utilities::{SignalHandler, Stoppable};

use alphavm::{
    console::network::Network,
    ledger::{
        Ledger,
        block::{Block, Header},
        puzzle::{Puzzle, Solution, SolutionID},
        store::ConsensusStorage,
    },
    prelude::{VM, block::Transaction},
    utilities::flatten_error,
};

use alpha_std::StorageMode;
use anyhow::{Context, Result};
use core::future::Future;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
    net::SocketAddr,
    num::NonZeroUsize,
    sync::{
        Arc,
        atomic::{
            AtomicUsize,
            Ordering::{Acquire, Relaxed},
        },
    },
    time::Duration,
};
use tokio::{
    task::JoinHandle,
    time::{sleep, timeout},
};

/// The maximum number of solutions to verify in parallel.
/// Note: the worst-case memory to verify a solution is 0.5 GiB.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity for storing unconfirmed deployments.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity for storing unconfirmed executions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity for storing unconfirmed solutions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;

/// Transaction details needed for propagation.
/// We preserve the serialized transaction for faster propagation.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// Solution details needed for propagation.
/// We preserve the serialized solution for faster propagation.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
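// Overview of the verification pipeline (derived from the code below):
// - Inbound solutions, deployments, and executions are buffered in the bounded
//   LRU queues above; once a queue is full, inserting a new entry evicts the
//   least-recently-used one rather than blocking the caller.
// - Dedicated loops drain these queues and verify entries on the blocking
//   thread pool, gated by the `num_verifying_*` counters.
// - With `MAX_PARALLEL_SOLUTION_VERIFICATIONS = 20` and a worst case of
//   0.5 GiB per solution, concurrent solution verification is bounded at
//   roughly 10 GiB of peak memory.
// - Verified items are re-propagated via `propagate`, with the originating
//   `peer_ip` passed as the `&[peer_ip]` argument (presumably an exclusion
//   list, so the sender does not receive its own message back).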
/// A client node is a full node, capable of querying the network.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle.
    puzzle: Puzzle<N>,
    /// The unconfirmed solutions queue.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The unconfirmed deployments queue.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The unconfirmed executions queue.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The number of solutions currently being verified.
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The number of deployments currently being verified.
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The number of executions currently being verified.
    num_verifying_executions: Arc<AtomicUsize>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
    /// The signal handling logic.
    signal_handler: Arc<SignalHandler>,
}
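// Note: `Client` derives `Clone`, and every piece of shared mutable state
// (queues, counters, task handles, sync state) is wrapped in an `Arc`, so all
// clones observe and mutate the same underlying state. This is what allows the
// sync and verification loops below to each hold their own clone of the node.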
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// Initializes a new client node.
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<http::Uri>,
        storage_mode: StorageMode,
        trusted_peers_only: bool,
        dev: Option<u16>,
        signal_handler: Arc<SignalHandler>,
    ) -> Result<Self> {
        // Initialize the ledger.
        let ledger = {
            let storage_mode = storage_mode.clone();
            let genesis = genesis.clone();

            spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
        }
        .with_context(|| "Failed to initialize the ledger")?;

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), signal_handler.clone()));
        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            trusted_peers_only,
            storage_mode.clone(),
            dev.is_some(),
        )
        .await?;

        // Initialize the sync module.
        let sync = Arc::new(BlockSync::new(ledger_service.clone()));

        // Set up the ping logic.
        let locators = sync.get_block_locators()?;
        let ping = Arc::new(Ping::new(router.clone(), locators));

        // Initialize the node.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: sync.clone(),
            genesis,
            ping,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            signal_handler: signal_handler.clone(),
        };

        // Perform sync with the CDN (if enabled).
        let cdn_sync = cdn.map(|base_url| {
            trace!("CDN sync is enabled");
            Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))
        });

        // Initialize the REST server.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(
                Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
                    .await?,
            );
        }

        // Set up everything else after the CDN sync is done.
        if let Some(cdn_sync) = cdn_sync {
            if let Err(error) = cdn_sync.wait().await {
                crate::log_clean_error(&storage_mode);
                node.shut_down().await;
                return Err(error);
            }
        }

        // Initialize the routing.
        node.initialize_routing().await;
        // Initialize the sync module.
        node.initialize_sync();
        // Initialize solution verification.
        node.initialize_solution_verification();
        // Initialize deployment verification.
        node.initialize_deploy_verification();
        // Initialize execution verification.
        node.initialize_execute_verification();
        // Initialize the notification message loop.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Return the node.
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }

    /// Returns the router.
    pub fn router(&self) -> &Router<N> {
        &self.router
    }
}
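// Illustrative only: a minimal sketch of constructing a client. The network
// type (`MainnetV0`), storage backend (`ConsensusDB`), and
// `SignalHandler::new()` are placeholder assumptions for illustration and are
// not verified against the workspace crates.
//
// ```ignore
// let signal_handler = Arc::new(SignalHandler::new());
// let client = Client::<MainnetV0, ConsensusDB<MainnetV0>>::new(
//     "0.0.0.0:4130".parse()?,       // node_ip
//     Some("0.0.0.0:3030".parse()?), // rest_ip
//     10,                            // rest_rps
//     account,                       // Account<MainnetV0>
//     &[],                           // trusted_peers
//     genesis_block,                 // Block<MainnetV0>
//     None,                          // cdn
//     StorageMode::Production,
//     false,                         // trusted_peers_only
//     None,                          // dev
//     signal_handler,
// )
// .await?;
// ```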
/// Sync-specific code.
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
    /// This only exists as a fallback for the (unlikely) case that a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Spawns the tasks that perform the syncing logic for this client.
    fn initialize_sync(&self) {
        // Start the block request generation loop (outgoing).
        let self_ = self.clone();
        self.spawn(async move {
            while !self_.signal_handler.is_stopped() {
                // Perform the sync routine.
                self_.try_issuing_block_requests().await;
            }

            info!("Stopped block request generation");
        });

        // Start the block response processing loop (incoming).
        let self_ = self.clone();
        self.spawn(async move {
            while !self_.signal_handler.is_stopped() {
                // Wait until there is something to do or until the timeout.
                let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await;

                // Perform the sync routine.
                self_.try_advancing_block_synchronization().await;

                // We perform no additional rate limiting here as
                // requests are already rate-limited.
            }

            debug!("Stopped block response processing");
        });
    }

    /// Client-side version of [`crate::bft::Sync::try_advancing_block_synchronization`].
    async fn try_advancing_block_synchronization(&self) {
        let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
            Ok(val) => val,
            Err(err) => {
                error!("Block synchronization failed - {err}");
                return;
            }
        };

        // If there are new blocks, we need to update the block locators.
        if has_new_blocks {
            match self.sync.get_block_locators() {
                Ok(locators) => self.ping.update_block_locators(locators),
                Err(err) => error!("Failed to get block locators: {err}"),
            }
        }
    }

    /// Client-side version of [`crate::bft::Sync::try_block_sync`].
    async fn try_issuing_block_requests(&self) {
        // Wait for peer updates or the timeout.
        let _ = timeout(Self::MAX_SYNC_INTERVAL, self.sync.wait_for_peer_update()).await;

        // For sanity, check that the sync height is never below the ledger height.
        // (If the ledger height is lower than or equal to the current sync height, this is a no-op.)
        self.sync.set_sync_height(self.ledger.latest_height());

        match self.sync.handle_block_request_timeouts(&self.router) {
            Ok(Some((requests, sync_peers))) => {
                // Re-request blocks instead of performing regular block sync.
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                // Abort and retry later.
                error!("{}", flatten_error(&err));
                return;
            }
        }

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and unnecessary computation.
        if !self.sync.can_block_sync() {
            trace!("Nothing to sync. Will not issue new block requests");
            return;
        }

        // First, try to advance the ledger with new responses.
        let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
            Ok(val) => val,
            Err(err) => {
                error!("{err}");
                return;
            }
        };

        if has_new_blocks {
            match self.sync.get_block_locators() {
                Ok(locators) => self.ping.update_block_locators(locators),
                Err(err) => error!("Failed to get block locators: {err}"),
            }

            // If these were the last blocks to process, do not continue.
            if !self.sync.can_block_sync() {
                return;
            }
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (block_requests, sync_peers) = self.sync.prepare_block_requests();

        // If there are no new block requests, report on any requests that are still in flight.
        if block_requests.is_empty() {
            let total_requests = self.sync.num_total_block_requests();
            let num_outstanding = self.sync.num_outstanding_block_requests();
            if total_requests > 0 {
                trace!(
                    "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
                );
            } else {
                // This can happen during peer rotation and should not be a warning.
                debug!(
                    "Not block synced yet, and there are no outstanding block requests or \
                     new block requests to send"
                );
            }
        } else {
            self.send_block_requests(block_requests, sync_peers).await;
        }
    }

    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        // Issue the block requests in batches.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.sync.send_block_requests(self.router(), &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }
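    // Note on the concurrency bound shared by the three verification loops
    // below: `fetch_add(1, Relaxed)` returns the counter's previous value, so
    // `previous_counter + 1` is the number of in-flight verifications
    // including the task just spawned. Draining stops once that count reaches
    // the configured maximum, and each spawned task decrements the counter
    // when it finishes (or bails out early).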
    /// Initializes solution verification.
    fn initialize_solution_verification(&self) {
        // Start the solution verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.signal_handler.is_stopped() {
                    info!("Shutting down solution verification");
                    break;
                }

                // Determine if the queue contains solutions to verify.
                let queue_is_empty = node.solution_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new solutions.
                let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;

                // Sleep to allow the queue to be filled or solutions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify solutions.
                let mut solution_queue = node.solution_queue.lock();
                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each solution, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // Retrieve the latest epoch hash.
                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
                            // Check if the prover has reached their solution limit.
                            // While alphaVM will ultimately abort any excess solutions for safety, performing this check
                            // here prevents the to-be-aborted solutions from propagating through the network.
                            let prover_address = solution.address();
                            if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
                                debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
                                // Decrement the verification counter and skip this solution.
                                _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                                return;
                            }
                            // Retrieve the latest proof target.
                            let proof_target = _node.ledger.latest_block().header().proof_target();
                            // Ensure that the solution is valid for the given epoch.
                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);

                            match is_valid {
                                // If the solution is valid, propagate the `UnconfirmedSolution`.
                                Ok(()) => {
                                    let message = Message::UnconfirmedSolution(serialized);
                                    // Propagate the `UnconfirmedSolution`.
                                    _node.propagate(message, &[peer_ip]);
                                }
                                // If the error occurs after the first 10 blocks of the epoch, log it; otherwise, ignore it.
                                Err(error) => {
                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                                        debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
                                    }
                                }
                            }
                        } else {
                            warn!("Failed to retrieve the latest epoch hash.");
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more solutions.
                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }
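    // Note: unlike the transaction loops below, which re-acquire their queue
    // lock for every `pop_lru` call, the solution loop above holds the queue
    // guard for the duration of its inner drain loop. Spawning onto the
    // blocking pool does not await, so the guard is never held across an
    // `.await` point.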
    /// Initializes deployment verification.
    fn initialize_deploy_verification(&self) {
        // Start the deployment verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.signal_handler.is_stopped() {
                    info!("Shutting down deployment verification");
                    break;
                }

                // Determine if the queue contains transactions to verify.
                let queue_is_empty = node.deploy_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new transactions.
                let counter_is_full =
                    node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify deployments.
                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each deployment, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // First, collect the state root.
                        let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else {
                            debug!("Failed to access the global state root for the deployment from peer_ip {peer_ip}");
                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                            return;
                        };
                        // Check if the state root is in the ledger.
                        if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
                            debug!("Failed to find the global state root for the deployment from peer_ip {peer_ip}, propagating anyway");
                            // Propagate the `UnconfirmedTransaction`.
                            _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                            // Also skip the `check_transaction_basic` call, as the transaction is already propagated.
                            return;
                        }
                        // Check the deployment.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more deployments.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }
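    // Note: execution verification below mirrors the deployment path, except
    // that an execution can carry two global state roots (one from the
    // execution itself and one from its optional fee transition), so both are
    // checked against the ledger before `check_transaction_basic` runs.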
    /// Initializes execution verification.
    fn initialize_execute_verification(&self) {
        // Start the execution verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.signal_handler.is_stopped() {
                    info!("Shutting down execution verification");
                    break;
                }

                // Determine if the queue contains transactions to verify.
                let queue_is_empty = node.execute_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new transactions.
                let counter_is_full =
                    node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify executions.
                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each execution, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // First, collect the state roots.
                        let state_roots = [
                            transaction.execution().map(|t| t.global_state_root()),
                            transaction.fee_transition().map(|t| t.global_state_root()),
                        ]
                        .into_iter()
                        .flatten();

                        for state_root in state_roots {
                            if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
                                debug!("Failed to find the global state root for the execution from peer_ip {peer_ip}, propagating anyway");
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                                _node.num_verifying_executions.fetch_sub(1, Relaxed);
                                // Also skip the `check_transaction_basic` call, as the transaction is already propagated.
                                return;
                            }
                        }
                        // Check the execution.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more executions.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

    /// Spawns a task with the given future; it should only be used for long-running tasks.
    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
    /// Shuts down the node.
    async fn shut_down(&self) {
        info!("Shutting down...");

        // Shut down the node.
        trace!("Shutting down the node...");

        // Abort the tasks.
        trace!("Shutting down the client...");
        self.handles.lock().iter().for_each(|handle| handle.abort());

        // Shut down the router.
        self.router.shut_down().await;

        info!("Node has shut down.");
    }
}