/ node / src / validator / mod.rs
mod.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  mod router;
 20  
 21  use crate::traits::NodeInterface;
 22  
 23  use alphaos_account::Account;
 24  use alphaos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking};
 25  use alphaos_node_cdn::CdnBlockSync;
 26  use alphaos_node_consensus::{Consensus, IpcEventHandler};
 27  use alphaos_node_network::{NodeType, PeerPoolHandling};
 28  use alphaos_node_rest::Rest;
 29  use alphaos_node_router::{
 30      messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
 31      Heartbeat,
 32      Inbound,
 33      Outbound,
 34      Router,
 35      Routing,
 36  };
 37  use alphaos_node_sync::{BlockSync, Ping};
 38  use alphaos_node_tcp::{
 39      protocols::{Disconnect, Handshake, OnConnect, Reading},
 40      P2P,
 41  };
 42  use alphaos_utilities::SignalHandler;
 43  
 44  use alphavm::prelude::{block::Block, puzzle::Solution, store::ConsensusStorage, Ledger, Network};
 45  
 46  use alphastd::StorageMode;
 47  use anyhow::{Context, Result};
 48  use core::future::Future;
 49  #[cfg(feature = "locktick")]
 50  use locktick::parking_lot::Mutex;
 51  #[cfg(not(feature = "locktick"))]
 52  use parking_lot::Mutex;
 53  use std::{net::SocketAddr, sync::Arc, time::Duration};
 54  use tokio::task::JoinHandle;
 55  
/// A validator is a full node, capable of validating blocks.
///
/// The type derives `Clone`. The block sync, task handles, and ping state are stored behind
/// `Arc`, so clones of a validator refer to the same underlying instances of those fields.
#[derive(Clone)]
pub struct Validator<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The consensus module of the node.
    consensus: Consensus<N>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node (`None` unless a REST server has been started).
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic (used in the Router impl).
    sync: Arc<BlockSync<N>>,
    /// The handles of the tasks spawned by this node; they are aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
}
 74  
 75  impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
 76      /// Initializes a new validator node.
 77      pub async fn new(
 78          node_ip: SocketAddr,
 79          bft_ip: Option<SocketAddr>,
 80          rest_ip: Option<SocketAddr>,
 81          rest_rps: u32,
 82          account: Account<N>,
 83          trusted_peers: &[SocketAddr],
 84          trusted_validators: &[SocketAddr],
 85          genesis: Block<N>,
 86          cdn: Option<http::Uri>,
 87          storage_mode: StorageMode,
 88          trusted_peers_only: bool,
 89          dev_txs: bool,
 90          dev: Option<u16>,
 91          signal_handler: Arc<SignalHandler>,
 92      ) -> Result<Self> {
 93          Self::with_ipc_handler(
 94              node_ip,
 95              bft_ip,
 96              rest_ip,
 97              rest_rps,
 98              account,
 99              trusted_peers,
100              trusted_validators,
101              genesis,
102              cdn,
103              storage_mode,
104              trusted_peers_only,
105              dev_txs,
106              dev,
107              signal_handler,
108              None,
109          )
110          .await
111      }
112  
113      /// Initializes a new validator node with an optional IPC handler for cross-chain communication.
114      ///
115      /// The IPC handler enables communication between Alpha and Delta chains via the adnet runtime.
116      #[allow(clippy::too_many_arguments)]
117      pub async fn with_ipc_handler(
118          node_ip: SocketAddr,
119          bft_ip: Option<SocketAddr>,
120          rest_ip: Option<SocketAddr>,
121          rest_rps: u32,
122          account: Account<N>,
123          trusted_peers: &[SocketAddr],
124          trusted_validators: &[SocketAddr],
125          genesis: Block<N>,
126          cdn: Option<http::Uri>,
127          storage_mode: StorageMode,
128          trusted_peers_only: bool,
129          dev_txs: bool,
130          dev: Option<u16>,
131          signal_handler: Arc<SignalHandler>,
132          ipc_handler: Option<Arc<dyn IpcEventHandler<N>>>,
133      ) -> Result<Self> {
134          // Initialize the ledger.
135          let ledger = {
136              let storage_mode = storage_mode.clone();
137              let genesis = genesis.clone();
138  
139              spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
140          }
141          .with_context(|| "Failed to initialize the ledger")?;
142  
143          // Initialize the ledger service.
144          let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), signal_handler.clone()));
145  
146          // Initialize the node router.
147          let router = Router::new(
148              node_ip,
149              NodeType::Validator,
150              account.clone(),
151              ledger_service.clone(),
152              trusted_peers,
153              Self::MAXIMUM_NUMBER_OF_PEERS as u16,
154              trusted_peers_only,
155              storage_mode.clone(),
156              dev.is_some(),
157          )
158          .await?;
159  
160          // Initialize the block synchronization logic.
161          let sync = Arc::new(BlockSync::new(ledger_service.clone()));
162          let locators = sync.get_block_locators()?;
163          let ping = Arc::new(Ping::new(router.clone(), locators));
164  
165          // Initialize the consensus layer.
166          let consensus = Consensus::with_ipc_handler(
167              account.clone(),
168              ledger_service.clone(),
169              sync.clone(),
170              bft_ip,
171              trusted_validators,
172              trusted_peers_only,
173              storage_mode.clone(),
174              ping.clone(),
175              dev,
176              ipc_handler,
177          )
178          .await?;
179  
180          // Initialize the node.
181          let mut node = Self {
182              ledger: ledger.clone(),
183              consensus: consensus.clone(),
184              router,
185              rest: None,
186              sync: sync.clone(),
187              ping,
188              handles: Default::default(),
189          };
190  
191          // Perform sync with CDN (if enabled).
192          let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler)));
193  
194          // Initialize the transaction pool.
195          node.initialize_transaction_pool(dev, dev_txs)?;
196  
197          // Initialize the REST server.
198          if let Some(rest_ip) = rest_ip {
199              node.rest = Some(
200                  Rest::start(
201                      rest_ip,
202                      rest_rps,
203                      Some(consensus),
204                      ledger.clone(),
205                      Arc::new(node.clone()),
206                      cdn_sync.clone(),
207                      sync,
208                  )
209                  .await?,
210              );
211          }
212  
213          // Set up everything else after CDN sync is done.
214          if let Some(cdn_sync) = cdn_sync {
215              if let Err(error) = cdn_sync.wait().await {
216                  crate::log_clean_error(&storage_mode);
217                  node.shut_down().await;
218                  return Err(error);
219              }
220          }
221  
222          // Initialize the routing.
223          node.initialize_routing().await;
224          // Initialize the notification message loop.
225          node.handles.lock().push(crate::start_notification_message_loop());
226  
227          // Return the node.
228          Ok(node)
229      }
230  
    /// Returns a reference to the ledger held by this node.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }
235  
    /// Returns the REST server, or `None` if no REST server was started for this node.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }
240  
    /// Returns a reference to the router of this node.
    pub fn router(&self) -> &Router<N> {
        &self.router
    }
245  
246      // /// Initialize the transaction pool.
247      // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
248      //     use alphavm::{
249      //         console::{
250      //             account::ViewKey,
251      //             program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
252      //             types::U64,
253      //         },
254      //         ledger::block::transition::Output,
255      //     };
256      //     use std::str::FromStr;
257      //
258      //     // Initialize the locator.
259      //     let locator = (ProgramID::from_str("credits.alpha")?, Identifier::from_str("split")?);
260      //     // Initialize the record name.
261      //     let record_name = Identifier::from_str("credits")?;
262      //
263      //     /// Searches the genesis block for the mint record.
264      //     fn search_genesis_for_mint<N: Network>(
265      //         block: Block<N>,
266      //         view_key: &ViewKey<N>,
267      //     ) -> Option<Record<N, Plaintext<N>>> {
268      //         for transition in block.transitions().filter(|t| t.is_mint()) {
269      //             if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
270      //                 if ciphertext.is_owner(view_key) {
271      //                     match ciphertext.decrypt(view_key) {
272      //                         Ok(record) => return Some(record),
273      //                         Err(error) => {
274      //                             error!("Failed to decrypt the mint output record - {error}");
275      //                             return None;
276      //                         }
277      //                     }
278      //                 }
279      //             }
280      //         }
281      //         None
282      //     }
283      //
284      //     /// Searches the block for the split record.
285      //     fn search_block_for_split<N: Network>(
286      //         block: Block<N>,
287      //         view_key: &ViewKey<N>,
288      //     ) -> Option<Record<N, Plaintext<N>>> {
289      //         let mut found = None;
290      //         // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
291      //         // block.transitions().rev().for_each(|t| {
292      //         let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
293      //         splits.iter().rev().for_each(|t| {
294      //             if found.is_some() {
295      //                 return;
296      //             }
297      //             let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
298      //                 error!("Failed to find the split output record");
299      //                 return;
300      //             };
301      //             if ciphertext.is_owner(view_key) {
302      //                 match ciphertext.decrypt(view_key) {
303      //                     Ok(record) => found = Some(record),
304      //                     Err(error) => {
305      //                         error!("Failed to decrypt the split output record - {error}");
306      //                     }
307      //                 }
308      //             }
309      //         });
310      //         found
311      //     }
312      //
313      //     let self_ = self.clone();
314      //     self.spawn(async move {
315      //         // Retrieve the view key.
316      //         let view_key = self_.view_key();
317      //         // Initialize the record.
318      //         let mut record = {
319      //             let mut found = None;
320      //             let mut height = self_.ledger.latest_height();
321      //             while found.is_none() && height > 0 {
322      //                 // Retrieve the block.
323      //                 let Ok(block) = self_.ledger.get_block(height) else {
324      //                     error!("Failed to get block at height {}", height);
325      //                     break;
326      //                 };
327      //                 // Search for the latest split record.
328      //                 if let Some(record) = search_block_for_split(block, view_key) {
329      //                     found = Some(record);
330      //                 }
331      //                 // Decrement the height.
332      //                 height = height.saturating_sub(1);
333      //             }
334      //             match found {
335      //                 Some(record) => record,
336      //                 None => {
337      //                     // Retrieve the genesis block.
338      //                     let Ok(block) = self_.ledger.get_block(0) else {
339      //                         error!("Failed to get the genesis block");
340      //                         return;
341      //                     };
342      //                     // Search the genesis block for the mint record.
343      //                     if let Some(record) = search_genesis_for_mint(block, view_key) {
344      //                         found = Some(record);
345      //                     }
346      //                     found.expect("Failed to find the split output record")
347      //                 }
348      //             }
349      //         };
350      //         info!("Starting transaction pool...");
351      //         // Start the transaction loop.
352      //         loop {
353      //             tokio::time::sleep(Duration::from_secs(1)).await;
354      //             // If the node is running in development mode, only generate if you are allowed.
355      //             if let Some(dev) = dev {
356      //                 if dev != 0 {
357      //                     continue;
358      //                 }
359      //             }
360      //
361      //             // Prepare the inputs.
362      //             let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
363      //             // Execute the transaction.
364      //             let transaction = match self_.ledger.vm().execute(
365      //                 self_.private_key(),
366      //                 locator,
367      //                 inputs,
368      //                 None,
369      //                 None,
370      //                 &mut rand::thread_rng(),
371      //             ) {
372      //                 Ok(transaction) => transaction,
373      //                 Err(error) => {
374      //                     error!("Transaction pool encountered an execution error - {error}");
375      //                     continue;
376      //                 }
377      //             };
378      //             // Retrieve the transition.
379      //             let Some(transition) = transaction.transitions().next() else {
380      //                 error!("Transaction pool encountered a missing transition");
381      //                 continue;
382      //             };
383      //             // Retrieve the second output.
384      //             let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
385      //                 error!("Transaction pool encountered a missing output");
386      //                 continue;
387      //             };
388      //             // Save the second output record.
389      //             let Ok(next_record) = ciphertext.decrypt(view_key) else {
390      //                 error!("Transaction pool encountered a decryption error");
391      //                 continue;
392      //             };
393      //             // Broadcast the transaction.
394      //             if self_
395      //                 .unconfirmed_transaction(
396      //                     self_.router.local_ip(),
397      //                     UnconfirmedTransaction::from(transaction.clone()),
398      //                     transaction.clone(),
399      //                 )
400      //                 .await
401      //             {
402      //                 info!("Transaction pool broadcasted the transaction");
403      //                 let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
404      //                 while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
405      //                     tokio::time::sleep(Duration::from_secs(1)).await;
406      //                 }
407      //                 info!("Transaction accepted by the ledger");
408      //             }
409      //             // Save the record.
410      //             record = next_record;
411      //         }
412      //     });
413      //     Ok(())
414      // }
415  
416      /// Initializes the transaction pool (if in development mode).
417      ///
418      /// Spawns a background task that periodically issues transactions to the network.
419      fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> {
420          use alphavm::console::{
421              program::{Identifier, Literal, ProgramID, Value},
422              types::U64,
423          };
424          use std::str::FromStr;
425  
426          // Initialize the locator.
427          let locator = (ProgramID::from_str("credits.alpha")?, Identifier::from_str("transfer_public")?);
428  
429          // Determine whether to start the loop.
430          match dev {
431              // If the node is running in development mode, only generate if you are allowed.
432              Some(id) => {
433                  // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
434                  if id != 0 || !dev_txs {
435                      return Ok(());
436                  }
437              }
438              // If the node is not running in development mode, do not generate dev traffic.
439              _ => return Ok(()),
440          }
441  
442          let self_ = self.clone();
443          self.spawn(async move {
444              tokio::time::sleep(Duration::from_secs(3)).await;
445              info!("Starting transaction pool...");
446  
447              // Start the transaction loop.
448              loop {
449                  tokio::time::sleep(Duration::from_millis(500)).await;
450  
451                  // Prepare the inputs.
452                  let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
453                  // Execute the transaction.
454                  let self__ = self_.clone();
455                  let transaction = match spawn_blocking!(self__.ledger.vm().execute(
456                      self__.private_key(),
457                      locator,
458                      inputs.into_iter(),
459                      None,
460                      10_000,
461                      None,
462                      &mut rand::thread_rng(),
463                  )) {
464                      Ok(transaction) => transaction,
465                      Err(error) => {
466                          error!("Transaction pool encountered an execution error - {error}");
467                          continue;
468                      }
469                  };
470                  // Broadcast the transaction.
471                  if self_
472                      .unconfirmed_transaction(
473                          self_.router.local_ip(),
474                          UnconfirmedTransaction::from(transaction.clone()),
475                          transaction.clone(),
476                      )
477                      .await
478                  {
479                      info!("Transaction pool broadcasted the transaction");
480                  }
481              }
482          });
483          Ok(())
484      }
485  
486      /// Spawns a task with the given future; it should only be used for long-running tasks.
487      pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
488          self.handles.lock().push(tokio::spawn(future));
489      }
490  }
491  
492  #[async_trait]
493  impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
494      /// Shuts down the node.
495      async fn shut_down(&self) {
496          info!("Shutting down...");
497  
498          // Shut down the node.
499          trace!("Shutting down the node...");
500  
501          // Abort the tasks.
502          trace!("Shutting down the validator...");
503          self.handles.lock().iter().for_each(|handle| handle.abort());
504  
505          // Shut down the router.
506          self.router.shut_down().await;
507  
508          // Shut down consensus.
509          trace!("Shutting down consensus...");
510          self.consensus.shut_down().await;
511  
512          info!("Node has shut down.");
513      }
514  }
515  
#[cfg(test)]
mod tests {
    use super::*;
    use alphavm::prelude::{
        store::{helpers::memory::ConsensusMemory, ConsensusStore},
        MainnetV0,
        VM,
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use std::str::FromStr;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Profiling entry point: boots a validator with an in-memory store and then fails on purpose.
    ///
    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // Addresses and modes the validator is started with.
        let node_ip = SocketAddr::from_str("0.0.0.0:4130").unwrap();
        let rest_ip = SocketAddr::from_str("0.0.0.0:3030").unwrap();
        let storage_mode = StorageMode::Development(0);
        let dev_txs = true;

        // An (insecure) fixed-seed RNG keeps the account and genesis block reproducible.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        // The account is sampled first, then the genesis block, both from the same RNG.
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
        // A throwaway VM is used only to produce the genesis block.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(StorageMode::new_test(None))?;
        let vm = VM::from(store)?;
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        // No BFT address, no trusted peers or validators, no CDN, and no dev ID.
        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            node_ip,
            None,
            Some(rest_ip),
            10,
            account,
            &[],
            &[],
            genesis,
            None,
            storage_mode,
            false,
            dev_txs,
            None,
            SignalHandler::new(),
        )
        .await
        .unwrap();

        let height = validator.ledger.latest_height();
        println!("Loaded validator node with {} blocks", height);

        // Fail deliberately so the test is never left enabled by accident.
        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}