// node/src/client/mod.rs
// Copyright (c) 2025-2026 ACDC Network
// This file is part of the alphaos library.
//
// Alpha Chain | Delta Chain Protocol
// International Monetary Graphite.
//
// Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
// They built world-class ZK infrastructure. We installed the EASY button.
// Their cryptography: elegant. Our modifications: bureaucracy-compatible.
// Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
//
// Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
// All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
// No rights reserved. No permission required. No warranty. No refunds.
//
// https://creativecommons.org/publicdomain/zero/1.0/
// SPDX-License-Identifier: CC0-1.0

mod router;

use crate::{
    bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking},
    cdn::CdnBlockSync,
    traits::NodeInterface,
};

use alphaos_account::Account;
use alphaos_node_network::NodeType;
use alphaos_node_rest::Rest;
use alphaos_node_router::{
    messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
    Heartbeat,
    Inbound,
    Outbound,
    Router,
    Routing,
};
use alphaos_node_sync::{locators::BlockLocators, BlockSync, Ping, PrepareSyncRequest, BLOCK_REQUEST_BATCH_DELAY};
use alphaos_node_tcp::{
    protocols::{Disconnect, Handshake, OnConnect, Reading},
    P2P,
};
use alphaos_utilities::{SignalHandler, Stoppable};

use alphavm::{
    console::network::Network,
    ledger::{
        block::{Block, Header},
        puzzle::{Puzzle, Solution, SolutionID},
        store::ConsensusStorage,
        Ledger,
    },
    prelude::{block::Transaction, VM},
    utilities::flatten_error,
};

use alphastd::StorageMode;
use anyhow::{Context, Result};
use core::future::Future;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
    net::SocketAddr,
    num::NonZeroUsize,
    sync::{
        atomic::{
            AtomicUsize,
            Ordering::{Acquire, Relaxed},
        },
        Arc,
    },
    time::Duration,
};
use tokio::{
    task::JoinHandle,
    time::{sleep, timeout},
};

/// The maximum number of solutions to verify in parallel.
/// Note: worst case memory to verify a solution is 0.5 GiB.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity for storing unconfirmed deployments.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity for storing unconfirmed executions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity for storing unconfirmed solutions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
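
// Note on how the capacities above are used: the three verification loops in this file pop
// entries from their LRU queues and spawn one blocking task per entry, while the atomic
// counters on `Client` cap how many verifications run concurrently. Each queue holds up to
// 1 << 10 = 1024 entries; assuming entries are inserted with `LruCache::put` (in the router's
// message handlers), the least-recently-used entry is evicted once a queue is at capacity,
// so the backlog of unconfirmed items stays bounded.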

/// Transaction details needed for propagation.
/// We preserve the serialized transaction for faster propagation.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// Solution details needed for propagation.
/// We preserve the serialized solution for faster propagation.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);

/// A client node is a full node, capable of querying the network.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle.
    puzzle: Puzzle<N>,
    /// The unconfirmed solutions queue.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The unconfirmed deployments queue.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The unconfirmed executions queue.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The number of solutions currently being verified.
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The number of deployments currently being verified.
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The number of executions currently being verified.
    num_verifying_executions: Arc<AtomicUsize>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
    /// The signal handling logic.
    signal_handler: Arc<SignalHandler>,
}

impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// Initializes a new client node.
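    ///
    /// # Example
    ///
    /// An illustrative sketch of constructing a client. The socket addresses, the REST rate
    /// limit, and how `account`, `genesis`, `storage_mode`, and `signal_handler` are obtained
    /// are assumptions and will vary by deployment; the concrete `Network` and
    /// `ConsensusStorage` types are left generic here.
    ///
    /// ```ignore
    /// let client = Client::<N, C>::new(
    ///     "0.0.0.0:4130".parse()?,       // node_ip
    ///     Some("0.0.0.0:3030".parse()?), // rest_ip
    ///     10,                            // rest_rps
    ///     account,                       // Account<N>
    ///     &[],                           // trusted_peers
    ///     genesis,                       // Block<N>
    ///     None,                          // cdn base URI
    ///     storage_mode,                  // StorageMode
    ///     false,                         // trusted_peers_only
    ///     None,                          // dev
    ///     signal_handler,                // Arc<SignalHandler>
    /// )
    /// .await?;
    /// ```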
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<http::Uri>,
        storage_mode: StorageMode,
        trusted_peers_only: bool,
        dev: Option<u16>,
        signal_handler: Arc<SignalHandler>,
    ) -> Result<Self> {
        // Initialize the ledger.
        let ledger = {
            let storage_mode = storage_mode.clone();
            let genesis = genesis.clone();

            spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
        }
        .with_context(|| "Failed to initialize the ledger")?;

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), signal_handler.clone()));
        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            trusted_peers_only,
            storage_mode.clone(),
            dev.is_some(),
        )
        .await?;

        // Initialize the sync module.
        let sync = Arc::new(BlockSync::new(ledger_service.clone()));

        // Set up the ping logic.
        let locators = sync.get_block_locators()?;
        let ping = Arc::new(Ping::new(router.clone(), locators));

        // Initialize the node.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: sync.clone(),
            genesis,
            ping,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            signal_handler: signal_handler.clone(),
        };

        // Perform sync with CDN (if enabled).
        let cdn_sync = cdn.map(|base_url| {
            trace!("CDN sync is enabled");
            Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))
        });

        // Initialize the REST server.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(
                Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
                    .await?,
            );
        }

        // Set up everything else after CDN sync is done.
        if let Some(cdn_sync) = cdn_sync {
            if let Err(error) = cdn_sync.wait().await {
                crate::log_clean_error(&storage_mode);
                node.shut_down().await;
                return Err(error);
            }
        }

        // Initialize the routing.
        node.initialize_routing().await;
        // Initialize the sync module.
        node.initialize_sync();
        // Initialize solution verification.
        node.initialize_solution_verification();
        // Initialize deployment verification.
        node.initialize_deploy_verification();
        // Initialize execution verification.
        node.initialize_execute_verification();
        // Initialize the notification message loop.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Return the node.
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }

    /// Returns the router.
    pub fn router(&self) -> &Router<N> {
        &self.router
    }
}

/// Sync-specific code.
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
    /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Spawns the tasks that perform the syncing logic for this client.
    fn initialize_sync(&self) {
        // Start the block request generation loop (outgoing).
        let self_ = self.clone();
        self.spawn(async move {
            while !self_.signal_handler.is_stopped() {
                // Perform the sync routine.
                self_.try_issuing_block_requests().await;
            }

            info!("Stopped block request generation");
        });

        // Start the block response processing loop (incoming).
        let self_ = self.clone();
        self.spawn(async move {
            while !self_.signal_handler.is_stopped() {
                // Wait until there is something to do or until the timeout.
                let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await;

                // Perform the sync routine.
                self_.try_advancing_block_synchronization().await;

                // We perform no additional rate limiting here as
                // requests are already rate-limited.
            }

            debug!("Stopped block response processing");
        });
    }

    /// Client-side version of [`alphavm_node_bft::Sync::try_advancing_block_synchronization`].
    async fn try_advancing_block_synchronization(&self) {
        let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
            Ok(val) => val,
            Err(err) => {
                error!("Block synchronization failed - {err}");
                return;
            }
        };

        // If there are new blocks, we need to update the block locators.
        if has_new_blocks {
            match self.sync.get_block_locators() {
                Ok(locators) => self.ping.update_block_locators(locators),
                Err(err) => error!("Failed to get block locators: {err}"),
            }
        }
    }

    /// Client-side version of `alphavm_node_bft::Sync::try_block_sync()`.
    async fn try_issuing_block_requests(&self) {
        // Wait for peer updates or the timeout.
        let _ = timeout(Self::MAX_SYNC_INTERVAL, self.sync.wait_for_peer_update()).await;

        // For sanity, check that the sync height is never below the ledger height.
        // (If the ledger height is lower than or equal to the current sync height, this is a no-op.)
        self.sync.set_sync_height(self.ledger.latest_height());

        match self.sync.handle_block_request_timeouts(&self.router) {
            Ok(Some((requests, sync_peers))) => {
                // Re-request blocks instead of performing regular block sync.
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                // Abort and retry later.
                error!("{}", flatten_error(&err));
                return;
            }
        }

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and unnecessary computation.
        if !self.sync.can_block_sync() {
            trace!("Nothing to sync. Will not issue new block requests");
            return;
        }

        // First, try to advance the ledger with new responses.
        let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
            Ok(val) => val,
            Err(err) => {
                error!("{err}");
                return;
            }
        };

        if has_new_blocks {
            match self.sync.get_block_locators() {
                Ok(locators) => self.ping.update_block_locators(locators),
                Err(err) => error!("Failed to get block locators: {err}"),
            }

            // If these were the last blocks to process, do not continue.
            if !self.sync.can_block_sync() {
                return;
            }
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (block_requests, sync_peers) = self.sync.prepare_block_requests();

        // If there are no new block requests to send, report on the block requests that are
        // still in flight (their responses are handled by the block response processing loop).
        if block_requests.is_empty() {
            let total_requests = self.sync.num_total_block_requests();
            let num_outstanding = self.sync.num_outstanding_block_requests();
            if total_requests > 0 {
                trace!(
                    "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
                );
            } else {
                // This can happen during peer rotation and should not be a warning.
                debug!(
                    "Not block synced yet, and there are no outstanding block requests or \
                 new block requests to send"
                );
            }
        } else {
            self.send_block_requests(block_requests, sync_peers).await;
        }
    }

    /// Sends the given block requests to the given sync peers, in batches.
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        // Issues the block requests in batches.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.sync.send_block_requests(self.router(), &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Initializes solution verification.
    fn initialize_solution_verification(&self) {
        // Start the solution verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.signal_handler.is_stopped() {
                    info!("Shutting down solution verification");
                    break;
                }

                // Determine if the queue contains solutions to verify.
                let queue_is_empty = node.solution_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new solutions.
                let counter_is_full =
                    node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;

                // Sleep to allow the queue to be filled or solutions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify solutions.
                let mut solution_queue = node.solution_queue.lock();
                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each solution, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // Retrieve the latest epoch hash.
                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
                            // Check if the prover has reached their solution limit.
                            // While alphavm will ultimately abort any excess solutions for safety, performing this check
                            // here prevents the to-be-aborted solutions from propagating through the network.
                            let prover_address = solution.address();
                            if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
                                debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
                                // Decrement the verification counter and skip this solution.
                                _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                                return;
                            }
                            // Retrieve the latest proof target.
                            let proof_target = _node.ledger.latest_block().header().proof_target();
                            // Ensure that the solution is valid for the given epoch.
                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);

                            match is_valid {
                                // If the solution is valid, propagate the `UnconfirmedSolution`.
                                Ok(()) => {
                                    let message = Message::UnconfirmedSolution(serialized);
                                    // Propagate the `UnconfirmedSolution`.
                                    _node.propagate(message, &[peer_ip]);
                                }
                                // If an error occurs after the first 10 blocks of the epoch, log it; otherwise ignore it.
                                Err(error) => {
                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                                        debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
                                    }
                                }
                            }
                        } else {
                            warn!("Failed to retrieve the latest epoch hash.");
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more solutions.
                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

    /// Initializes deploy verification.
    fn initialize_deploy_verification(&self) {
        // Start the deploy verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.signal_handler.is_stopped() {
                    info!("Shutting down deployment verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.deploy_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new txs.
                let counter_is_full =
                    node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify deployments.
                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each deployment, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // First collect the state root.
                        let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else {
                            debug!("Failed to access global state root for deployment from peer_ip {peer_ip}");
                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                            return;
                        };
                        // Check if the state root is in the ledger.
                        if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
                            debug!("Failed to find global state root for deployment from peer_ip {peer_ip}, propagating anyway");
                            // Propagate the `UnconfirmedTransaction`.
                            _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                            // Also skip the `check_transaction_basic` call if it is already propagated.
                            return;
                        }
                        // Check the deployment.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more deployments.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

    /// Initializes execute verification.
    fn initialize_execute_verification(&self) {
        // Start the execute verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.signal_handler.is_stopped() {
                    info!("Shutting down execution verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.execute_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new txs.
                let counter_is_full =
                    node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify executions.
                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each execution, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // First collect the state roots.
                        let state_roots = [
                            transaction.execution().map(|t| t.global_state_root()),
                            transaction.fee_transition().map(|t| t.global_state_root()),
                        ]
                        .into_iter()
                        .flatten();

                        for state_root in state_roots {
                            if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
                                debug!("Failed to find global state root for execution from peer_ip {peer_ip}, propagating anyway");
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                                _node.num_verifying_executions.fetch_sub(1, Relaxed);
                                // Also skip the `check_transaction_basic` call if it is already propagated.
                                return;
                            }
                        }
                        // Check the execution.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more executions.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

    /// Spawns a task with the given future; it should only be used for long-running tasks.
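    ///
    /// # Example
    ///
    /// An illustrative sketch; the loop body is hypothetical and stands in for any
    /// long-running work. The spawned handle is stored and aborted on `shut_down`.
    ///
    /// ```ignore
    /// node.spawn(async move {
    ///     loop {
    ///         // ... perform periodic background work ...
    ///         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    ///     }
    /// });
    /// ```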
    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
    /// Shuts down the node.
    async fn shut_down(&self) {
        info!("Shutting down...");

        // Shut down the node.
        trace!("Shutting down the node...");

        // Abort the tasks.
        trace!("Shutting down the client...");
        self.handles.lock().iter().for_each(|handle| handle.abort());

        // Shut down the router.
        self.router.shut_down().await;

        info!("Node has shut down.");
    }
}