/ node / src / client / mod.rs
mod.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  mod router;
 17  
 18  use crate::{
 19      bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking},
 20      cdn::CdnBlockSync,
 21      traits::NodeInterface,
 22  };
 23  
 24  use alphaos_account::Account;
 25  use alphaos_node_network::NodeType;
 26  use alphaos_node_rest::Rest;
 27  use alphaos_node_router::{
 28      Heartbeat,
 29      Inbound,
 30      Outbound,
 31      Router,
 32      Routing,
 33      messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
 34  };
 35  use alphaos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
 36  use alphaos_node_tcp::{
 37      P2P,
 38      protocols::{Disconnect, Handshake, OnConnect, Reading},
 39  };
 40  use alphaos_utilities::{SignalHandler, Stoppable};
 41  
 42  use alphavm::{
 43      console::network::Network,
 44      ledger::{
 45          Ledger,
 46          block::{Block, Header},
 47          puzzle::{Puzzle, Solution, SolutionID},
 48          store::ConsensusStorage,
 49      },
 50      prelude::{VM, block::Transaction},
 51      utilities::flatten_error,
 52  };
 53  
 54  use alpha_std::StorageMode;
 55  use anyhow::{Context, Result};
 56  use core::future::Future;
 57  use indexmap::IndexMap;
 58  #[cfg(feature = "locktick")]
 59  use locktick::parking_lot::Mutex;
 60  use lru::LruCache;
 61  #[cfg(not(feature = "locktick"))]
 62  use parking_lot::Mutex;
 63  use std::{
 64      net::SocketAddr,
 65      num::NonZeroUsize,
 66      sync::{
 67          Arc,
 68          atomic::{
 69              AtomicUsize,
 70              Ordering::{Acquire, Relaxed},
 71          },
 72      },
 73      time::Duration,
 74  };
 75  use tokio::{
 76      task::JoinHandle,
 77      time::{sleep, timeout},
 78  };
 79  
/// The maximum number of solutions to verify in parallel.
/// The solution verification loop stops spawning new verification tasks once this many are in flight.
/// Note: worst case memory to verify a solution is 0.5 GiB.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity (`1 << 10` = 1024 entries) of the LRU queue storing unconfirmed deployments.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity (`1 << 10` = 1024 entries) of the LRU queue storing unconfirmed executions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity (`1 << 10` = 1024 entries) of the LRU queue storing unconfirmed solutions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
 92  
/// Transaction details needed for propagation.
/// The tuple holds, in order: the peer address associated with the transaction, the
/// `UnconfirmedTransaction` message payload, and the transaction itself.
/// We preserve the serialized transaction for faster propagation.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// Solution details needed for propagation.
/// The tuple holds, in order: the peer address associated with the solution, the
/// `UnconfirmedSolution` message payload, and the solution itself.
/// We preserve the serialized solution for faster propagation.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
 99  
