/ node / src / validator / mod.rs
mod.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  mod router;
 17  
 18  use crate::traits::NodeInterface;
 19  
 20  use alphaos_account::Account;
 21  use alphaos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking};
 22  use alphaos_node_cdn::CdnBlockSync;
 23  use alphaos_node_consensus::Consensus;
 24  use alphaos_node_network::{NodeType, PeerPoolHandling};
 25  use alphaos_node_rest::Rest;
 26  use alphaos_node_router::{
 27      Heartbeat,
 28      Inbound,
 29      Outbound,
 30      Router,
 31      Routing,
 32      messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
 33  };
 34  use alphaos_node_sync::{BlockSync, Ping};
 35  use alphaos_node_tcp::{
 36      P2P,
 37      protocols::{Disconnect, Handshake, OnConnect, Reading},
 38  };
 39  use alphaos_utilities::SignalHandler;
 40  
 41  use alphavm::prelude::{
 42      Ledger,
 43      Network,
 44      block::{Block, Header},
 45      puzzle::Solution,
 46      store::ConsensusStorage,
 47  };
 48  
 49  use alpha_std::StorageMode;
 50  use anyhow::{Context, Result};
 51  use core::future::Future;
 52  #[cfg(feature = "locktick")]
 53  use locktick::parking_lot::Mutex;
 54  #[cfg(not(feature = "locktick"))]
 55  use parking_lot::Mutex;
 56  use std::{net::SocketAddr, sync::Arc, time::Duration};
 57  use tokio::task::JoinHandle;
 58  
