primary.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use crate::common::{ 20 utils::{fire_unconfirmed_solutions, fire_unconfirmed_transactions, initialize_logger}, 21 CurrentNetwork, 22 TranslucentLedgerService, 23 }; 24 25 use alphaos_account::Account; 26 use alphaos_node_bft::{ 27 helpers::{init_primary_channels, PrimarySender, Storage}, 28 Primary, 29 BFT, 30 MAX_BATCH_DELAY_IN_MS, 31 MEMORY_POOL_PORT, 32 }; 33 use alphaos_node_bft_storage_service::BFTMemoryService; 34 use alphaos_node_network::PeerPoolHandling; 35 use alphaos_node_sync::BlockSync; 36 use alphaos_utilities::SimpleStoppable; 37 38 use alphavm::{ 39 console::{ 40 account::{Address, PrivateKey}, 41 algorithms::{Hash, BHP256}, 42 network::Network, 43 }, 44 ledger::{ 45 block::Block, 46 committee::{Committee, MIN_VALIDATOR_STAKE}, 47 narwhal::BatchHeader, 48 store::{helpers::memory::ConsensusMemory, ConsensusStore}, 49 Ledger, 50 }, 51 prelude::{CryptoRng, FromBytes, Rng, TestRng, ToBits, ToBytes, VM}, 52 utilities::to_bytes_le, 53 }; 54 55 use alphastd::StorageMode; 56 use indexmap::IndexMap; 57 use itertools::Itertools; 58 #[cfg(feature = "locktick")] 59 use locktick::parking_lot::Mutex; 60 
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
    collections::HashMap,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    ops::RangeBounds,
    sync::{Arc, OnceLock},
    time::Duration,
};
use tokio::{task::JoinHandle, time::sleep};
use tracing::*;

/// The configuration for the test network.
#[derive(Clone, Copy, Debug)]
pub struct TestNetworkConfig {
    /// The number of nodes to spin up.
    pub num_nodes: u16,
    /// If this is set to `true`, the BFT protocol is started on top of Narwhal.
    pub bft: bool,
    /// If this is set to `true`, all nodes are connected to each other (when they're first
    /// started).
    pub connect_all: bool,
    /// If `Some(i)` is set, the cannons will fire every `i` milliseconds.
    pub fire_transmissions: Option<u64>,
    /// The log level to use for the test.
    pub log_level: Option<u8>,
    /// If this is set to `true`, the number of connections is logged every 5 seconds.
    pub log_connections: bool,
}

/// A test network.
#[derive(Clone)]
pub struct TestNetwork {
    /// The configuration for the test network.
    pub config: TestNetworkConfig,
    /// A map of node IDs to validators in the network.
    pub validators: HashMap<u16, TestValidator>,
}

/// A test validator.
#[derive(Clone)]
pub struct TestValidator {
    /// The ID of the validator.
    pub id: u16,
    /// The primary instance. When the BFT is enabled this is a clone of the BFT primary.
    pub primary: Primary<CurrentNetwork>,
    /// The channel sender of the primary. This is `None` until `TestNetwork::start` is called.
    pub primary_sender: Option<PrimarySender<CurrentNetwork>>,
    /// The BFT instance. This is only set if the BFT is enabled.
    pub bft: OnceLock<BFT<CurrentNetwork>>,
    /// The tokio handles of all long-running tasks associated with the validator (incl. cannons).
    pub handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}

/// A ledger over the current test network, backed by in-memory consensus storage.
pub type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;

impl TestValidator {
    // Starts the solution and transaction cannons for this validator, firing every
    // `interval_ms` milliseconds, and records their task handles for later cleanup.
    // Panics if `primary_sender` has not been set yet (i.e. before `TestNetwork::start`).
    pub fn fire_transmissions(&mut self, interval_ms: u64) {
        let solution_handle = fire_unconfirmed_solutions(self.primary_sender.as_mut().unwrap(), self.id, interval_ms);
        let transaction_handle =
            fire_unconfirmed_transactions(self.primary_sender.as_mut().unwrap(), self.id, interval_ms);

        self.handles.lock().push(solution_handle);
        self.handles.lock().push(transaction_handle);
    }

    // Spawns a background task that logs this validator's connected peer count (and each
    // peer at debug level) every 5 seconds, and records the task handle.
    pub fn log_connections(&mut self) {
        let self_clone = self.clone();
        self.handles.lock().push(tokio::task::spawn(async move {
            loop {
                let connections = self_clone.primary.gateway().connected_peers();
                info!("{} connections", connections.len());
                for connection in connections {
                    debug!(" {}", connection);
                }
                sleep(Duration::from_secs(5)).await;
            }
        }));
    }
}

