primary.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use crate::common::{ 17 CurrentNetwork, 18 TranslucentLedgerService, 19 utils::{fire_unconfirmed_solutions, fire_unconfirmed_transactions, initialize_logger}, 20 }; 21 22 use alphaos_account::Account; 23 use alphaos_node_bft::{ 24 BFT, 25 MAX_BATCH_DELAY_IN_MS, 26 MEMORY_POOL_PORT, 27 Primary, 28 helpers::{PrimarySender, Storage, init_primary_channels}, 29 }; 30 use alphaos_node_bft_storage_service::BFTMemoryService; 31 use alphaos_node_network::PeerPoolHandling; 32 use alphaos_node_sync::BlockSync; 33 use alphaos_utilities::SimpleStoppable; 34 35 use alphavm::{ 36 console::{ 37 account::{Address, PrivateKey}, 38 algorithms::{BHP256, Hash}, 39 network::Network, 40 }, 41 ledger::{ 42 Ledger, 43 block::Block, 44 committee::{Committee, MIN_VALIDATOR_STAKE}, 45 narwhal::BatchHeader, 46 store::{ConsensusStore, helpers::memory::ConsensusMemory}, 47 }, 48 prelude::{CryptoRng, FromBytes, Rng, TestRng, ToBits, ToBytes, VM}, 49 utilities::to_bytes_le, 50 }; 51 52 use alpha_std::StorageMode; 53 use indexmap::IndexMap; 54 use itertools::Itertools; 55 #[cfg(feature = "locktick")] 56 use locktick::parking_lot::Mutex; 57 #[cfg(not(feature = "locktick"))] 58 use parking_lot::Mutex; 59 use std::{ 60 collections::HashMap, 61 net::{IpAddr, Ipv4Addr, SocketAddr}, 62 ops::RangeBounds, 63 sync::{Arc, OnceLock}, 64 
    time::Duration,
};
use tokio::{task::JoinHandle, time::sleep};
use tracing::*;

/// The configuration for the test network.
#[derive(Clone, Copy, Debug)]
pub struct TestNetworkConfig {
    /// The number of nodes to spin up.
    pub num_nodes: u16,
    /// If this is set to `true`, the BFT protocol is started on top of Narwhal.
    pub bft: bool,
    /// If this is set to `true`, all nodes are connected to each other (when they're first
    /// started).
    pub connect_all: bool,
    /// If `Some(i)` is set, the cannons will fire every `i` milliseconds.
    pub fire_transmissions: Option<u64>,
    /// The log level to use for the test.
    pub log_level: Option<u8>,
    /// If this is set to `true`, the number of connections is logged every 5 seconds.
    pub log_connections: bool,
}

/// A test network.
#[derive(Clone)]
pub struct TestNetwork {
    /// The configuration for the test network.
    pub config: TestNetworkConfig,
    /// A map of node IDs to validators in the network.
    pub validators: HashMap<u16, TestValidator>,
}

/// A test validator.
#[derive(Clone)]
pub struct TestValidator {
    /// The ID of the validator.
    pub id: u16,
    /// The primary instance. When the BFT is enabled this is a clone of the BFT primary.
    pub primary: Primary<CurrentNetwork>,
    /// The channel sender of the primary.
    /// This is `None` until the validator is started via `TestNetwork::start`.
    pub primary_sender: Option<PrimarySender<CurrentNetwork>>,
    /// The BFT instance. This is only set if the BFT is enabled.
    pub bft: OnceLock<BFT<CurrentNetwork>>,
    /// The tokio handles of all long-running tasks associated with the validator (incl. cannons).
    /// Shared behind `Arc<Mutex<…>>` so clones of this validator push into the same list.
    pub handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}

/// The in-memory ledger type used by these tests.
pub type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;

impl TestValidator {
    /// Starts the solution and transaction "cannons" for this validator, firing one
    /// unconfirmed solution and one unconfirmed transaction every `interval_ms` milliseconds.
    /// The spawned task handles are retained in `self.handles` for the validator's lifetime.
    ///
    /// # Panics
    /// Panics if `primary_sender` is `None`, i.e. if the validator has not been started yet.
    pub fn fire_transmissions(&mut self, interval_ms: u64) {
        let solution_handle = fire_unconfirmed_solutions(self.primary_sender.as_mut().unwrap(), self.id, interval_ms);
        let transaction_handle =
            fire_unconfirmed_transactions(self.primary_sender.as_mut().unwrap(), self.id, interval_ms);

        self.handles.lock().push(solution_handle);
        self.handles.lock().push(transaction_handle);
    }