/// A client node is a full node, capable of querying with the network.
///
/// Background tasks spawned by the node each hold their own clone of this struct, which is
/// why the mutable state below (queues, counters, task handles) is wrapped in `Arc`.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node; `None` unless a REST address was passed to `new`.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle, cloned from the ledger when the node is constructed.
    puzzle: Puzzle<N>,
    /// The unconfirmed solutions queue (an LRU cache keyed by solution ID), drained by the
    /// solution verification task.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The unconfirmed deployments queue (an LRU cache keyed by transaction ID), drained by the
    /// deployment verification task.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The unconfirmed executions queue (an LRU cache keyed by transaction ID), drained by the
    /// execution verification task.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The amount of solutions currently being verified.
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The amount of deployments currently being verified.
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The amount of executions currently being verified.
    num_verifying_executions: Arc<AtomicUsize>,
    /// The handles of the spawned long-running tasks; they are aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Keeps track of sending pings; it is handed fresh block locators whenever sync advances.
    ping: Arc<Ping<N>>,
    /// The signal handling logic, polled by the background loops to know when to stop.
    signal_handler: Arc<SignalHandler>,
}
134  
135  impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
136      /// Initializes a new client node.
137      pub async fn new(
138          node_ip: SocketAddr,
139          rest_ip: Option<SocketAddr>,
140          rest_rps: u32,
141          account: Account<N>,
142          trusted_peers: &[SocketAddr],
143          genesis: Block<N>,
144          cdn: Option<http::Uri>,
145          storage_mode: StorageMode,
146          trusted_peers_only: bool,
147          dev: Option<u16>,
148          signal_handler: Arc<SignalHandler>,
149      ) -> Result<Self> {
150          // Initialize the ledger.
151          let ledger = {
152              let storage_mode = storage_mode.clone();
153              let genesis = genesis.clone();
154  
155              spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
156          }
157          .with_context(|| "Failed to initialize the ledger")?;
158  
159          // Initialize the ledger service.
160          let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), signal_handler.clone()));
161          // Initialize the node router.
162          let router = Router::new(
163              node_ip,
164              NodeType::Client,
165              account,
166              ledger_service.clone(),
167              trusted_peers,
168              Self::MAXIMUM_NUMBER_OF_PEERS as u16,
169              trusted_peers_only,
170              storage_mode.clone(),
171              dev.is_some(),
172          )
173          .await?;
174  
175          // Initialize the sync module.
176          let sync = Arc::new(BlockSync::new(ledger_service.clone()));
177  
178          // Set up the ping logic.
179          let locators = sync.get_block_locators()?;
180          let ping = Arc::new(Ping::new(router.clone(), locators));
181  
182          // Initialize the node.
183          let mut node = Self {
184              ledger: ledger.clone(),
185              router,
186              rest: None,
187              sync: sync.clone(),
188              genesis,
189              ping,
190              puzzle: ledger.puzzle().clone(),
191              solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
192              deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
193              execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
194              num_verifying_solutions: Default::default(),
195              num_verifying_deploys: Default::default(),
196              num_verifying_executions: Default::default(),
197              handles: Default::default(),
198              signal_handler: signal_handler.clone(),
199          };
200  
201          // Perform sync with CDN (if enabled).
202          let cdn_sync = cdn.map(|base_url| {
203              trace!("CDN sync is enabled");
204              Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))
205          });
206  
207          // Initialize the REST server.
208          if let Some(rest_ip) = rest_ip {
209              node.rest = Some(
210                  Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
211                      .await?,
212              );
213          }
214  
215          // Set up everything else after CDN sync is done.
216          if let Some(cdn_sync) = cdn_sync {
217              if let Err(error) = cdn_sync.wait().await {
218                  crate::log_clean_error(&storage_mode);
219                  node.shut_down().await;
220                  return Err(error);
221              }
222          }
223  
224          // Initialize the routing.
225          node.initialize_routing().await;
226          // Initialize the sync module.
227          node.initialize_sync();
228          // Initialize solution verification.
229          node.initialize_solution_verification();
230          // Initialize deployment verification.
231          node.initialize_deploy_verification();
232          // Initialize execution verification.
233          node.initialize_execute_verification();
234          // Initialize the notification message loop.
235          node.handles.lock().push(crate::start_notification_message_loop());
236          // Return the node.
237          Ok(node)
238      }
239  
    /// Returns a reference to the ledger of the node.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }
244  
    /// Returns the REST server, which is `None` when the node was created without a REST address.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }
249  
    /// Returns a reference to the router of the node.
    pub fn router(&self) -> &Router<N> {
        &self.router
    }
254  }
255  
256  /// Sync-specific code.
257  impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// The maximum time (30 seconds) to wait for peer updates or block responses before timing
    /// out and running the sync routine anyway.
    /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);