/// A validator is a full node, capable of validating blocks.
///
/// The type derives `Clone`. The `sync`, `handles` and `ping` fields are
/// `Arc`-wrapped, so every clone shares the same instances of those; in
/// particular, a task registered through one clone is visible to (and aborted
/// by) `shut_down` called on any other clone.
// NOTE(review): `ledger`, `consensus` and `router` are cloned by value here;
// presumably they are cheap handle types as well — confirm in their crates.
#[derive(Clone)]
pub struct Validator<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The consensus module of the node.
    consensus: Consensus<N>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node.
    ///
    /// `None` until `new` has started the server, and permanently `None` when
    /// no REST address was supplied.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic (used in the Router impl).
    sync: Arc<BlockSync<N>>,
    /// The spawned handles.
    ///
    /// Collected so that `shut_down` can abort every background task.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
}
 77  
 78  impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
 79      /// Initializes a new validator node.
 80      pub async fn new(
 81          node_ip: SocketAddr,
 82          bft_ip: Option<SocketAddr>,
 83          rest_ip: Option<SocketAddr>,
 84          rest_rps: u32,
 85          account: Account<N>,
 86          trusted_peers: &[SocketAddr],
 87          trusted_validators: &[SocketAddr],
 88          genesis: Block<N>,
 89          cdn: Option<http::Uri>,
 90          storage_mode: StorageMode,
 91          trusted_peers_only: bool,
 92          dev_txs: bool,
 93          dev: Option<u16>,
 94          signal_handler: Arc<SignalHandler>,
 95      ) -> Result<Self> {
 96          // Initialize the ledger.
 97          let ledger = {
 98              let storage_mode = storage_mode.clone();
 99              let genesis = genesis.clone();
100  
101              spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
102          }
103          .with_context(|| "Failed to initialize the ledger")?;
104  
105          // Initialize the ledger service.
106          let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), signal_handler.clone()));
107  
108          // Initialize the node router.
109          let router = Router::new(
110              node_ip,
111              NodeType::Validator,
112              account.clone(),
113              ledger_service.clone(),
114              trusted_peers,
115              Self::MAXIMUM_NUMBER_OF_PEERS as u16,
116              trusted_peers_only,
117              storage_mode.clone(),
118              dev.is_some(),
119          )
120          .await?;
121  
122          // Initialize the block synchronization logic.
123          let sync = Arc::new(BlockSync::new(ledger_service.clone()));
124          let locators = sync.get_block_locators()?;
125          let ping = Arc::new(Ping::new(router.clone(), locators));
126  
127          // Initialize the consensus layer.
128          let consensus = Consensus::new(
129              account.clone(),
130              ledger_service.clone(),
131              sync.clone(),
132              bft_ip,
133              trusted_validators,
134              trusted_peers_only,
135              storage_mode.clone(),
136              ping.clone(),
137              dev,
138          )
139          .await?;
140  
141          // Initialize the node.
142          let mut node = Self {
143              ledger: ledger.clone(),
144              consensus: consensus.clone(),
145              router,
146              rest: None,
147              sync: sync.clone(),
148              ping,
149              handles: Default::default(),
150          };
151  
152          // Perform sync with CDN (if enabled).
153          let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler)));
154  
155          // Initialize the transaction pool.
156          node.initialize_transaction_pool(dev, dev_txs)?;
157  
158          // Initialize the REST server.
159          if let Some(rest_ip) = rest_ip {
160              node.rest = Some(
161                  Rest::start(
162                      rest_ip,
163                      rest_rps,
164                      Some(consensus),
165                      ledger.clone(),
166                      Arc::new(node.clone()),
167                      cdn_sync.clone(),
168                      sync,
169                  )
170                  .await?,
171              );
172          }
173  
174          // Set up everything else after CDN sync is done.
175          if let Some(cdn_sync) = cdn_sync {
176              if let Err(error) = cdn_sync.wait().await {
177                  crate::log_clean_error(&storage_mode);
178                  node.shut_down().await;
179                  return Err(error);
180              }
181          }
182  
183          // Initialize the routing.
184          node.initialize_routing().await;
185          // Initialize the notification message loop.
186          node.handles.lock().push(crate::start_notification_message_loop());
187  
188          // Return the node.
189          Ok(node)
190      }
191  
192      /// Returns the ledger.
193      pub fn ledger(&self) -> &Ledger<N, C> {
194          &self.ledger
195      }
196  
197      /// Returns the REST server.
198      pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
199          &self.rest
200      }
201  
202      /// Returns the router.
203      pub fn router(&self) -> &Router<N> {
204          &self.router
205      }
206  
207      // /// Initialize the transaction pool.
208      // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
209      //     use alphavm::{
210      //         console::{
211      //             account::ViewKey,
212      //             program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
213      //             types::U64,
214      //         },
215      //         ledger::block::transition::Output,
216      //     };
217      //     use std::str::FromStr;
218      //
219      //     // Initialize the locator.
220      //     let locator = (ProgramID::from_str("credits.alpha")?, Identifier::from_str("split")?);
221      //     // Initialize the record name.
222      //     let record_name = Identifier::from_str("credits")?;
223      //
224      //     /// Searches the genesis block for the mint record.
225      //     fn search_genesis_for_mint<N: Network>(
226      //         block: Block<N>,
227      //         view_key: &ViewKey<N>,
228      //     ) -> Option<Record<N, Plaintext<N>>> {
229      //         for transition in block.transitions().filter(|t| t.is_mint()) {
230      //             if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
231      //                 if ciphertext.is_owner(view_key) {
232      //                     match ciphertext.decrypt(view_key) {
233      //                         Ok(record) => return Some(record),
234      //                         Err(error) => {
235      //                             error!("Failed to decrypt the mint output record - {error}");
236      //                             return None;
237      //                         }
238      //                     }
239      //                 }
240      //             }
241      //         }
242      //         None
243      //     }
244      //
245      //     /// Searches the block for the split record.
246      //     fn search_block_for_split<N: Network>(
247      //         block: Block<N>,
248      //         view_key: &ViewKey<N>,
249      //     ) -> Option<Record<N, Plaintext<N>>> {
250      //         let mut found = None;
251      //         // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
252      //         // block.transitions().rev().for_each(|t| {
253      //         let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
254      //         splits.iter().rev().for_each(|t| {
255      //             if found.is_some() {
256      //                 return;
257      //             }
258      //             let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
259      //                 error!("Failed to find the split output record");
260      //                 return;
261      //             };
262      //             if ciphertext.is_owner(view_key) {
263      //                 match ciphertext.decrypt(view_key) {
264      //                     Ok(record) => found = Some(record),
265      //                     Err(error) => {
266      //                         error!("Failed to decrypt the split output record - {error}");
267      //                     }
268      //                 }
269      //             }
270      //         });
271      //         found
272      //     }
273      //
274      //     let self_ = self.clone();
275      //     self.spawn(async move {
276      //         // Retrieve the view key.
277      //         let view_key = self_.view_key();
278      //         // Initialize the record.
279      //         let mut record = {
280      //             let mut found = None;
281      //             let mut height = self_.ledger.latest_height();
282      //             while found.is_none() && height > 0 {
283      //                 // Retrieve the block.
284      //                 let Ok(block) = self_.ledger.get_block(height) else {
285      //                     error!("Failed to get block at height {}", height);
286      //                     break;
287      //                 };
288      //                 // Search for the latest split record.
289      //                 if let Some(record) = search_block_for_split(block, view_key) {
290      //                     found = Some(record);
291      //                 }
292      //                 // Decrement the height.
293      //                 height = height.saturating_sub(1);
294      //             }
295      //             match found {
296      //                 Some(record) => record,
297      //                 None => {
298      //                     // Retrieve the genesis block.
299      //                     let Ok(block) = self_.ledger.get_block(0) else {
300      //                         error!("Failed to get the genesis block");
301      //                         return;
302      //                     };
303      //                     // Search the genesis block for the mint record.
304      //                     if let Some(record) = search_genesis_for_mint(block, view_key) {
305      //                         found = Some(record);
306      //                     }
307      //                     found.expect("Failed to find the split output record")
308      //                 }
309      //             }
310      //         };
311      //         info!("Starting transaction pool...");
312      //         // Start the transaction loop.
313      //         loop {
314      //             tokio::time::sleep(Duration::from_secs(1)).await;
315      //             // If the node is running in development mode, only generate if you are allowed.
316      //             if let Some(dev) = dev {
317      //                 if dev != 0 {
318      //                     continue;
319      //                 }
320      //             }
321      //
322      //             // Prepare the inputs.
323      //             let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
324      //             // Execute the transaction.
325      //             let transaction = match self_.ledger.vm().execute(
326      //                 self_.private_key(),
327      //                 locator,
328      //                 inputs,
329      //                 None,
330      //                 None,
331      //                 &mut rand::thread_rng(),
332      //             ) {
333      //                 Ok(transaction) => transaction,
334      //                 Err(error) => {
335      //                     error!("Transaction pool encountered an execution error - {error}");
336      //                     continue;
337      //                 }
338      //             };
339      //             // Retrieve the transition.
340      //             let Some(transition) = transaction.transitions().next() else {
341      //                 error!("Transaction pool encountered a missing transition");
342      //                 continue;
343      //             };
344      //             // Retrieve the second output.
345      //             let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
346      //                 error!("Transaction pool encountered a missing output");
347      //                 continue;
348      //             };
349      //             // Save the second output record.
350      //             let Ok(next_record) = ciphertext.decrypt(view_key) else {
351      //                 error!("Transaction pool encountered a decryption error");
352      //                 continue;
353      //             };
354      //             // Broadcast the transaction.
355      //             if self_
356      //                 .unconfirmed_transaction(
357      //                     self_.router.local_ip(),
358      //                     UnconfirmedTransaction::from(transaction.clone()),
359      //                     transaction.clone(),
360      //                 )
361      //                 .await
362      //             {
363      //                 info!("Transaction pool broadcasted the transaction");
364      //                 let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
365      //                 while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
366      //                     tokio::time::sleep(Duration::from_secs(1)).await;
367      //                 }
368      //                 info!("Transaction accepted by the ledger");
369      //             }
370      //             // Save the record.
371      //             record = next_record;
372      //         }
373      //     });
374      //     Ok(())
375      // }
376  
377      /// Initializes the transaction pool (if in development mode).
378      ///
379      /// Spawns a background task that periodically issues transactions to the network.
380      fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> {
381          use alphavm::console::{
382              program::{Identifier, Literal, ProgramID, Value},
383              types::U64,
384          };
385          use std::str::FromStr;
386  
387          // Initialize the locator.
388          let locator = (ProgramID::from_str("credits.alpha")?, Identifier::from_str("transfer_public")?);
389  
390          // Determine whether to start the loop.
391          match dev {
392              // If the node is running in development mode, only generate if you are allowed.
393              Some(id) => {
394                  // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
395                  if id != 0 || !dev_txs {
396                      return Ok(());
397                  }
398              }
399              // If the node is not running in development mode, do not generate dev traffic.
400              _ => return Ok(()),
401          }
402  
403          let self_ = self.clone();
404          self.spawn(async move {
405              tokio::time::sleep(Duration::from_secs(3)).await;
406              info!("Starting transaction pool...");
407  
408              // Start the transaction loop.
409              loop {
410                  tokio::time::sleep(Duration::from_millis(500)).await;
411  
412                  // Prepare the inputs.
413                  let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
414                  // Execute the transaction.
415                  let self__ = self_.clone();
416                  let transaction = match spawn_blocking!(self__.ledger.vm().execute(
417                      self__.private_key(),
418                      locator,
419                      inputs.into_iter(),
420                      None,
421                      10_000,
422                      None,
423                      &mut rand::thread_rng(),
424                  )) {
425                      Ok(transaction) => transaction,
426                      Err(error) => {
427                          error!("Transaction pool encountered an execution error - {error}");
428                          continue;
429                      }
430                  };
431                  // Broadcast the transaction.
432                  if self_
433                      .unconfirmed_transaction(
434                          self_.router.local_ip(),
435                          UnconfirmedTransaction::from(transaction.clone()),
436                          transaction.clone(),
437                      )
438                      .await
439                  {
440                      info!("Transaction pool broadcasted the transaction");
441                  }
442              }
443          });
444          Ok(())
445      }
446  
447      /// Spawns a task with the given future; it should only be used for long-running tasks.
448      pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
449          self.handles.lock().push(tokio::spawn(future));
450      }
451  }
452  
453  #[async_trait]
454  impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
455      /// Shuts down the node.
456      async fn shut_down(&self) {
457          info!("Shutting down...");
458  
459          // Shut down the node.
460          trace!("Shutting down the node...");
461  
462          // Abort the tasks.
463          trace!("Shutting down the validator...");
464          self.handles.lock().iter().for_each(|handle| handle.abort());
465  
466          // Shut down the router.
467          self.router.shut_down().await;
468  
469          // Shut down consensus.
470          trace!("Shutting down consensus...");
471          self.consensus.shut_down().await;
472  
473          info!("Node has shut down.");
474      }
475  }
476  
#[cfg(test)]
mod tests {
    use super::*;
    use alphavm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    ///
    /// The test always ends with an error on purpose, as a reminder to keep it `#[ignore]`d.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // Node attributes.
        let node_ip: SocketAddr = "0.0.0.0:4130".parse().unwrap();
        let rest_ip: SocketAddr = "0.0.0.0:3030".parse().unwrap();
        let storage_mode = StorageMode::Development(0);
        let dev_txs = true;

        // An (insecure) fixed RNG keeps the account and the genesis block reproducible.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();

        // Build a VM on a test store and derive the genesis block from it.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(StorageMode::new_test(None))?;
        let vm = VM::from(store)?;
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            node_ip,
            None,
            Some(rest_ip),
            10,
            account,
            &[],
            &[],
            genesis,
            None,
            storage_mode,
            false,
            dev_txs,
            None,
            SignalHandler::new(),
        )
        .await
        .unwrap();

        let height = validator.ledger.latest_height();
        println!("Loaded validator node with {} blocks", height);

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}