    /// Spawns a task that logs this validator's connected peers every 5 seconds.
    /// The task loops forever; its handle is retained in `self.handles`.
    pub fn log_connections(&mut self) {
        // Clone `self` so the spawned task owns its own handle to the primary/gateway.
        let self_clone = self.clone();
        self.handles.lock().push(tokio::task::spawn(async move {
            loop {
                let connections = self_clone.primary.gateway().connected_peers();
                info!("{} connections", connections.len());
                for connection in connections {
                    debug!(" {}", connection);
                }
                sleep(Duration::from_secs(5)).await;
            }
        }));
    }
}

impl TestNetwork {
    // Creates a new test network with the given configuration.
    pub fn new(config: TestNetworkConfig) -> Self {
        // Use a deterministic test RNG so runs are reproducible.
        let mut rng = TestRng::default();

        if let Some(log_level) = config.log_level {
            initialize_logger(log_level);
        }

        // Sample the committee accounts, then derive the bonded balances:
        // each validator acts as its own staker and withdrawal address.
        let (accounts, committee) = new_test_committee(config.num_nodes, &mut rng);
        let bonded_balances: IndexMap<_, _> = committee
            .members()
            .iter()
            .map(|(address, (amount, _, _))| (*address, (*address, *address, *amount)))
            .collect();
        // The first sampled account signs the genesis block.
        let gen_key = *accounts[0].private_key();
        // Split the supply remaining after staking evenly across the validators.
        let public_balance_per_validator = (CurrentNetwork::STARTING_SUPPLY
            - (config.num_nodes as u64) * MIN_VALIDATOR_STAKE)
            / (config.num_nodes as u64);
        let mut balances = IndexMap::<Address<CurrentNetwork>, u64>::new();
        for account in accounts.iter() {
            balances.insert(account.address(), public_balance_per_validator);
        }

        // Construct one validator per account, each with its own genesis ledger and storage.
        let mut validators = HashMap::with_capacity(config.num_nodes as usize);
        for (id, account) in accounts.into_iter().enumerate() {
            let gen_ledger =
                genesis_ledger(gen_key, committee.clone(), balances.clone(), bonded_balances.clone(), &mut rng);
            let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new()));
            let storage = Storage::new(
                ledger.clone(),
                Arc::new(BFTMemoryService::new()),
                BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64,
            );
            // Initialize the block synchronization logic.
            let block_sync = Arc::new(BlockSync::new(ledger.clone()));
            // When BFT is enabled, the validator's primary is a clone of the BFT's primary;
            // otherwise a standalone primary is constructed with identical arguments.
            // Each validator listens on a distinct, deterministic port (base port + id).
            let (primary, bft) = if config.bft {
                let bft = BFT::<CurrentNetwork>::new(
                    account,
                    storage,
                    ledger,
                    block_sync,
                    Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)),
                    &[],
                    false,
                    StorageMode::new_test(None),
                    None,
                )
                .unwrap();
                (bft.primary().clone(), Some(bft))
            } else {
                let primary = Primary::<CurrentNetwork>::new(
                    account,
                    storage,
                    ledger,
                    block_sync,
                    Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)),
                    &[],
                    false,
                    StorageMode::new_test(None),
                    None,
                )
                .unwrap();
                (primary, None)
            };

            let test_validator = TestValidator {
                id: id as u16,
                primary,
                primary_sender: None,
                bft: OnceLock::new(),
                handles: Default::default(),
            };
            if let Some(bft) = bft {
                // The OnceLock is freshly created, so `set` cannot have been called before.
                assert!(test_validator.bft.set(bft).is_ok());
            }
            validators.insert(id as u16, test_validator);
        }

        Self { config, validators }
    }

    // Starts each node in the network: sets up the primary channels, runs the BFT (or the
    // bare primary), then optionally starts the cannons and connection logging per the config.
    pub async fn start(&mut self) {
        for validator in self.validators.values_mut() {
            let (primary_sender, primary_receiver) = init_primary_channels();
            validator.primary_sender = Some(primary_sender.clone());

            // let ledger_service = validator.primary.ledger().clone();
            // let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service);
            // sync.try_block_sync(validator.primary.gateway()).await.unwrap();

            if let Some(bft) = validator.bft.get_mut() {
                // Setup the channels and start the bft.
                bft.run(None, None, primary_sender, primary_receiver).await.unwrap();
            } else {
                // Setup the channels and start the primary.
                validator.primary.run(None, None, primary_sender, primary_receiver).await.unwrap();
            }

            if let Some(interval_ms) = self.config.fire_transmissions {
                validator.fire_transmissions(interval_ms);
            }

            if self.config.log_connections {
                validator.log_connections();
            }
        }

        // Optionally connect all nodes to each other once every node is running.
        if self.config.connect_all {
            self.connect_all().await;
        }
    }

    // Starts the solution and transaction cannons for the given node.
    // Panics if `id` is unknown or the node has not been started yet.
    pub fn fire_transmissions_at(&mut self, id: u16, interval_ms: u64) {
        self.validators.get_mut(&id).unwrap().fire_transmissions(interval_ms);
    }

    // Connects a node to another node (one-directional dial; panics on unknown ids).
    pub async fn connect_validators(&self, first_id: u16, second_id: u16) {
        let first_validator = self.validators.get(&first_id).unwrap();
        let second_validator_ip = self.validators.get(&second_id).unwrap().primary.gateway().local_ip();
        first_validator.primary.gateway().connect(second_validator_ip);
        // Give the connection time to be established.
        sleep(Duration::from_millis(100)).await;
    }

    // Connects all nodes to each other: each unordered pair is dialed exactly once.
    pub async fn connect_all(&self) {
        for (validator, other_validator) in self.validators.values().tuple_combinations() {
            // Connect to the node.
            let ip = other_validator.primary.gateway().local_ip();
            validator.primary.gateway().connect(ip);
            // Give the connection time to be established.
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }

    // Connects a specific node to all other nodes (the others dial the target).
    pub async fn connect_one(&self, id: u16) {
        let target_validator = self.validators.get(&id).unwrap();
        let target_ip = target_validator.primary.gateway().local_ip();
        for validator in self.validators.values() {
            if validator.id != id {
                // Connect to the node.
                validator.primary.gateway().connect(target_ip);
                // Give the connection time to be established.
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
        }
    }

    // Disconnects N nodes from all other nodes.
    // NOTE(review): `values().take(n)` picks an arbitrary subset, since HashMap iteration
    // order is unspecified — confirm callers do not rely on which nodes get disconnected.
    pub async fn disconnect(&self, num_nodes: u16) {
        for validator in self.validators.values().take(num_nodes as usize) {
            for peer_ip in validator.primary.gateway().connected_peers().iter() {
                validator.primary.gateway().disconnect(*peer_ip);
            }
        }

        // Give the connections time to be closed.
        sleep(Duration::from_millis(100)).await;
    }

    // Disconnects a specific node from all other nodes (panics on unknown id).
    pub async fn disconnect_one(&self, id: u16) {
        let target_validator = self.validators.get(&id).unwrap();
        for peer_ip in target_validator.primary.gateway().connected_peers().iter() {
            target_validator.primary.gateway().disconnect(*peer_ip);
        }

        // Give the connections time to be closed.
        sleep(Duration::from_millis(100)).await;
    }

    // Checks if a Byzantine fault-tolerant quorum of validators has reached the given round.
    // Assuming `N = 3f + 1 + k`, where `0 <= k < 3`, and '/' denotes integer division,
    // then `N - (N-1)/3 = 2N/3 + 1 = 2f + 1 + (2k+2)/3 = 2f + 1 + k = N - f`.
    pub fn is_round_reached(&self, round: u64) -> bool {
        let quorum_threshold = self.validators.len() - (self.validators.len() - 1) / 3;
        self.validators.values().filter(|v| v.primary.current_round() >= round).count() >= quorum_threshold
    }

    // Checks if all the nodes have stopped progressing: records the highest current round,
    // waits two maximum batch delays, then verifies no validator has advanced past it.
    pub async fn is_halted(&self) -> bool {
        let halt_round = self.validators.values().map(|v| v.primary.current_round()).max().unwrap();
        sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS * 2)).await;
        self.validators.values().all(|v| v.primary.current_round() <= halt_round)
    }

    // Checks if the committee is coherent in storage for all nodes (not quorum) over a range of
    // rounds.
    pub fn is_committee_coherent<T>(&self, rounds_range: T) -> bool
    where
        T: RangeBounds<u64> + IntoIterator<Item = u64>,
    {
        // For every round, every validator that already knows a committee for that round
        // must agree with the first such validator seen; otherwise the network is incoherent.
        for round in rounds_range.into_iter() {
            let mut last: Option<Committee<CurrentNetwork>> = None;
            for validator in self.validators.values() {
                // Round might be in future, in case validator didn't get to it.
                if let Ok(committee) = validator.primary.ledger().get_committee_for_round(round) {
                    match last.clone() {
                        None => last = Some(committee),
                        Some(first) => {
                            if first != committee {
                                return false;
                            }
                        }
                    }
                }
            }
        }

        true
    }

    // Checks if the certificates are coherent in storage for all nodes (not quorum) over a range
    // of rounds.
    // `dedup` collapses adjacent equal entries, so the count is 1 iff every validator
    // stores an identical certificate set for the round.
    pub fn is_certificate_round_coherent<T>(&self, rounds_range: T) -> bool
    where
        T: RangeBounds<u64> + IntoIterator<Item = u64>,
    {
        rounds_range.into_iter().all(|round| {
            self.validators.values().map(|v| v.primary.storage().get_certificates_for_round(round)).dedup().count() == 1
        })
    }
}