261  
262      /// Spawns the tasks that performs the syncing logic for this client.
263      fn initialize_sync(&self) {
264          // Start the block request generation loop (outgoing).
265          let self_ = self.clone();
266          self.spawn(async move {
267              while !self_.signal_handler.is_stopped() {
268                  // Perform the sync routine.
269                  self_.try_issuing_block_requests().await;
270              }
271  
272              info!("Stopped block request generation");
273          });
274  
275          // Start the block response processing loop (incoming).
276          let self_ = self.clone();
277          self.spawn(async move {
278              while !self_.signal_handler.is_stopped() {
279                  // Wait until there is something to do or until the timeout.
280                  let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await;
281  
282                  // Perform the sync routine.
283                  self_.try_advancing_block_synchronization().await;
284  
285                  // We perform no additional rate limiting here as
286                  // requests are already rate-limited.
287              }
288  
289              debug!("Stopped block response processing");
290          });
291      }
292  
293      /// Client-side version of [`snarkvm_node_bft::Sync::try_advancing_block_synchronization`].
294      async fn try_advancing_block_synchronization(&self) {
295          let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
296              Ok(val) => val,
297              Err(err) => {
298                  error!("Block synchronization failed - {err}");
299                  return;
300              }
301          };
302  
303          // If there are new blocks, we need to update the block locators.
304          if has_new_blocks {
305              match self.sync.get_block_locators() {
306                  Ok(locators) => self.ping.update_block_locators(locators),
307                  Err(err) => error!("Failed to get block locators: {err}"),
308              }
309          }
310      }
311  
312      /// Client-side version of `snarkvm_node_bft::Sync::try_block_sync()`.
313      async fn try_issuing_block_requests(&self) {
314          // Wait for peer updates or timeout
315          let _ = timeout(Self::MAX_SYNC_INTERVAL, self.sync.wait_for_peer_update()).await;
316  
317          // For sanity, check that sync height is never below ledger height.
318          // (if the ledger height is lower or equal to the current sync height, this is a noop)
319          self.sync.set_sync_height(self.ledger.latest_height());
320  
321          match self.sync.handle_block_request_timeouts(&self.router) {
322              Ok(Some((requests, sync_peers))) => {
323                  // Re-request blocks instead of performing regular block sync.
324                  self.send_block_requests(requests, sync_peers).await;
325                  return;
326              }
327              Ok(None) => {}
328              Err(err) => {
329                  // Abort and retry later.
330                  error!("{}", flatten_error(&err));
331                  return;
332              }
333          }
334  
335          // Do not attempt to sync if there are not blocks to sync.
336          // This prevents redundant log messages and performing unnecessary computation.
337          if !self.sync.can_block_sync() {
338              trace!("Nothing to sync. Will not issue new block requests");
339              return;
340          }
341  
342          // First, try to advance the ledger with new responses.
343          let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
344              Ok(val) => val,
345              Err(err) => {
346                  error!("{err}");
347                  return;
348              }
349          };
350  
351          if has_new_blocks {
352              match self.sync.get_block_locators() {
353                  Ok(locators) => self.ping.update_block_locators(locators),
354                  Err(err) => error!("Failed to get block locators: {err}"),
355              }
356  
357              // If these were the last blocks to process, do not continue.
358              if !self.sync.can_block_sync() {
359                  return;
360              }
361          }
362  
363          // Prepare the block requests, if any.
364          // In the process, we update the state of `is_block_synced` for the sync module.
365          let (block_requests, sync_peers) = self.sync.prepare_block_requests();
366  
367          // If there are no block requests, but there are pending block responses in the sync pool,
368          // then try to advance the ledger using these pending block responses.
369          if block_requests.is_empty() {
370              let total_requests = self.sync.num_total_block_requests();
371              let num_outstanding = self.sync.num_outstanding_block_requests();
372              if total_requests > 0 {
373                  trace!(
374                      "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
375                  );
376              } else {
377                  // This can happen during peer rotation and should not be a warning.
378                  debug!(
379                      "Not block synced yet, and there are no outstanding block requests or \
380                   new block requests to send"
381                  );
382              }
383          } else {
384              self.send_block_requests(block_requests, sync_peers).await;
385          }
386      }
387  
388      async fn send_block_requests(
389          &self,
390          block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
391          sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
392      ) {
393          // Issues the block requests in batches.
394          for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
395              if !self.sync.send_block_requests(self.router(), &sync_peers, requests).await {
396                  // Stop if we fail to process a batch of requests.
397                  break;
398              }
399  
400              // Sleep to avoid triggering spam detection.
401              tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
402          }
403      }
404  
405      /// Initializes solution verification.
406      fn initialize_solution_verification(&self) {
407          // Start the solution verification loop.
408          let node = self.clone();
409          self.spawn(async move {
410              loop {
411                  // If the Ctrl-C handler registered the signal, stop the node.
412                  if node.signal_handler.is_stopped() {
413                      info!("Shutting down solution verification");
414                      break;
415                  }
416  
417                  // Determine if the queue contains txs to verify.
418                  let queue_is_empty = node.solution_queue.lock().is_empty();
419                  // Determine if our verification counter has space to verify new solutions.
420                  let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;
421  
422                  // Sleep to allow the queue to be filled or solutions to be validated.
423                  if queue_is_empty || counter_is_full {
424                      sleep(Duration::from_millis(50)).await;
425                      continue;
426                  }
427  
428                  // Try to verify solutions.
429                  let mut solution_queue = node.solution_queue.lock();
430                  while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
431                      // Increment the verification counter.
432                      let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
433                      let _node = node.clone();
434                      // For each solution, spawn a task to verify it.
435                      tokio::task::spawn_blocking(move || {
436                          // Retrieve the latest epoch hash.
437                          if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
438                              // Check if the prover has reached their solution limit.
439                              // While snarkVM will ultimately abort any excess solutions for safety, performing this check
440                              // here prevents the to-be aborted solutions from propagating through the network.
441                              let prover_address = solution.address();
442                              if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
443                                  debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
444                              }
445                              // Retrieve the latest proof target.
446                              let proof_target = _node.ledger.latest_block().header().proof_target();
447                              // Ensure that the solution is valid for the given epoch.
448                              let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);
449  
450                              match is_valid {
451                                  // If the solution is valid, propagate the `UnconfirmedSolution`.
452                                  Ok(()) => {
453                                      let message = Message::UnconfirmedSolution(serialized);
454                                      // Propagate the "UnconfirmedSolution".
455                                      _node.propagate(message, &[peer_ip]);
456                                  }
457                                  // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
458                                  Err(error) => {
459                                      if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
460                                          debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
461                                      }
462                                  }
463                              }
464                          } else {
465                              warn!("Failed to retrieve the latest epoch hash.");
466                          }
467                          // Decrement the verification counter.
468                          _node.num_verifying_solutions.fetch_sub(1, Relaxed);
469                      });
470                      // If we are already at capacity, don't verify more solutions.
471                      if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
472                          break;
473                      }
474                  }
475              }
476          });
477      }
478  
479      /// Initializes deploy verification.
480      fn initialize_deploy_verification(&self) {
481          // Start the deploy verification loop.
482          let node = self.clone();
483          self.spawn(async move {
484              loop {
485                  // If the Ctrl-C handler registered the signal, stop the node.
486                  if node.signal_handler.is_stopped() {
487                      info!("Shutting down deployment verification");
488                      break;
489                  }
490  
491                  // Determine if the queue contains txs to verify.
492                  let queue_is_empty = node.deploy_queue.lock().is_empty();
493                  // Determine if our verification counter has space to verify new txs.
494                  let counter_is_full =
495                      node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;
496  
497                  // Sleep to allow the queue to be filled or transactions to be validated.
498                  if queue_is_empty || counter_is_full {
499                      sleep(Duration::from_millis(50)).await;
500                      continue;
501                  }
502  
503                  // Try to verify deployments.
504                  while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
505                      // Increment the verification counter.
506                      let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
507                      let _node = node.clone();
508                      // For each deployment, spawn a task to verify it.
509                      tokio::task::spawn_blocking(move || {
510                          // First collect the state root.
511                          let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else {
512                              debug!("Failed to access global state root for deployment from peer_ip {peer_ip}");
513                              _node.num_verifying_deploys.fetch_sub(1, Relaxed);
514                              return;
515                          };
516                          // Check if the state root is in the ledger.
517                          if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
518                              debug!("Failed to find global state root for deployment from peer_ip {peer_ip}, propagating anyway");
519                              // Propagate the `UnconfirmedTransaction`.
520                              _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
521                              _node.num_verifying_deploys.fetch_sub(1, Relaxed);
522                              return;
523                              // Also skip the `check_transaction_basic` call if it is already propagated.
524                          }
525                          // Check the deployment.
526                          match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
527                              Ok(_) => {
528                                  // Propagate the `UnconfirmedTransaction`.
529                                  _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
530                              }
531                              Err(error) => {
532                                  debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
533                              }
534                          }
535                          // Decrement the verification counter.
536                          _node.num_verifying_deploys.fetch_sub(1, Relaxed);
537                      });
538                      // If we are already at capacity, don't verify more deployments.
539                      if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
540                          break;
541                      }
542                  }
543              }
544          });
545      }
546  
547      /// Initializes execute verification.
548      fn initialize_execute_verification(&self) {
549          // Start the execute verification loop.
550          let node = self.clone();
551          self.spawn(async move {
552              loop {
553                  // If the Ctrl-C handler registered the signal, stop the node.
554                  if node.signal_handler.is_stopped() {
555                      info!("Shutting down execution verification");
556                      break;
557                  }
558  
559                  // Determine if the queue contains txs to verify.
560                  let queue_is_empty = node.execute_queue.lock().is_empty();
561                  // Determine if our verification counter has space to verify new txs.
562                  let counter_is_full =
563                      node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;
564  
565                  // Sleep to allow the queue to be filled or transactions to be validated.
566                  if queue_is_empty || counter_is_full {
567                      sleep(Duration::from_millis(50)).await;
568                      continue;
569                  }
570  
571                  // Try to verify executions.
572                  while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
573                      // Increment the verification counter.
574                      let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
575                      let _node = node.clone();
576                      // For each execution, spawn a task to verify it.
577                      tokio::task::spawn_blocking(move || {
578                          // First collect the state roots.
579                          let state_roots = [
580                              transaction.execution().map(|t| t.global_state_root()),
581                              transaction.fee_transition().map(|t| t.global_state_root()),
582                          ]
583                          .into_iter()
584                          .flatten();
585  
586                          for state_root in state_roots {
587                              if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
588                                  debug!("Failed to find global state root for execution from peer_ip {peer_ip}, propagating anyway");
589                                  // Propagate the `UnconfirmedTransaction`.
590                                  _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
591                                  _node.num_verifying_executions.fetch_sub(1, Relaxed);
592                                  return;
593                                  // Also skip the `check_transaction_basic` call if it is already propagated.
594                              }
595                          }
596                          // Check the execution.
597                          match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
598                              Ok(_) => {
599                                  // Propagate the `UnconfirmedTransaction`.
600                                  _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
601                              }
602                              Err(error) => {
603                                  debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
604                              }
605                          }
606                          // Decrement the verification counter.
607                          _node.num_verifying_executions.fetch_sub(1, Relaxed);
608                      });
609                      // If we are already at capacity, don't verify more executions.
610                      if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
611                          break;
612                      }
613                  }
614              }
615          });
616      }
617  
618      /// Spawns a task with the given future; it should only be used for long-running tasks.
619      pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
620          self.handles.lock().push(tokio::spawn(future));
621      }
622  }
623  
624  #[async_trait]
625  impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
626      /// Shuts down the node.
627      async fn shut_down(&self) {
628          info!("Shutting down...");
629  
630          // Shut down the node.
631          trace!("Shutting down the node...");
632  
633          // Abort the tasks.
634          trace!("Shutting down the client...");
635          self.handles.lock().iter().for_each(|handle| handle.abort());
636  
637          // Shut down the router.
638          self.router.shut_down().await;
639  
640          info!("Node has shut down.");
641      }
642  }