mod.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 mod router; 20 21 use crate::traits::NodeInterface; 22 23 use alphaos_account::Account; 24 use alphaos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking}; 25 use alphaos_node_cdn::CdnBlockSync; 26 use alphaos_node_consensus::{Consensus, IpcEventHandler}; 27 use alphaos_node_network::{NodeType, PeerPoolHandling}; 28 use alphaos_node_rest::Rest; 29 use alphaos_node_router::{ 30 messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction}, 31 Heartbeat, 32 Inbound, 33 Outbound, 34 Router, 35 Routing, 36 }; 37 use alphaos_node_sync::{BlockSync, Ping}; 38 use alphaos_node_tcp::{ 39 protocols::{Disconnect, Handshake, OnConnect, Reading}, 40 P2P, 41 }; 42 use alphaos_utilities::SignalHandler; 43 44 use alphavm::prelude::{block::Block, puzzle::Solution, store::ConsensusStorage, Ledger, Network}; 45 46 use alphastd::StorageMode; 47 use anyhow::{Context, Result}; 48 use core::future::Future; 49 #[cfg(feature = "locktick")] 50 use locktick::parking_lot::Mutex; 51 #[cfg(not(feature = "locktick"))] 52 use parking_lot::Mutex; 53 use std::{net::SocketAddr, sync::Arc, time::Duration}; 54 use tokio::task::JoinHandle; 55 56 /// A validator is a full node, capable of validating blocks. 57 #[derive(Clone)] 58 pub struct Validator<N: Network, C: ConsensusStorage<N>> { 59 /// The ledger of the node. 60 ledger: Ledger<N, C>, 61 /// The consensus module of the node. 62 consensus: Consensus<N>, 63 /// The router of the node. 64 router: Router<N>, 65 /// The REST server of the node. 66 rest: Option<Rest<N, C, Self>>, 67 /// The block synchronization logic (used in the Router impl). 68 sync: Arc<BlockSync<N>>, 69 /// The spawned handles. 70 handles: Arc<Mutex<Vec<JoinHandle<()>>>>, 71 /// Keeps track of sending pings. 72 ping: Arc<Ping<N>>, 73 } 74 75 impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> { 76 /// Initializes a new validator node. 77 pub async fn new( 78 node_ip: SocketAddr, 79 bft_ip: Option<SocketAddr>, 80 rest_ip: Option<SocketAddr>, 81 rest_rps: u32, 82 account: Account<N>, 83 trusted_peers: &[SocketAddr], 84 trusted_validators: &[SocketAddr], 85 genesis: Block<N>, 86 cdn: Option<http::Uri>, 87 storage_mode: StorageMode, 88 trusted_peers_only: bool, 89 dev_txs: bool, 90 dev: Option<u16>, 91 signal_handler: Arc<SignalHandler>, 92 ) -> Result<Self> { 93 Self::with_ipc_handler( 94 node_ip, 95 bft_ip, 96 rest_ip, 97 rest_rps, 98 account, 99 trusted_peers, 100 trusted_validators, 101 genesis, 102 cdn, 103 storage_mode, 104 trusted_peers_only, 105 dev_txs, 106 dev, 107 signal_handler, 108 None, 109 ) 110 .await 111 } 112 113 /// Initializes a new validator node with an optional IPC handler for cross-chain communication. 114 /// 115 /// The IPC handler enables communication between Alpha and Delta chains via the adnet runtime. 116 #[allow(clippy::too_many_arguments)] 117 pub async fn with_ipc_handler( 118 node_ip: SocketAddr, 119 bft_ip: Option<SocketAddr>, 120 rest_ip: Option<SocketAddr>, 121 rest_rps: u32, 122 account: Account<N>, 123 trusted_peers: &[SocketAddr], 124 trusted_validators: &[SocketAddr], 125 genesis: Block<N>, 126 cdn: Option<http::Uri>, 127 storage_mode: StorageMode, 128 trusted_peers_only: bool, 129 dev_txs: bool, 130 dev: Option<u16>, 131 signal_handler: Arc<SignalHandler>, 132 ipc_handler: Option<Arc<dyn IpcEventHandler<N>>>, 133 ) -> Result<Self> { 134 // Initialize the ledger. 135 let ledger = { 136 let storage_mode = storage_mode.clone(); 137 let genesis = genesis.clone(); 138 139 spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode)) 140 } 141 .with_context(|| "Failed to initialize the ledger")?; 142 143 // Initialize the ledger service. 144 let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), signal_handler.clone())); 145 146 // Initialize the node router. 147 let router = Router::new( 148 node_ip, 149 NodeType::Validator, 150 account.clone(), 151 ledger_service.clone(), 152 trusted_peers, 153 Self::MAXIMUM_NUMBER_OF_PEERS as u16, 154 trusted_peers_only, 155 storage_mode.clone(), 156 dev.is_some(), 157 ) 158 .await?; 159 160 // Initialize the block synchronization logic. 161 let sync = Arc::new(BlockSync::new(ledger_service.clone())); 162 let locators = sync.get_block_locators()?; 163 let ping = Arc::new(Ping::new(router.clone(), locators)); 164 165 // Initialize the consensus layer. 166 let consensus = Consensus::with_ipc_handler( 167 account.clone(), 168 ledger_service.clone(), 169 sync.clone(), 170 bft_ip, 171 trusted_validators, 172 trusted_peers_only, 173 storage_mode.clone(), 174 ping.clone(), 175 dev, 176 ipc_handler, 177 ) 178 .await?; 179 180 // Initialize the node. 181 let mut node = Self { 182 ledger: ledger.clone(), 183 consensus: consensus.clone(), 184 router, 185 rest: None, 186 sync: sync.clone(), 187 ping, 188 handles: Default::default(), 189 }; 190 191 // Perform sync with CDN (if enabled). 192 let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))); 193 194 // Initialize the transaction pool. 195 node.initialize_transaction_pool(dev, dev_txs)?; 196 197 // Initialize the REST server. 198 if let Some(rest_ip) = rest_ip { 199 node.rest = Some( 200 Rest::start( 201 rest_ip, 202 rest_rps, 203 Some(consensus), 204 ledger.clone(), 205 Arc::new(node.clone()), 206 cdn_sync.clone(), 207 sync, 208 ) 209 .await?, 210 ); 211 } 212 213 // Set up everything else after CDN sync is done. 214 if let Some(cdn_sync) = cdn_sync { 215 if let Err(error) = cdn_sync.wait().await { 216 crate::log_clean_error(&storage_mode); 217 node.shut_down().await; 218 return Err(error); 219 } 220 } 221 222 // Initialize the routing. 223 node.initialize_routing().await; 224 // Initialize the notification message loop. 225 node.handles.lock().push(crate::start_notification_message_loop()); 226 227 // Return the node. 228 Ok(node) 229 } 230 231 /// Returns the ledger. 232 pub fn ledger(&self) -> &Ledger<N, C> { 233 &self.ledger 234 } 235 236 /// Returns the REST server. 237 pub fn rest(&self) -> &Option<Rest<N, C, Self>> { 238 &self.rest 239 } 240 241 /// Returns the router. 242 pub fn router(&self) -> &Router<N> { 243 &self.router 244 } 245 246 // /// Initialize the transaction pool. 247 // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> { 248 // use alphavm::{ 249 // console::{ 250 // account::ViewKey, 251 // program::{Identifier, Literal, Plaintext, ProgramID, Record, Value}, 252 // types::U64, 253 // }, 254 // ledger::block::transition::Output, 255 // }; 256 // use std::str::FromStr; 257 // 258 // // Initialize the locator. 259 // let locator = (ProgramID::from_str("credits.alpha")?, Identifier::from_str("split")?); 260 // // Initialize the record name. 261 // let record_name = Identifier::from_str("credits")?; 262 // 263 // /// Searches the genesis block for the mint record. 264 // fn search_genesis_for_mint<N: Network>( 265 // block: Block<N>, 266 // view_key: &ViewKey<N>, 267 // ) -> Option<Record<N, Plaintext<N>>> { 268 // for transition in block.transitions().filter(|t| t.is_mint()) { 269 // if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] { 270 // if ciphertext.is_owner(view_key) { 271 // match ciphertext.decrypt(view_key) { 272 // Ok(record) => return Some(record), 273 // Err(error) => { 274 // error!("Failed to decrypt the mint output record - {error}"); 275 // return None; 276 // } 277 // } 278 // } 279 // } 280 // } 281 // None 282 // } 283 // 284 // /// Searches the block for the split record. 285 // fn search_block_for_split<N: Network>( 286 // block: Block<N>, 287 // view_key: &ViewKey<N>, 288 // ) -> Option<Record<N, Plaintext<N>>> { 289 // let mut found = None; 290 // // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported. 291 // // block.transitions().rev().for_each(|t| { 292 // let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>(); 293 // splits.iter().rev().for_each(|t| { 294 // if found.is_some() { 295 // return; 296 // } 297 // let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else { 298 // error!("Failed to find the split output record"); 299 // return; 300 // }; 301 // if ciphertext.is_owner(view_key) { 302 // match ciphertext.decrypt(view_key) { 303 // Ok(record) => found = Some(record), 304 // Err(error) => { 305 // error!("Failed to decrypt the split output record - {error}"); 306 // } 307 // } 308 // } 309 // }); 310 // found 311 // } 312 // 313 // let self_ = self.clone(); 314 // self.spawn(async move { 315 // // Retrieve the view key. 316 // let view_key = self_.view_key(); 317 // // Initialize the record. 318 // let mut record = { 319 // let mut found = None; 320 // let mut height = self_.ledger.latest_height(); 321 // while found.is_none() && height > 0 { 322 // // Retrieve the block. 323 // let Ok(block) = self_.ledger.get_block(height) else { 324 // error!("Failed to get block at height {}", height); 325 // break; 326 // }; 327 // // Search for the latest split record. 328 // if let Some(record) = search_block_for_split(block, view_key) { 329 // found = Some(record); 330 // } 331 // // Decrement the height. 332 // height = height.saturating_sub(1); 333 // } 334 // match found { 335 // Some(record) => record, 336 // None => { 337 // // Retrieve the genesis block. 338 // let Ok(block) = self_.ledger.get_block(0) else { 339 // error!("Failed to get the genesis block"); 340 // return; 341 // }; 342 // // Search the genesis block for the mint record. 343 // if let Some(record) = search_genesis_for_mint(block, view_key) { 344 // found = Some(record); 345 // } 346 // found.expect("Failed to find the split output record") 347 // } 348 // } 349 // }; 350 // info!("Starting transaction pool..."); 351 // // Start the transaction loop. 352 // loop { 353 // tokio::time::sleep(Duration::from_secs(1)).await; 354 // // If the node is running in development mode, only generate if you are allowed. 355 // if let Some(dev) = dev { 356 // if dev != 0 { 357 // continue; 358 // } 359 // } 360 // 361 // // Prepare the inputs. 362 // let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter(); 363 // // Execute the transaction. 364 // let transaction = match self_.ledger.vm().execute( 365 // self_.private_key(), 366 // locator, 367 // inputs, 368 // None, 369 // None, 370 // &mut rand::thread_rng(), 371 // ) { 372 // Ok(transaction) => transaction, 373 // Err(error) => { 374 // error!("Transaction pool encountered an execution error - {error}"); 375 // continue; 376 // } 377 // }; 378 // // Retrieve the transition. 379 // let Some(transition) = transaction.transitions().next() else { 380 // error!("Transaction pool encountered a missing transition"); 381 // continue; 382 // }; 383 // // Retrieve the second output. 384 // let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else { 385 // error!("Transaction pool encountered a missing output"); 386 // continue; 387 // }; 388 // // Save the second output record. 389 // let Ok(next_record) = ciphertext.decrypt(view_key) else { 390 // error!("Transaction pool encountered a decryption error"); 391 // continue; 392 // }; 393 // // Broadcast the transaction. 394 // if self_ 395 // .unconfirmed_transaction( 396 // self_.router.local_ip(), 397 // UnconfirmedTransaction::from(transaction.clone()), 398 // transaction.clone(), 399 // ) 400 // .await 401 // { 402 // info!("Transaction pool broadcasted the transaction"); 403 // let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap(); 404 // while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) { 405 // tokio::time::sleep(Duration::from_secs(1)).await; 406 // } 407 // info!("Transaction accepted by the ledger"); 408 // } 409 // // Save the record. 410 // record = next_record; 411 // } 412 // }); 413 // Ok(()) 414 // } 415 416 /// Initializes the transaction pool (if in development mode). 417 /// 418 /// Spawns a background task that periodically issues transactions to the network. 419 fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> { 420 use alphavm::console::{ 421 program::{Identifier, Literal, ProgramID, Value}, 422 types::U64, 423 }; 424 use std::str::FromStr; 425 426 // Initialize the locator. 427 let locator = (ProgramID::from_str("credits.alpha")?, Identifier::from_str("transfer_public")?); 428 429 // Determine whether to start the loop. 430 match dev { 431 // If the node is running in development mode, only generate if you are allowed. 432 Some(id) => { 433 // If the node is not the first node, or if we should not create dev traffic, do not start the loop. 434 if id != 0 || !dev_txs { 435 return Ok(()); 436 } 437 } 438 // If the node is not running in development mode, do not generate dev traffic. 439 _ => return Ok(()), 440 } 441 442 let self_ = self.clone(); 443 self.spawn(async move { 444 tokio::time::sleep(Duration::from_secs(3)).await; 445 info!("Starting transaction pool..."); 446 447 // Start the transaction loop. 448 loop { 449 tokio::time::sleep(Duration::from_millis(500)).await; 450 451 // Prepare the inputs. 452 let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))]; 453 // Execute the transaction. 454 let self__ = self_.clone(); 455 let transaction = match spawn_blocking!(self__.ledger.vm().execute( 456 self__.private_key(), 457 locator, 458 inputs.into_iter(), 459 None, 460 10_000, 461 None, 462 &mut rand::thread_rng(), 463 )) { 464 Ok(transaction) => transaction, 465 Err(error) => { 466 error!("Transaction pool encountered an execution error - {error}"); 467 continue; 468 } 469 }; 470 // Broadcast the transaction. 471 if self_ 472 .unconfirmed_transaction( 473 self_.router.local_ip(), 474 UnconfirmedTransaction::from(transaction.clone()), 475 transaction.clone(), 476 ) 477 .await 478 { 479 info!("Transaction pool broadcasted the transaction"); 480 } 481 } 482 }); 483 Ok(()) 484 } 485 486 /// Spawns a task with the given future; it should only be used for long-running tasks. 487 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) { 488 self.handles.lock().push(tokio::spawn(future)); 489 } 490 } 491 492 #[async_trait] 493 impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> { 494 /// Shuts down the node. 495 async fn shut_down(&self) { 496 info!("Shutting down..."); 497 498 // Shut down the node. 499 trace!("Shutting down the node..."); 500 501 // Abort the tasks. 502 trace!("Shutting down the validator..."); 503 self.handles.lock().iter().for_each(|handle| handle.abort()); 504 505 // Shut down the router. 506 self.router.shut_down().await; 507 508 // Shut down consensus. 509 trace!("Shutting down consensus..."); 510 self.consensus.shut_down().await; 511 512 info!("Node has shut down."); 513 } 514 } 515 516 #[cfg(test)] 517 mod tests { 518 use super::*; 519 use alphavm::prelude::{ 520 store::{helpers::memory::ConsensusMemory, ConsensusStore}, 521 MainnetV0, 522 VM, 523 }; 524 525 use anyhow::bail; 526 use rand::SeedableRng; 527 use rand_chacha::ChaChaRng; 528 use std::str::FromStr; 529 530 type CurrentNetwork = MainnetV0; 531 532 /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test. 533 #[ignore] 534 #[tokio::test] 535 async fn test_profiler() -> Result<()> { 536 // Specify the node attributes. 537 let node = SocketAddr::from_str("0.0.0.0:4130").unwrap(); 538 let rest = SocketAddr::from_str("0.0.0.0:3030").unwrap(); 539 let storage_mode = StorageMode::Development(0); 540 let dev_txs = true; 541 542 // Initialize an (insecure) fixed RNG. 543 let mut rng = ChaChaRng::seed_from_u64(1234567890u64); 544 // Initialize the account. 545 let account = Account::<CurrentNetwork>::new(&mut rng).unwrap(); 546 // Initialize a new VM. 547 let vm = VM::from(ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open( 548 StorageMode::new_test(None), 549 )?)?; 550 // Initialize the genesis block. 551 let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?; 552 553 println!("Initializing validator node..."); 554 555 let validator = Validator::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::new( 556 node, 557 None, 558 Some(rest), 559 10, 560 account, 561 &[], 562 &[], 563 genesis, 564 None, 565 storage_mode, 566 false, 567 dev_txs, 568 None, 569 SignalHandler::new(), 570 ) 571 .await 572 .unwrap(); 573 574 println!("Loaded validator node with {} blocks", validator.ledger.latest_height(),); 575 576 bail!("\n\nRemember to #[ignore] this test!\n\n") 577 } 578 }