// mod.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 mod router; 20 21 use crate::{ 22 bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking}, 23 cdn::CdnBlockSync, 24 traits::NodeInterface, 25 }; 26 27 use alphaos_account::Account; 28 use alphaos_node_network::NodeType; 29 use alphaos_node_rest::Rest; 30 use alphaos_node_router::{ 31 messages::{Message, UnconfirmedSolution, UnconfirmedTransaction}, 32 Heartbeat, 33 Inbound, 34 Outbound, 35 Router, 36 Routing, 37 }; 38 use alphaos_node_sync::{locators::BlockLocators, BlockSync, Ping, PrepareSyncRequest, BLOCK_REQUEST_BATCH_DELAY}; 39 use alphaos_node_tcp::{ 40 protocols::{Disconnect, Handshake, OnConnect, Reading}, 41 P2P, 42 }; 43 use alphaos_utilities::{SignalHandler, Stoppable}; 44 45 use alphavm::{ 46 console::network::Network, 47 ledger::{ 48 block::{Block, Header}, 49 puzzle::{Puzzle, Solution, SolutionID}, 50 store::ConsensusStorage, 51 Ledger, 52 }, 53 prelude::{block::Transaction, VM}, 54 utilities::flatten_error, 55 }; 56 57 use alphastd::StorageMode; 58 use anyhow::{Context, Result}; 59 use core::future::Future; 60 use indexmap::IndexMap; 61 #[cfg(feature = "locktick")] 
62 use locktick::parking_lot::Mutex; 63 use lru::LruCache; 64 #[cfg(not(feature = "locktick"))] 65 use parking_lot::Mutex; 66 use std::{ 67 net::SocketAddr, 68 num::NonZeroUsize, 69 sync::{ 70 atomic::{ 71 AtomicUsize, 72 Ordering::{Acquire, Relaxed}, 73 }, 74 Arc, 75 }, 76 time::Duration, 77 }; 78 use tokio::{ 79 task::JoinHandle, 80 time::{sleep, timeout}, 81 }; 82 83 /// The maximum number of solutions to verify in parallel. 84 /// Note: worst case memory to verify a solution is 0.5 GiB. 85 const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20; 86 /// The capacity for storing unconfirmed deployments. 87 /// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity. 88 const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10; 89 /// The capacity for storing unconfirmed executions. 90 /// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity. 91 const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10; 92 /// The capacity for storing unconfirmed solutions. 93 /// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity. 94 const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10; 95 96 /// Transaction details needed for propagation. 97 /// We preserve the serialized transaction for faster propagation. 98 type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>); 99 /// Solution details needed for propagation. 100 /// We preserve the serialized solution for faster propagation. 101 type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>); 102 103 /// A client node is a full node, capable of querying with the network. 104 #[derive(Clone)] 105 pub struct Client<N: Network, C: ConsensusStorage<N>> { 106 /// The ledger of the node. 107 ledger: Ledger<N, C>, 108 /// The router of the node. 109 router: Router<N>, 110 /// The REST server of the node. 111 rest: Option<Rest<N, C, Self>>, 112 /// The block synchronization logic. 113 sync: Arc<BlockSync<N>>, 114 /// The genesis block. 
115 genesis: Block<N>, 116 /// The puzzle. 117 puzzle: Puzzle<N>, 118 /// The unconfirmed solutions queue. 119 solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>, 120 /// The unconfirmed deployments queue. 121 deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>, 122 /// The unconfirmed executions queue. 123 execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>, 124 /// The amount of solutions currently being verified. 125 num_verifying_solutions: Arc<AtomicUsize>, 126 /// The amount of deployments currently being verified. 127 num_verifying_deploys: Arc<AtomicUsize>, 128 /// The amount of executions currently being verified. 129 num_verifying_executions: Arc<AtomicUsize>, 130 /// The spawned handles. 131 handles: Arc<Mutex<Vec<JoinHandle<()>>>>, 132 /// Keeps track of sending pings. 133 ping: Arc<Ping<N>>, 134 /// The signal handling logic. 135 signal_handler: Arc<SignalHandler>, 136 } 137 138 impl<N: Network, C: ConsensusStorage<N>> Client<N, C> { 139 /// Initializes a new client node. 140 pub async fn new( 141 node_ip: SocketAddr, 142 rest_ip: Option<SocketAddr>, 143 rest_rps: u32, 144 account: Account<N>, 145 trusted_peers: &[SocketAddr], 146 genesis: Block<N>, 147 cdn: Option<http::Uri>, 148 storage_mode: StorageMode, 149 trusted_peers_only: bool, 150 dev: Option<u16>, 151 signal_handler: Arc<SignalHandler>, 152 ) -> Result<Self> { 153 // Initialize the ledger. 154 let ledger = { 155 let storage_mode = storage_mode.clone(); 156 let genesis = genesis.clone(); 157 158 spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode)) 159 } 160 .with_context(|| "Failed to initialize the ledger")?; 161 162 // Initialize the ledger service. 163 let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), signal_handler.clone())); 164 // Initialize the node router. 
165 let router = Router::new( 166 node_ip, 167 NodeType::Client, 168 account, 169 ledger_service.clone(), 170 trusted_peers, 171 Self::MAXIMUM_NUMBER_OF_PEERS as u16, 172 trusted_peers_only, 173 storage_mode.clone(), 174 dev.is_some(), 175 ) 176 .await?; 177 178 // Initialize the sync module. 179 let sync = Arc::new(BlockSync::new(ledger_service.clone())); 180 181 // Set up the ping logic. 182 let locators = sync.get_block_locators()?; 183 let ping = Arc::new(Ping::new(router.clone(), locators)); 184 185 // Initialize the node. 186 let mut node = Self { 187 ledger: ledger.clone(), 188 router, 189 rest: None, 190 sync: sync.clone(), 191 genesis, 192 ping, 193 puzzle: ledger.puzzle().clone(), 194 solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))), 195 deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))), 196 execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))), 197 num_verifying_solutions: Default::default(), 198 num_verifying_deploys: Default::default(), 199 num_verifying_executions: Default::default(), 200 handles: Default::default(), 201 signal_handler: signal_handler.clone(), 202 }; 203 204 // Perform sync with CDN (if enabled). 205 let cdn_sync = cdn.map(|base_url| { 206 trace!("CDN sync is enabled"); 207 Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler)) 208 }); 209 210 // Initialize the REST server. 211 if let Some(rest_ip) = rest_ip { 212 node.rest = Some( 213 Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync) 214 .await?, 215 ); 216 } 217 218 // Set up everything else after CDN sync is done. 219 if let Some(cdn_sync) = cdn_sync { 220 if let Err(error) = cdn_sync.wait().await { 221 crate::log_clean_error(&storage_mode); 222 node.shut_down().await; 223 return Err(error); 224 } 225 } 226 227 // Initialize the routing. 
228 node.initialize_routing().await; 229 // Initialize the sync module. 230 node.initialize_sync(); 231 // Initialize solution verification. 232 node.initialize_solution_verification(); 233 // Initialize deployment verification. 234 node.initialize_deploy_verification(); 235 // Initialize execution verification. 236 node.initialize_execute_verification(); 237 // Initialize the notification message loop. 238 node.handles.lock().push(crate::start_notification_message_loop()); 239 // Return the node. 240 Ok(node) 241 } 242 243 /// Returns the ledger. 244 pub fn ledger(&self) -> &Ledger<N, C> { 245 &self.ledger 246 } 247 248 /// Returns the REST server. 249 pub fn rest(&self) -> &Option<Rest<N, C, Self>> { 250 &self.rest 251 } 252 253 /// Returns the router. 254 pub fn router(&self) -> &Router<N> { 255 &self.router 256 } 257 } 258 259 /// Sync-specific code. 260 impl<N: Network, C: ConsensusStorage<N>> Client<N, C> { 261 /// The maximum time to wait for peer updates before timing out and attempting to issue new requests. 262 /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates. 263 const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30); 264 265 /// Spawns the tasks that performs the syncing logic for this client. 266 fn initialize_sync(&self) { 267 // Start the block request generation loop (outgoing). 268 let self_ = self.clone(); 269 self.spawn(async move { 270 while !self_.signal_handler.is_stopped() { 271 // Perform the sync routine. 272 self_.try_issuing_block_requests().await; 273 } 274 275 info!("Stopped block request generation"); 276 }); 277 278 // Start the block response processing loop (incoming). 279 let self_ = self.clone(); 280 self.spawn(async move { 281 while !self_.signal_handler.is_stopped() { 282 // Wait until there is something to do or until the timeout. 283 let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await; 284 285 // Perform the sync routine. 
286 self_.try_advancing_block_synchronization().await; 287 288 // We perform no additional rate limiting here as 289 // requests are already rate-limited. 290 } 291 292 debug!("Stopped block response processing"); 293 }); 294 } 295 296 /// Client-side version of [`alphavm_node_bft::Sync::try_advancing_block_synchronization`]. 297 async fn try_advancing_block_synchronization(&self) { 298 let has_new_blocks = match self.sync.try_advancing_block_synchronization().await { 299 Ok(val) => val, 300 Err(err) => { 301 error!("Block synchronization failed - {err}"); 302 return; 303 } 304 }; 305 306 // If there are new blocks, we need to update the block locators. 307 if has_new_blocks { 308 match self.sync.get_block_locators() { 309 Ok(locators) => self.ping.update_block_locators(locators), 310 Err(err) => error!("Failed to get block locators: {err}"), 311 } 312 } 313 } 314 315 /// Client-side version of `alphavm_node_bft::Sync::try_block_sync()`. 316 async fn try_issuing_block_requests(&self) { 317 // Wait for peer updates or timeout 318 let _ = timeout(Self::MAX_SYNC_INTERVAL, self.sync.wait_for_peer_update()).await; 319 320 // For sanity, check that sync height is never below ledger height. 321 // (if the ledger height is lower or equal to the current sync height, this is a noop) 322 self.sync.set_sync_height(self.ledger.latest_height()); 323 324 match self.sync.handle_block_request_timeouts(&self.router) { 325 Ok(Some((requests, sync_peers))) => { 326 // Re-request blocks instead of performing regular block sync. 327 self.send_block_requests(requests, sync_peers).await; 328 return; 329 } 330 Ok(None) => {} 331 Err(err) => { 332 // Abort and retry later. 333 error!("{}", flatten_error(&err)); 334 return; 335 } 336 } 337 338 // Do not attempt to sync if there are not blocks to sync. 339 // This prevents redundant log messages and performing unnecessary computation. 340 if !self.sync.can_block_sync() { 341 trace!("Nothing to sync. 
Will not issue new block requests"); 342 return; 343 } 344 345 // First, try to advance the ledger with new responses. 346 let has_new_blocks = match self.sync.try_advancing_block_synchronization().await { 347 Ok(val) => val, 348 Err(err) => { 349 error!("{err}"); 350 return; 351 } 352 }; 353 354 if has_new_blocks { 355 match self.sync.get_block_locators() { 356 Ok(locators) => self.ping.update_block_locators(locators), 357 Err(err) => error!("Failed to get block locators: {err}"), 358 } 359 360 // If these were the last blocks to process, do not continue. 361 if !self.sync.can_block_sync() { 362 return; 363 } 364 } 365 366 // Prepare the block requests, if any. 367 // In the process, we update the state of `is_block_synced` for the sync module. 368 let (block_requests, sync_peers) = self.sync.prepare_block_requests(); 369 370 // If there are no block requests, but there are pending block responses in the sync pool, 371 // then try to advance the ledger using these pending block responses. 372 if block_requests.is_empty() { 373 let total_requests = self.sync.num_total_block_requests(); 374 let num_outstanding = self.sync.num_outstanding_block_requests(); 375 if total_requests > 0 { 376 trace!( 377 "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses." 378 ); 379 } else { 380 // This can happen during peer rotation and should not be a warning. 381 debug!( 382 "Not block synced yet, and there are no outstanding block requests or \ 383 new block requests to send" 384 ); 385 } 386 } else { 387 self.send_block_requests(block_requests, sync_peers).await; 388 } 389 } 390 391 async fn send_block_requests( 392 &self, 393 block_requests: Vec<(u32, PrepareSyncRequest<N>)>, 394 sync_peers: IndexMap<SocketAddr, BlockLocators<N>>, 395 ) { 396 // Issues the block requests in batches. 
397 for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) { 398 if !self.sync.send_block_requests(self.router(), &sync_peers, requests).await { 399 // Stop if we fail to process a batch of requests. 400 break; 401 } 402 403 // Sleep to avoid triggering spam detection. 404 tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await; 405 } 406 } 407 408 /// Initializes solution verification. 409 fn initialize_solution_verification(&self) { 410 // Start the solution verification loop. 411 let node = self.clone(); 412 self.spawn(async move { 413 loop { 414 // If the Ctrl-C handler registered the signal, stop the node. 415 if node.signal_handler.is_stopped() { 416 info!("Shutting down solution verification"); 417 break; 418 } 419 420 // Determine if the queue contains txs to verify. 421 let queue_is_empty = node.solution_queue.lock().is_empty(); 422 // Determine if our verification counter has space to verify new solutions. 423 let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS; 424 425 // Sleep to allow the queue to be filled or solutions to be validated. 426 if queue_is_empty || counter_is_full { 427 sleep(Duration::from_millis(50)).await; 428 continue; 429 } 430 431 // Try to verify solutions. 432 let mut solution_queue = node.solution_queue.lock(); 433 while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() { 434 // Increment the verification counter. 435 let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed); 436 let _node = node.clone(); 437 // For each solution, spawn a task to verify it. 438 tokio::task::spawn_blocking(move || { 439 // Retrieve the latest epoch hash. 440 if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() { 441 // Check if the prover has reached their solution limit. 
442 // While alphavm will ultimately abort any excess solutions for safety, performing this check 443 // here prevents the to-be aborted solutions from propagating through the network. 444 let prover_address = solution.address(); 445 if _node.ledger.is_solution_limit_reached(&prover_address, 0) { 446 debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id())); 447 } 448 // Retrieve the latest proof target. 449 let proof_target = _node.ledger.latest_block().header().proof_target(); 450 // Ensure that the solution is valid for the given epoch. 451 let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target); 452 453 match is_valid { 454 // If the solution is valid, propagate the `UnconfirmedSolution`. 455 Ok(()) => { 456 let message = Message::UnconfirmedSolution(serialized); 457 // Propagate the "UnconfirmedSolution". 458 _node.propagate(message, &[peer_ip]); 459 } 460 // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore. 461 Err(error) => { 462 if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 { 463 debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}") 464 } 465 } 466 } 467 } else { 468 warn!("Failed to retrieve the latest epoch hash."); 469 } 470 // Decrement the verification counter. 471 _node.num_verifying_solutions.fetch_sub(1, Relaxed); 472 }); 473 // If we are already at capacity, don't verify more solutions. 474 if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS { 475 break; 476 } 477 } 478 } 479 }); 480 } 481 482 /// Initializes deploy verification. 483 fn initialize_deploy_verification(&self) { 484 // Start the deploy verification loop. 485 let node = self.clone(); 486 self.spawn(async move { 487 loop { 488 // If the Ctrl-C handler registered the signal, stop the node. 
489 if node.signal_handler.is_stopped() { 490 info!("Shutting down deployment verification"); 491 break; 492 } 493 494 // Determine if the queue contains txs to verify. 495 let queue_is_empty = node.deploy_queue.lock().is_empty(); 496 // Determine if our verification counter has space to verify new txs. 497 let counter_is_full = 498 node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS; 499 500 // Sleep to allow the queue to be filled or transactions to be validated. 501 if queue_is_empty || counter_is_full { 502 sleep(Duration::from_millis(50)).await; 503 continue; 504 } 505 506 // Try to verify deployments. 507 while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() { 508 // Increment the verification counter. 509 let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed); 510 let _node = node.clone(); 511 // For each deployment, spawn a task to verify it. 512 tokio::task::spawn_blocking(move || { 513 // First collect the state root. 514 let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else { 515 debug!("Failed to access global state root for deployment from peer_ip {peer_ip}"); 516 _node.num_verifying_deploys.fetch_sub(1, Relaxed); 517 return; 518 }; 519 // Check if the state root is in the ledger. 520 if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) { 521 debug!("Failed to find global state root for deployment from peer_ip {peer_ip}, propagating anyway"); 522 // Propagate the `UnconfirmedTransaction`. 523 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]); 524 _node.num_verifying_deploys.fetch_sub(1, Relaxed); 525 return; 526 // Also skip the `check_transaction_basic` call if it is already propagated. 527 } 528 // Check the deployment. 529 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) { 530 Ok(_) => { 531 // Propagate the `UnconfirmedTransaction`. 
532 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]); 533 } 534 Err(error) => { 535 debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}"); 536 } 537 } 538 // Decrement the verification counter. 539 _node.num_verifying_deploys.fetch_sub(1, Relaxed); 540 }); 541 // If we are already at capacity, don't verify more deployments. 542 if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS { 543 break; 544 } 545 } 546 } 547 }); 548 } 549 550 /// Initializes execute verification. 551 fn initialize_execute_verification(&self) { 552 // Start the execute verification loop. 553 let node = self.clone(); 554 self.spawn(async move { 555 loop { 556 // If the Ctrl-C handler registered the signal, stop the node. 557 if node.signal_handler.is_stopped() { 558 info!("Shutting down execution verification"); 559 break; 560 } 561 562 // Determine if the queue contains txs to verify. 563 let queue_is_empty = node.execute_queue.lock().is_empty(); 564 // Determine if our verification counter has space to verify new txs. 565 let counter_is_full = 566 node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS; 567 568 // Sleep to allow the queue to be filled or transactions to be validated. 569 if queue_is_empty || counter_is_full { 570 sleep(Duration::from_millis(50)).await; 571 continue; 572 } 573 574 // Try to verify executions. 575 while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() { 576 // Increment the verification counter. 577 let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed); 578 let _node = node.clone(); 579 // For each execution, spawn a task to verify it. 580 tokio::task::spawn_blocking(move || { 581 // First collect the state roots. 
582 let state_roots = [ 583 transaction.execution().map(|t| t.global_state_root()), 584 transaction.fee_transition().map(|t| t.global_state_root()), 585 ] 586 .into_iter() 587 .flatten(); 588 589 for state_root in state_roots { 590 if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) { 591 debug!("Failed to find global state root for execution from peer_ip {peer_ip}, propagating anyway"); 592 // Propagate the `UnconfirmedTransaction`. 593 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]); 594 _node.num_verifying_executions.fetch_sub(1, Relaxed); 595 return; 596 // Also skip the `check_transaction_basic` call if it is already propagated. 597 } 598 } 599 // Check the execution. 600 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) { 601 Ok(_) => { 602 // Propagate the `UnconfirmedTransaction`. 603 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]); 604 } 605 Err(error) => { 606 debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}"); 607 } 608 } 609 // Decrement the verification counter. 610 _node.num_verifying_executions.fetch_sub(1, Relaxed); 611 }); 612 // If we are already at capacity, don't verify more executions. 613 if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS { 614 break; 615 } 616 } 617 } 618 }); 619 } 620 621 /// Spawns a task with the given future; it should only be used for long-running tasks. 622 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) { 623 self.handles.lock().push(tokio::spawn(future)); 624 } 625 } 626 627 #[async_trait] 628 impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> { 629 /// Shuts down the node. 630 async fn shut_down(&self) { 631 info!("Shutting down..."); 632 633 // Shut down the node. 634 trace!("Shutting down the node..."); 635 636 // Abort the tasks. 
637 trace!("Shutting down the client..."); 638 self.handles.lock().iter().for_each(|handle| handle.abort()); 639 640 // Shut down the router. 641 self.router.shut_down().await; 642 643 info!("Node has shut down."); 644 } 645 }