// Initializes a new test committee of `n` members, each staked with the minimum validator
// stake, a random commission in 0..100, and `is_open = false`. Returns the sampled accounts
// alongside the committee (constructed at starting round 0).
pub fn new_test_committee(n: u16, rng: &mut TestRng) -> (Vec<Account<CurrentNetwork>>, Committee<CurrentNetwork>) {
    let mut accounts = Vec::with_capacity(n as usize);
    let mut members = IndexMap::with_capacity(n as usize);
    for i in 0..n {
        // Sample the account.
        let account = Account::new(rng).unwrap();
        info!("Validator {}: {}", i, account.address());

        members.insert(account.address(), (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
        accounts.push(account);
    }
    // Initialize the committee.
    let committee = Committee::<CurrentNetwork>::new(0u64, members).unwrap();

    (accounts, committee)
}

// Returns the process-wide, lazily-initialized cache of genesis blocks,
// keyed by the serialized genesis parameters (see `genesis_ledger`).
fn genesis_cache() -> &'static Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>> {
    static CACHE: OnceLock<Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>>> = OnceLock::new();
    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}

// Generates a fresh quorum genesis block for the given committee and balances,
// using a temporary in-memory consensus store. This is expensive; callers should
// prefer `genesis_ledger`, which caches the result.
pub fn genesis_block(
    genesis_private_key: PrivateKey<CurrentNetwork>,
    committee: Committee<CurrentNetwork>,
    public_balances: IndexMap<Address<CurrentNetwork>, u64>,
    bonded_balances: IndexMap<Address<CurrentNetwork>, (Address<CurrentNetwork>, Address<CurrentNetwork>, u64)>,
    rng: &mut (impl Rng + CryptoRng),
) -> Block<CurrentNetwork> {
    // Initialize the store.
    let store = ConsensusStore::<_, ConsensusMemory<_>>::open(StorageMode::new_test(None)).unwrap();
    // Initialize a new VM.
    let vm = VM::from(store).unwrap();
    // Initialize the genesis block.
    vm.genesis_quorum(&genesis_private_key, committee, public_balances, bonded_balances, rng).unwrap()
}

// Loads a ledger initialized with a (cached) genesis block for the given parameters.
// The block is memoized twice: in-process via `genesis_cache`, and on disk in the temp
// directory under a BHP256-derived file name, so repeated test runs reuse it.
pub fn genesis_ledger(
    genesis_private_key: PrivateKey<CurrentNetwork>,
    committee: Committee<CurrentNetwork>,
    public_balances: IndexMap<Address<CurrentNetwork>, u64>,
    bonded_balances: IndexMap<Address<CurrentNetwork>, (Address<CurrentNetwork>, Address<CurrentNetwork>, u64)>,
    rng: &mut (impl Rng + CryptoRng),
) -> CurrentLedger {
    // NOTE(review): the cache key omits `bonded_balances`, so two calls differing only in
    // bonded balances would collide. Callers in this file derive bonded balances from the
    // committee, which is part of the key — TODO confirm before reusing this elsewhere.
    let cache_key =
        to_bytes_le![genesis_private_key, committee, public_balances.iter().collect::<Vec<(_, _)>>()].unwrap();
    // Initialize the genesis block on the first call; other callers
    // will wait for it on the mutex.
    let block = genesis_cache()
        .lock()
        .entry(cache_key.clone())
        .or_insert_with(|| {
            // Try the on-disk cache first: the file name is a BHP256 hash of the cache key.
            let hasher = BHP256::<CurrentNetwork>::setup("aleo.dev.block").unwrap();
            let file_name = hasher.hash(&cache_key.to_bits_le()).unwrap().to_string() + ".genesis";
            let file_path = std::env::temp_dir().join(file_name);
            if file_path.exists() {
                let buffer = std::fs::read(file_path).unwrap();
                return Block::from_bytes_le(&buffer).unwrap();
            }

            // Cache miss: generate the block (expensive, holds the mutex) and persist it.
            let block = genesis_block(genesis_private_key, committee, public_balances, bonded_balances, rng);
            std::fs::write(&file_path, block.to_bytes_le().unwrap()).unwrap();
            block
        })
        .clone();
    // Initialize the ledger with the genesis block.
    CurrentLedger::load(block, StorageMode::new_test(None)).unwrap()
}