impl TestNetwork {
    // Creates a new test network with the given configuration.
    //
    // Samples a fresh committee of `num_nodes` accounts, splits the remaining starting
    // supply (after staking) evenly across the validators as public balances, and builds
    // one validator per account — each with its own genesis ledger, in-memory BFT storage,
    // block-sync instance, and a primary bound to a localhost port offset by its ID.
    // Nothing is started here; call `start` to run the nodes.
    pub fn new(config: TestNetworkConfig) -> Self {
        let mut rng = TestRng::default();

        if let Some(log_level) = config.log_level {
            initialize_logger(log_level);
        }

        let (accounts, committee) = new_test_committee(config.num_nodes, &mut rng);
        // Each committee member is bonded to itself: (staker, (validator, withdrawal, amount)).
        let bonded_balances: IndexMap<_, _> = committee
            .members()
            .iter()
            .map(|(address, (amount, _, _))| (*address, (*address, *address, *amount)))
            .collect();
        // The first account's key is used to sign the (shared) genesis block.
        let gen_key = *accounts[0].private_key();
        // Split the supply left over after every validator stakes the minimum.
        let public_balance_per_validator = (CurrentNetwork::STARTING_SUPPLY
            - (config.num_nodes as u64) * MIN_VALIDATOR_STAKE)
            / (config.num_nodes as u64);
        let mut balances = IndexMap::<Address<CurrentNetwork>, u64>::new();
        for account in accounts.iter() {
            balances.insert(account.address(), public_balance_per_validator);
        }

        let mut validators = HashMap::with_capacity(config.num_nodes as usize);
        for (id, account) in accounts.into_iter().enumerate() {
            // Every validator loads its own copy of the (cached) genesis ledger.
            let gen_ledger =
                genesis_ledger(gen_key, committee.clone(), balances.clone(), bonded_balances.clone(), &mut rng);
            let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new()));
            let storage = Storage::new(
                ledger.clone(),
                Arc::new(BFTMemoryService::new()),
                BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64,
            );
            // Initialize the block synchronization logic.
            let block_sync = Arc::new(BlockSync::new(ledger.clone()));
            // With BFT enabled, the validator's `primary` is a clone of the BFT's primary;
            // otherwise a standalone primary is constructed with the same arguments.
            let (primary, bft) = if config.bft {
                let bft = BFT::<CurrentNetwork>::new(
                    account,
                    storage,
                    ledger,
                    block_sync,
                    Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)),
                    &[],
                    false,
                    StorageMode::new_test(None),
                    None,
                )
                .unwrap();
                (bft.primary().clone(), Some(bft))
            } else {
                let primary = Primary::<CurrentNetwork>::new(
                    account,
                    storage,
                    ledger,
                    block_sync,
                    Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)),
                    &[],
                    false,
                    StorageMode::new_test(None),
                    None,
                )
                .unwrap();
                (primary, None)
            };

            let test_validator = TestValidator {
                id: id as u16,
                primary,
                primary_sender: None,
                bft: OnceLock::new(),
                handles: Default::default(),
            };
            if let Some(bft) = bft {
                // The OnceLock is freshly created above, so this set cannot fail.
                assert!(test_validator.bft.set(bft).is_ok());
            }
            validators.insert(id as u16, test_validator);
        }

        Self { config, validators }
    }

    // Starts each node in the network: wires up the primary channels, runs the BFT (if
    // enabled) or the bare primary, optionally starts the transmission cannons and the
    // connection logger, and finally interconnects all nodes if `connect_all` is set.
    pub async fn start(&mut self) {
        for validator in self.validators.values_mut() {
            let (primary_sender, primary_receiver) = init_primary_channels();
            // Keep a sender clone so the cannons can inject transmissions later.
            validator.primary_sender = Some(primary_sender.clone());

            // let ledger_service = validator.primary.ledger().clone();
            // let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service);
            // sync.try_block_sync(validator.primary.gateway()).await.unwrap();

            if let Some(bft) = validator.bft.get_mut() {
                // Setup the channels and start the bft.
                bft.run(None, None, primary_sender, primary_receiver).await.unwrap();
            } else {
                // Setup the channels and start the primary.
                validator.primary.run(None, None, primary_sender, primary_receiver).await.unwrap();
            }

            if let Some(interval_ms) = self.config.fire_transmissions {
                validator.fire_transmissions(interval_ms);
            }

            if self.config.log_connections {
                validator.log_connections();
            }
        }

        if self.config.connect_all {
            self.connect_all().await;
        }
    }

    // Starts the solution and transaction cannons for node.
    // Panics if `id` is not a known validator.
    pub fn fire_transmissions_at(&mut self, id: u16, interval_ms: u64) {
        self.validators.get_mut(&id).unwrap().fire_transmissions(interval_ms);
    }

    // Connects a node to another node.
    pub async fn connect_validators(&self, first_id: u16, second_id: u16) {
        let first_validator = self.validators.get(&first_id).unwrap();
        let second_validator_ip = self.validators.get(&second_id).unwrap().primary.gateway().local_ip();
        first_validator.primary.gateway().connect(second_validator_ip);
        // Give the connection time to be established.
        sleep(Duration::from_millis(100)).await;
    }

    // Connects all nodes to each other (one connect call per unordered pair).
    pub async fn connect_all(&self) {
        for (validator, other_validator) in self.validators.values().tuple_combinations() {
            // Connect to the node.
            let ip = other_validator.primary.gateway().local_ip();
            validator.primary.gateway().connect(ip);
            // Give the connection time to be established.
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }

    // Connects a specific node to all other nodes.
    // Note: the connection is initiated from each peer towards the target node.
    pub async fn connect_one(&self, id: u16) {
        let target_validator = self.validators.get(&id).unwrap();
        let target_ip = target_validator.primary.gateway().local_ip();
        for validator in self.validators.values() {
            if validator.id != id {
                // Connect to the node.
                validator.primary.gateway().connect(target_ip);
                // Give the connection time to be established.
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
        }
    }

    // Disconnects N nodes from all other nodes.
    // Which N are chosen depends on HashMap iteration order, i.e. it is arbitrary.
    pub async fn disconnect(&self, num_nodes: u16) {
        for validator in self.validators.values().take(num_nodes as usize) {
            for peer_ip in validator.primary.gateway().connected_peers().iter() {
                validator.primary.gateway().disconnect(*peer_ip);
            }
        }

        // Give the connections time to be closed.
        sleep(Duration::from_millis(100)).await;
    }

    // Disconnects a specific node from all other nodes.
    pub async fn disconnect_one(&self, id: u16) {
        let target_validator = self.validators.get(&id).unwrap();
        for peer_ip in target_validator.primary.gateway().connected_peers().iter() {
            target_validator.primary.gateway().disconnect(*peer_ip);
        }

        // Give the connections time to be closed.
        sleep(Duration::from_millis(100)).await;
    }

    // Checks if a Byzantine fault-tolerant quorum of validators has reached the given round.
    // Assuming `N = 3f + 1 + k`, where `0 <= k < 3`, and '/' denotes integer division,
    // then `N - (N-1)/3 = 2N/3 + 1 = 2f + 1 + (2k+2)/3 = 2f + 1 + k = N - f`.
    pub fn is_round_reached(&self, round: u64) -> bool {
        let quorum_threshold = self.validators.len() - (self.validators.len() - 1) / 3;
        self.validators.values().filter(|v| v.primary.current_round() >= round).count() >= quorum_threshold
    }

    // Checks if all the nodes have stopped progressing: samples the maximum current round,
    // waits two batch delays, then verifies no validator has advanced past it.
    pub async fn is_halted(&self) -> bool {
        let halt_round = self.validators.values().map(|v| v.primary.current_round()).max().unwrap();
        sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS * 2)).await;
        self.validators.values().all(|v| v.primary.current_round() <= halt_round)
    }

    // Checks if the committee is coherent in storage for all nodes (not quorum) over a range of
    // rounds. Validators that cannot produce a committee for a round (e.g. the round is still
    // in their future) are skipped rather than counted as incoherent.
    pub fn is_committee_coherent<T>(&self, rounds_range: T) -> bool
    where
        T: RangeBounds<u64> + IntoIterator<Item = u64>,
    {
        for round in rounds_range.into_iter() {
            let mut last: Option<Committee<CurrentNetwork>> = None;
            for validator in self.validators.values() {
                // Round might be in future, in case validator didn't get to it.
                if let Ok(committee) = validator.primary.ledger().get_committee_for_round(round) {
                    match last.clone() {
                        None => last = Some(committee),
                        Some(first) => {
                            if first != committee {
                                return false;
                            }
                        }
                    }
                }
            }
        }

        true
    }

    // Checks if the certificates are coherent in storage for all nodes (not quorum) over a range
    // of rounds. `dedup` collapses consecutive equal certificate sets, so a count of 1 means
    // every validator stores the identical set for that round.
    pub fn is_certificate_round_coherent<T>(&self, rounds_range: T) -> bool
    where
        T: RangeBounds<u64> + IntoIterator<Item = u64>,
    {
        rounds_range.into_iter().all(|round| {
            self.validators.values().map(|v| v.primary.storage().get_certificates_for_round(round)).dedup().count() == 1
        })
    }
}

