mod.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 mod router; 17 18 use crate::traits::NodeInterface; 19 20 use alphaos_account::Account; 21 use alphaos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking}; 22 use alphaos_node_cdn::CdnBlockSync; 23 use alphaos_node_consensus::Consensus; 24 use alphaos_node_network::{NodeType, PeerPoolHandling}; 25 use alphaos_node_rest::Rest; 26 use alphaos_node_router::{ 27 Heartbeat, 28 Inbound, 29 Outbound, 30 Router, 31 Routing, 32 messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction}, 33 }; 34 use alphaos_node_sync::{BlockSync, Ping}; 35 use alphaos_node_tcp::{ 36 P2P, 37 protocols::{Disconnect, Handshake, OnConnect, Reading}, 38 }; 39 use alphaos_utilities::SignalHandler; 40 41 use alphavm::prelude::{ 42 Ledger, 43 Network, 44 block::{Block, Header}, 45 puzzle::Solution, 46 store::ConsensusStorage, 47 }; 48 49 use alpha_std::StorageMode; 50 use anyhow::{Context, Result}; 51 use core::future::Future; 52 #[cfg(feature = "locktick")] 53 use locktick::parking_lot::Mutex; 54 #[cfg(not(feature = "locktick"))] 55 use parking_lot::Mutex; 56 use std::{net::SocketAddr, sync::Arc, time::Duration}; 57 use tokio::task::JoinHandle; 58 59 /// A validator is a full node, capable of validating blocks. 60 #[derive(Clone)] 61 pub struct Validator<N: Network, C: ConsensusStorage<N>> { 62 /// The ledger of the node. 63 ledger: Ledger<N, C>, 64 /// The consensus module of the node. 65 consensus: Consensus<N>, 66 /// The router of the node. 67 router: Router<N>, 68 /// The REST server of the node. 69 rest: Option<Rest<N, C, Self>>, 70 /// The block synchronization logic (used in the Router impl). 71 sync: Arc<BlockSync<N>>, 72 /// The spawned handles. 73 handles: Arc<Mutex<Vec<JoinHandle<()>>>>, 74 /// Keeps track of sending pings. 75 ping: Arc<Ping<N>>, 76 } 77 78 impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> { 79 /// Initializes a new validator node. 80 pub async fn new( 81 node_ip: SocketAddr, 82 bft_ip: Option<SocketAddr>, 83 rest_ip: Option<SocketAddr>, 84 rest_rps: u32, 85 account: Account<N>, 86 trusted_peers: &[SocketAddr], 87 trusted_validators: &[SocketAddr], 88 genesis: Block<N>, 89 cdn: Option<http::Uri>, 90 storage_mode: StorageMode, 91 trusted_peers_only: bool, 92 dev_txs: bool, 93 dev: Option<u16>, 94 signal_handler: Arc<SignalHandler>, 95 ) -> Result<Self> { 96 // Initialize the ledger. 97 let ledger = { 98 let storage_mode = storage_mode.clone(); 99 let genesis = genesis.clone(); 100 101 spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode)) 102 } 103 .with_context(|| "Failed to initialize the ledger")?; 104 105 // Initialize the ledger service. 106 let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), signal_handler.clone())); 107 108 // Initialize the node router. 109 let router = Router::new( 110 node_ip, 111 NodeType::Validator, 112 account.clone(), 113 ledger_service.clone(), 114 trusted_peers, 115 Self::MAXIMUM_NUMBER_OF_PEERS as u16, 116 trusted_peers_only, 117 storage_mode.clone(), 118 dev.is_some(), 119 ) 120 .await?; 121 122 // Initialize the block synchronization logic. 123 let sync = Arc::new(BlockSync::new(ledger_service.clone())); 124 let locators = sync.get_block_locators()?; 125 let ping = Arc::new(Ping::new(router.clone(), locators)); 126 127 // Initialize the consensus layer. 128 let consensus = Consensus::new( 129 account.clone(), 130 ledger_service.clone(), 131 sync.clone(), 132 bft_ip, 133 trusted_validators, 134 trusted_peers_only, 135 storage_mode.clone(), 136 ping.clone(), 137 dev, 138 ) 139 .await?; 140 141 // Initialize the node. 142 let mut node = Self { 143 ledger: ledger.clone(), 144 consensus: consensus.clone(), 145 router, 146 rest: None, 147 sync: sync.clone(), 148 ping, 149 handles: Default::default(), 150 }; 151 152 // Perform sync with CDN (if enabled). 153 let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))); 154 155 // Initialize the transaction pool. 156 node.initialize_transaction_pool(dev, dev_txs)?; 157 158 // Initialize the REST server. 159 if let Some(rest_ip) = rest_ip { 160 node.rest = Some( 161 Rest::start( 162 rest_ip, 163 rest_rps, 164 Some(consensus), 165 ledger.clone(), 166 Arc::new(node.clone()), 167 cdn_sync.clone(), 168 sync, 169 ) 170 .await?, 171 ); 172 } 173 174 // Set up everything else after CDN sync is done. 175 if let Some(cdn_sync) = cdn_sync { 176 if let Err(error) = cdn_sync.wait().await { 177 crate::log_clean_error(&storage_mode); 178 node.shut_down().await; 179 return Err(error); 180 } 181 } 182 183 // Initialize the routing. 184 node.initialize_routing().await; 185 // Initialize the notification message loop. 186 node.handles.lock().push(crate::start_notification_message_loop()); 187 188 // Return the node. 189 Ok(node) 190 } 191 192 /// Returns the ledger. 193 pub fn ledger(&self) -> &Ledger<N, C> { 194 &self.ledger 195 } 196 197 /// Returns the REST server. 198 pub fn rest(&self) -> &Option<Rest<N, C, Self>> { 199 &self.rest 200 } 201 202 /// Returns the router. 203 pub fn router(&self) -> &Router<N> { 204 &self.router 205 } 206 207 // /// Initialize the transaction pool. 208 // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> { 209 // use alphavm::{ 210 // console::{ 211 // account::ViewKey, 212 // program::{Identifier, Literal, Plaintext, ProgramID, Record, Value}, 213 // types::U64, 214 // }, 215 // ledger::block::transition::Output, 216 // }; 217 // use std::str::FromStr; 218 // 219 // // Initialize the locator. 220 // let locator = (ProgramID::from_str("credits.alpha")?, Identifier::from_str("split")?); 221 // // Initialize the record name. 222 // let record_name = Identifier::from_str("credits")?; 223 // 224 // /// Searches the genesis block for the mint record. 225 // fn search_genesis_for_mint<N: Network>( 226 // block: Block<N>, 227 // view_key: &ViewKey<N>, 228 // ) -> Option<Record<N, Plaintext<N>>> { 229 // for transition in block.transitions().filter(|t| t.is_mint()) { 230 // if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] { 231 // if ciphertext.is_owner(view_key) { 232 // match ciphertext.decrypt(view_key) { 233 // Ok(record) => return Some(record), 234 // Err(error) => { 235 // error!("Failed to decrypt the mint output record - {error}"); 236 // return None; 237 // } 238 // } 239 // } 240 // } 241 // } 242 // None 243 // } 244 // 245 // /// Searches the block for the split record. 246 // fn search_block_for_split<N: Network>( 247 // block: Block<N>, 248 // view_key: &ViewKey<N>, 249 // ) -> Option<Record<N, Plaintext<N>>> { 250 // let mut found = None; 251 // // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported. 252 // // block.transitions().rev().for_each(|t| { 253 // let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>(); 254 // splits.iter().rev().for_each(|t| { 255 // if found.is_some() { 256 // return; 257 // } 258 // let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else { 259 // error!("Failed to find the split output record"); 260 // return; 261 // }; 262 // if ciphertext.is_owner(view_key) { 263 // match ciphertext.decrypt(view_key) { 264 // Ok(record) => found = Some(record), 265 // Err(error) => { 266 // error!("Failed to decrypt the split output record - {error}"); 267 // } 268 // } 269 // } 270 // }); 271 // found 272 // } 273 // 274 // let self_ = self.clone(); 275 // self.spawn(async move { 276 // // Retrieve the view key. 277 // let view_key = self_.view_key(); 278 // // Initialize the record. 279 // let mut record = { 280 // let mut found = None; 281 // let mut height = self_.ledger.latest_height(); 282 // while found.is_none() && height > 0 { 283 // // Retrieve the block. 284 // let Ok(block) = self_.ledger.get_block(height) else { 285 // error!("Failed to get block at height {}", height); 286 // break; 287 // }; 288 // // Search for the latest split record. 289 // if let Some(record) = search_block_for_split(block, view_key) { 290 // found = Some(record); 291 // } 292 // // Decrement the height. 293 // height = height.saturating_sub(1); 294 // } 295 // match found { 296 // Some(record) => record, 297 // None => { 298 // // Retrieve the genesis block. 299 // let Ok(block) = self_.ledger.get_block(0) else { 300 // error!("Failed to get the genesis block"); 301 // return; 302 // }; 303 // // Search the genesis block for the mint record. 304 // if let Some(record) = search_genesis_for_mint(block, view_key) { 305 // found = Some(record); 306 // } 307 // found.expect("Failed to find the split output record") 308 // } 309 // } 310 // }; 311 // info!("Starting transaction pool..."); 312 // // Start the transaction loop. 313 // loop { 314 // tokio::time::sleep(Duration::from_secs(1)).await; 315 // // If the node is running in development mode, only generate if you are allowed. 316 // if let Some(dev) = dev { 317 // if dev != 0 { 318 // continue; 319 // } 320 // } 321 // 322 // // Prepare the inputs. 323 // let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter(); 324 // // Execute the transaction. 325 // let transaction = match self_.ledger.vm().execute( 326 // self_.private_key(), 327 // locator, 328 // inputs, 329 // None, 330 // None, 331 // &mut rand::thread_rng(), 332 // ) { 333 // Ok(transaction) => transaction, 334 // Err(error) => { 335 // error!("Transaction pool encountered an execution error - {error}"); 336 // continue; 337 // } 338 // }; 339 // // Retrieve the transition. 340 // let Some(transition) = transaction.transitions().next() else { 341 // error!("Transaction pool encountered a missing transition"); 342 // continue; 343 // }; 344 // // Retrieve the second output. 345 // let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else { 346 // error!("Transaction pool encountered a missing output"); 347 // continue; 348 // }; 349 // // Save the second output record. 350 // let Ok(next_record) = ciphertext.decrypt(view_key) else { 351 // error!("Transaction pool encountered a decryption error"); 352 // continue; 353 // }; 354 // // Broadcast the transaction. 355 // if self_ 356 // .unconfirmed_transaction( 357 // self_.router.local_ip(), 358 // UnconfirmedTransaction::from(transaction.clone()), 359 // transaction.clone(), 360 // ) 361 // .await 362 // { 363 // info!("Transaction pool broadcasted the transaction"); 364 // let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap(); 365 // while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) { 366 // tokio::time::sleep(Duration::from_secs(1)).await; 367 // } 368 // info!("Transaction accepted by the ledger"); 369 // } 370 // // Save the record. 371 // record = next_record; 372 // } 373 // }); 374 // Ok(()) 375 // } 376 377 /// Initializes the transaction pool (if in development mode). 378 /// 379 /// Spawns a background task that periodically issues transactions to the network. 380 fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> { 381 use alphavm::console::{ 382 program::{Identifier, Literal, ProgramID, Value}, 383 types::U64, 384 }; 385 use std::str::FromStr; 386 387 // Initialize the locator. 388 let locator = (ProgramID::from_str("credits.alpha")?, Identifier::from_str("transfer_public")?); 389 390 // Determine whether to start the loop. 391 match dev { 392 // If the node is running in development mode, only generate if you are allowed. 393 Some(id) => { 394 // If the node is not the first node, or if we should not create dev traffic, do not start the loop. 395 if id != 0 || !dev_txs { 396 return Ok(()); 397 } 398 } 399 // If the node is not running in development mode, do not generate dev traffic. 400 _ => return Ok(()), 401 } 402 403 let self_ = self.clone(); 404 self.spawn(async move { 405 tokio::time::sleep(Duration::from_secs(3)).await; 406 info!("Starting transaction pool..."); 407 408 // Start the transaction loop. 409 loop { 410 tokio::time::sleep(Duration::from_millis(500)).await; 411 412 // Prepare the inputs. 413 let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))]; 414 // Execute the transaction. 415 let self__ = self_.clone(); 416 let transaction = match spawn_blocking!(self__.ledger.vm().execute( 417 self__.private_key(), 418 locator, 419 inputs.into_iter(), 420 None, 421 10_000, 422 None, 423 &mut rand::thread_rng(), 424 )) { 425 Ok(transaction) => transaction, 426 Err(error) => { 427 error!("Transaction pool encountered an execution error - {error}"); 428 continue; 429 } 430 }; 431 // Broadcast the transaction. 432 if self_ 433 .unconfirmed_transaction( 434 self_.router.local_ip(), 435 UnconfirmedTransaction::from(transaction.clone()), 436 transaction.clone(), 437 ) 438 .await 439 { 440 info!("Transaction pool broadcasted the transaction"); 441 } 442 } 443 }); 444 Ok(()) 445 } 446 447 /// Spawns a task with the given future; it should only be used for long-running tasks. 448 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) { 449 self.handles.lock().push(tokio::spawn(future)); 450 } 451 } 452 453 #[async_trait] 454 impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> { 455 /// Shuts down the node. 456 async fn shut_down(&self) { 457 info!("Shutting down..."); 458 459 // Shut down the node. 460 trace!("Shutting down the node..."); 461 462 // Abort the tasks. 463 trace!("Shutting down the validator..."); 464 self.handles.lock().iter().for_each(|handle| handle.abort()); 465 466 // Shut down the router. 467 self.router.shut_down().await; 468 469 // Shut down consensus. 470 trace!("Shutting down consensus..."); 471 self.consensus.shut_down().await; 472 473 info!("Node has shut down."); 474 } 475 } 476 477 #[cfg(test)] 478 mod tests { 479 use super::*; 480 use alphavm::prelude::{ 481 MainnetV0, 482 VM, 483 store::{ConsensusStore, helpers::memory::ConsensusMemory}, 484 }; 485 486 use anyhow::bail; 487 use rand::SeedableRng; 488 use rand_chacha::ChaChaRng; 489 use std::str::FromStr; 490 491 type CurrentNetwork = MainnetV0; 492 493 /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test. 494 #[ignore] 495 #[tokio::test] 496 async fn test_profiler() -> Result<()> { 497 // Specify the node attributes. 498 let node = SocketAddr::from_str("0.0.0.0:4130").unwrap(); 499 let rest = SocketAddr::from_str("0.0.0.0:3030").unwrap(); 500 let storage_mode = StorageMode::Development(0); 501 let dev_txs = true; 502 503 // Initialize an (insecure) fixed RNG. 504 let mut rng = ChaChaRng::seed_from_u64(1234567890u64); 505 // Initialize the account. 506 let account = Account::<CurrentNetwork>::new(&mut rng).unwrap(); 507 // Initialize a new VM. 508 let vm = VM::from(ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open( 509 StorageMode::new_test(None), 510 )?)?; 511 // Initialize the genesis block. 512 let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?; 513 514 println!("Initializing validator node..."); 515 516 let validator = Validator::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::new( 517 node, 518 None, 519 Some(rest), 520 10, 521 account, 522 &[], 523 &[], 524 genesis, 525 None, 526 storage_mode, 527 false, 528 dev_txs, 529 None, 530 SignalHandler::new(), 531 ) 532 .await 533 .unwrap(); 534 535 println!("Loaded validator node with {} blocks", validator.ledger.latest_height(),); 536 537 bail!("\n\nRemember to #[ignore] this test!\n\n") 538 } 539 }