// Initializes a new test committee of `n` freshly sampled accounts, each staked at
// `MIN_VALIDATOR_STAKE`. The member tuple appears to be (stake, is_open, commission)
// with a random commission in 0..100 — confirm against the `Committee` definition.
pub fn new_test_committee(n: u16, rng: &mut TestRng) -> (Vec<Account<CurrentNetwork>>, Committee<CurrentNetwork>) {
    let mut accounts = Vec::with_capacity(n as usize);
    let mut members = IndexMap::with_capacity(n as usize);
    for i in 0..n {
        // Sample the account.
        let account = Account::new(rng).unwrap();
        info!("Validator {}: {}", i, account.address());

        members.insert(account.address(), (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
        accounts.push(account);
    }
    // Initialize the committee.
    let committee = Committee::<CurrentNetwork>::new(0u64, members).unwrap();

    (accounts, committee)
}

// Returns the process-wide cache of genesis blocks, keyed by the serialized genesis inputs.
// Lazily initialized on first access.
fn genesis_cache() -> &'static Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>> {
    static CACHE: OnceLock<Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>>> = OnceLock::new();
    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}

// Generates a fresh quorum genesis block from the given key, committee, and balances,
// using a throwaway in-memory VM. This is the expensive path behind `genesis_ledger`'s cache.
pub fn genesis_block(
    genesis_private_key: PrivateKey<CurrentNetwork>,
    committee: Committee<CurrentNetwork>,
    public_balances: IndexMap<Address<CurrentNetwork>, u64>,
    bonded_balances: IndexMap<Address<CurrentNetwork>, (Address<CurrentNetwork>, Address<CurrentNetwork>, u64)>,
    rng: &mut (impl Rng + CryptoRng),
) -> Block<CurrentNetwork> {
    // Initialize the store.
    let store = ConsensusStore::<_, ConsensusMemory<_>>::open(StorageMode::new_test(None)).unwrap();
    // Initialize a new VM.
    let vm = VM::from(store).unwrap();
    // Initialize the genesis block.
    vm.genesis_quorum(&genesis_private_key, committee, public_balances, bonded_balances, rng).unwrap()
}

// Builds a ledger from a genesis block, reusing a cached block when possible.
// Caching is two-level: an in-process map (see `genesis_cache`) and a file in the OS temp
// directory named after a BHP256 hash of the cache key, which also shares work across
// test processes.
// NOTE(review): the cache key covers the private key, committee, and public balances but
// NOT `bonded_balances` — two calls differing only in bonded balances would collide.
// Confirm this is intentional (e.g. bonded balances derived from the committee).
pub fn genesis_ledger(
    genesis_private_key: PrivateKey<CurrentNetwork>,
    committee: Committee<CurrentNetwork>,
    public_balances: IndexMap<Address<CurrentNetwork>, u64>,
    bonded_balances: IndexMap<Address<CurrentNetwork>, (Address<CurrentNetwork>, Address<CurrentNetwork>, u64)>,
    rng: &mut (impl Rng + CryptoRng),
) -> CurrentLedger {
    let cache_key =
        to_bytes_le![genesis_private_key, committee, public_balances.iter().collect::<Vec<(_, _)>>()].unwrap();
    // Initialize the genesis block on the first call; other callers
    // will wait for it on the mutex.
    let block = genesis_cache()
        .lock()
        .entry(cache_key.clone())
        .or_insert_with(|| {
            let hasher = BHP256::<CurrentNetwork>::setup("alpha.dev.block").unwrap();
            let file_name = hasher.hash(&cache_key.to_bits_le()).unwrap().to_string() + ".genesis";
            let file_path = std::env::temp_dir().join(file_name);
            // Fast path: a previous process already wrote this genesis block to disk.
            if file_path.exists() {
                let buffer = std::fs::read(file_path).unwrap();
                return Block::from_bytes_le(&buffer).unwrap();
            }

            // Slow path: generate the block and persist it for future processes.
            let block = genesis_block(genesis_private_key, committee, public_balances, bonded_balances, rng);
            std::fs::write(&file_path, block.to_bytes_le().unwrap()).unwrap();
            block
        })
        .clone();
    // Initialize the ledger with the genesis block.
    CurrentLedger::load(block, StorageMode::new_test(None)).unwrap()
}