// node/bft/tests/common/primary.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  use crate::common::{
 17      CurrentNetwork,
 18      TranslucentLedgerService,
 19      utils::{fire_unconfirmed_solutions, fire_unconfirmed_transactions, initialize_logger},
 20  };
 21  
 22  use alphaos_account::Account;
 23  use alphaos_node_bft::{
 24      BFT,
 25      MAX_BATCH_DELAY_IN_MS,
 26      MEMORY_POOL_PORT,
 27      Primary,
 28      helpers::{PrimarySender, Storage, init_primary_channels},
 29  };
 30  use alphaos_node_bft_storage_service::BFTMemoryService;
 31  use alphaos_node_network::PeerPoolHandling;
 32  use alphaos_node_sync::BlockSync;
 33  use alphaos_utilities::SimpleStoppable;
 34  
 35  use alphavm::{
 36      console::{
 37          account::{Address, PrivateKey},
 38          algorithms::{BHP256, Hash},
 39          network::Network,
 40      },
 41      ledger::{
 42          Ledger,
 43          block::Block,
 44          committee::{Committee, MIN_VALIDATOR_STAKE},
 45          narwhal::BatchHeader,
 46          store::{ConsensusStore, helpers::memory::ConsensusMemory},
 47      },
 48      prelude::{CryptoRng, FromBytes, Rng, TestRng, ToBits, ToBytes, VM},
 49      utilities::to_bytes_le,
 50  };
 51  
 52  use alpha_std::StorageMode;
 53  use indexmap::IndexMap;
 54  use itertools::Itertools;
 55  #[cfg(feature = "locktick")]
 56  use locktick::parking_lot::Mutex;
 57  #[cfg(not(feature = "locktick"))]
 58  use parking_lot::Mutex;
 59  use std::{
 60      collections::HashMap,
 61      net::{IpAddr, Ipv4Addr, SocketAddr},
 62      ops::RangeBounds,
 63      sync::{Arc, OnceLock},
 64      time::Duration,
 65  };
 66  use tokio::{task::JoinHandle, time::sleep};
 67  use tracing::*;
 68  
/// The configuration for the test network.
#[derive(Clone, Copy, Debug)]
pub struct TestNetworkConfig {
    /// The number of nodes to spin up. Node IDs are assigned sequentially from 0, and each node
    /// listens on a port offset from `MEMORY_POOL_PORT` by its ID.
    pub num_nodes: u16,
    /// If this is set to `true`, the BFT protocol is started on top of Narwhal.
    pub bft: bool,
    /// If this is set to `true`, all nodes are connected to each other (when they're first
    /// started).
    pub connect_all: bool,
    /// If `Some(i)` is set, the solution and transaction cannons will fire every `i` milliseconds.
    pub fire_transmissions: Option<u64>,
    /// The log level to use for the test; if `None`, the logger is not initialized.
    pub log_level: Option<u8>,
    /// If this is set to `true`, the number of connections is logged every 5 seconds.
    pub log_connections: bool,
}
 86  
/// A test network.
#[derive(Clone)]
pub struct TestNetwork {
    /// The configuration for the test network.
    pub config: TestNetworkConfig,
    /// A map of node IDs to validators in the network. IDs are assigned sequentially from 0 when
    /// the network is created.
    pub validators: HashMap<u16, TestValidator>,
}
 95  
/// A test validator.
#[derive(Clone)]
pub struct TestValidator {
    /// The ID of the validator.
    pub id: u16,
    /// The primary instance. When the BFT is enabled this is a clone of the BFT primary.
    pub primary: Primary<CurrentNetwork>,
    /// The channel sender of the primary. This is `None` until the network is started, at which
    /// point it is populated by `TestNetwork::start`.
    pub primary_sender: Option<PrimarySender<CurrentNetwork>>,
    /// The BFT instance. This is only set if the BFT is enabled.
    pub bft: OnceLock<BFT<CurrentNetwork>>,
    /// The tokio handles of all long-running tasks associated with the validator (incl. cannons).
    pub handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
110  
/// The ledger type used in these tests: a ledger for the current network backed by in-memory
/// consensus storage.
pub type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
112  
113  impl TestValidator {
114      pub fn fire_transmissions(&mut self, interval_ms: u64) {
115          let solution_handle = fire_unconfirmed_solutions(self.primary_sender.as_mut().unwrap(), self.id, interval_ms);
116          let transaction_handle =
117              fire_unconfirmed_transactions(self.primary_sender.as_mut().unwrap(), self.id, interval_ms);
118  
119          self.handles.lock().push(solution_handle);
120          self.handles.lock().push(transaction_handle);
121      }
122  
123      pub fn log_connections(&mut self) {
124          let self_clone = self.clone();
125          self.handles.lock().push(tokio::task::spawn(async move {
126              loop {
127                  let connections = self_clone.primary.gateway().connected_peers();
128                  info!("{} connections", connections.len());
129                  for connection in connections {
130                      debug!("  {}", connection);
131                  }
132                  sleep(Duration::from_secs(5)).await;
133              }
134          }));
135      }
136  }
137  
138  impl TestNetwork {
139      // Creates a new test network with the given configuration.
140      pub fn new(config: TestNetworkConfig) -> Self {
141          let mut rng = TestRng::default();
142  
143          if let Some(log_level) = config.log_level {
144              initialize_logger(log_level);
145          }
146  
147          let (accounts, committee) = new_test_committee(config.num_nodes, &mut rng);
148          let bonded_balances: IndexMap<_, _> = committee
149              .members()
150              .iter()
151              .map(|(address, (amount, _, _))| (*address, (*address, *address, *amount)))
152              .collect();
153          let gen_key = *accounts[0].private_key();
154          let public_balance_per_validator = (CurrentNetwork::STARTING_SUPPLY
155              - (config.num_nodes as u64) * MIN_VALIDATOR_STAKE)
156              / (config.num_nodes as u64);
157          let mut balances = IndexMap::<Address<CurrentNetwork>, u64>::new();
158          for account in accounts.iter() {
159              balances.insert(account.address(), public_balance_per_validator);
160          }
161  
162          let mut validators = HashMap::with_capacity(config.num_nodes as usize);
163          for (id, account) in accounts.into_iter().enumerate() {
164              let gen_ledger =
165                  genesis_ledger(gen_key, committee.clone(), balances.clone(), bonded_balances.clone(), &mut rng);
166              let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new()));
167              let storage = Storage::new(
168                  ledger.clone(),
169                  Arc::new(BFTMemoryService::new()),
170                  BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64,
171              );
172              // Initialize the block synchronization logic.
173              let block_sync = Arc::new(BlockSync::new(ledger.clone()));
174              let (primary, bft) = if config.bft {
175                  let bft = BFT::<CurrentNetwork>::new(
176                      account,
177                      storage,
178                      ledger,
179                      block_sync,
180                      Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)),
181                      &[],
182                      false,
183                      StorageMode::new_test(None),
184                      None,
185                  )
186                  .unwrap();
187                  (bft.primary().clone(), Some(bft))
188              } else {
189                  let primary = Primary::<CurrentNetwork>::new(
190                      account,
191                      storage,
192                      ledger,
193                      block_sync,
194                      Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)),
195                      &[],
196                      false,
197                      StorageMode::new_test(None),
198                      None,
199                  )
200                  .unwrap();
201                  (primary, None)
202              };
203  
204              let test_validator = TestValidator {
205                  id: id as u16,
206                  primary,
207                  primary_sender: None,
208                  bft: OnceLock::new(),
209                  handles: Default::default(),
210              };
211              if let Some(bft) = bft {
212                  assert!(test_validator.bft.set(bft).is_ok());
213              }
214              validators.insert(id as u16, test_validator);
215          }
216  
217          Self { config, validators }
218      }
219  
220      // Starts each node in the network.
221      pub async fn start(&mut self) {
222          for validator in self.validators.values_mut() {
223              let (primary_sender, primary_receiver) = init_primary_channels();
224              validator.primary_sender = Some(primary_sender.clone());
225  
226              // let ledger_service = validator.primary.ledger().clone();
227              // let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service);
228              // sync.try_block_sync(validator.primary.gateway()).await.unwrap();
229  
230              if let Some(bft) = validator.bft.get_mut() {
231                  // Setup the channels and start the bft.
232                  bft.run(None, None, primary_sender, primary_receiver).await.unwrap();
233              } else {
234                  // Setup the channels and start the primary.
235                  validator.primary.run(None, None, primary_sender, primary_receiver).await.unwrap();
236              }
237  
238              if let Some(interval_ms) = self.config.fire_transmissions {
239                  validator.fire_transmissions(interval_ms);
240              }
241  
242              if self.config.log_connections {
243                  validator.log_connections();
244              }
245          }
246  
247          if self.config.connect_all {
248              self.connect_all().await;
249          }
250      }
251  
252      // Starts the solution and transaction cannons for node.
253      pub fn fire_transmissions_at(&mut self, id: u16, interval_ms: u64) {
254          self.validators.get_mut(&id).unwrap().fire_transmissions(interval_ms);
255      }
256  
257      // Connects a node to another node.
258      pub async fn connect_validators(&self, first_id: u16, second_id: u16) {
259          let first_validator = self.validators.get(&first_id).unwrap();
260          let second_validator_ip = self.validators.get(&second_id).unwrap().primary.gateway().local_ip();
261          first_validator.primary.gateway().connect(second_validator_ip);
262          // Give the connection time to be established.
263          sleep(Duration::from_millis(100)).await;
264      }
265  
266      // Connects all nodes to each other.
267      pub async fn connect_all(&self) {
268          for (validator, other_validator) in self.validators.values().tuple_combinations() {
269              // Connect to the node.
270              let ip = other_validator.primary.gateway().local_ip();
271              validator.primary.gateway().connect(ip);
272              // Give the connection time to be established.
273              tokio::time::sleep(std::time::Duration::from_millis(10)).await;
274          }
275      }
276  
277      // Connects a specific node to all other nodes.
278      pub async fn connect_one(&self, id: u16) {
279          let target_validator = self.validators.get(&id).unwrap();
280          let target_ip = target_validator.primary.gateway().local_ip();
281          for validator in self.validators.values() {
282              if validator.id != id {
283                  // Connect to the node.
284                  validator.primary.gateway().connect(target_ip);
285                  // Give the connection time to be established.
286                  tokio::time::sleep(std::time::Duration::from_millis(10)).await;
287              }
288          }
289      }
290  
291      // Disconnects N nodes from all other nodes.
292      pub async fn disconnect(&self, num_nodes: u16) {
293          for validator in self.validators.values().take(num_nodes as usize) {
294              for peer_ip in validator.primary.gateway().connected_peers().iter() {
295                  validator.primary.gateway().disconnect(*peer_ip);
296              }
297          }
298  
299          // Give the connections time to be closed.
300          sleep(Duration::from_millis(100)).await;
301      }
302  
303      // Disconnects a specific node from all other nodes.
304      pub async fn disconnect_one(&self, id: u16) {
305          let target_validator = self.validators.get(&id).unwrap();
306          for peer_ip in target_validator.primary.gateway().connected_peers().iter() {
307              target_validator.primary.gateway().disconnect(*peer_ip);
308          }
309  
310          // Give the connections time to be closed.
311          sleep(Duration::from_millis(100)).await;
312      }
313  
314      // Checks if a Byzantine fault-tolerant quorum of validators has reached the given round.
315      // Assuming `N = 3f + 1 + k`, where `0 <= k < 3`, and '/' denotes integer division,
316      // then `N - (N-1)/3 = 2N/3 + 1 = 2f + 1 + (2k+2)/3 = 2f + 1 + k = N - f`.
317      pub fn is_round_reached(&self, round: u64) -> bool {
318          let quorum_threshold = self.validators.len() - (self.validators.len() - 1) / 3;
319          self.validators.values().filter(|v| v.primary.current_round() >= round).count() >= quorum_threshold
320      }
321  
322      // Checks if all the nodes have stopped progressing.
323      pub async fn is_halted(&self) -> bool {
324          let halt_round = self.validators.values().map(|v| v.primary.current_round()).max().unwrap();
325          sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS * 2)).await;
326          self.validators.values().all(|v| v.primary.current_round() <= halt_round)
327      }
328  
329      // Checks if the committee is coherent in storage for all nodes (not quorum) over a range of
330      // rounds.
331      pub fn is_committee_coherent<T>(&self, rounds_range: T) -> bool
332      where
333          T: RangeBounds<u64> + IntoIterator<Item = u64>,
334      {
335          for round in rounds_range.into_iter() {
336              let mut last: Option<Committee<CurrentNetwork>> = None;
337              for validator in self.validators.values() {
338                  // Round might be in future, in case validator didn't get to it.
339                  if let Ok(committee) = validator.primary.ledger().get_committee_for_round(round) {
340                      match last.clone() {
341                          None => last = Some(committee),
342                          Some(first) => {
343                              if first != committee {
344                                  return false;
345                              }
346                          }
347                      }
348                  }
349              }
350          }
351  
352          true
353      }
354  
355      // Checks if the certificates are coherent in storage for all nodes (not quorum) over a range
356      // of rounds.
357      pub fn is_certificate_round_coherent<T>(&self, rounds_range: T) -> bool
358      where
359          T: RangeBounds<u64> + IntoIterator<Item = u64>,
360      {
361          rounds_range.into_iter().all(|round| {
362              self.validators.values().map(|v| v.primary.storage().get_certificates_for_round(round)).dedup().count() == 1
363          })
364      }
365  }
366  
367  // Initializes a new test committee.
368  pub fn new_test_committee(n: u16, rng: &mut TestRng) -> (Vec<Account<CurrentNetwork>>, Committee<CurrentNetwork>) {
369      let mut accounts = Vec::with_capacity(n as usize);
370      let mut members = IndexMap::with_capacity(n as usize);
371      for i in 0..n {
372          // Sample the account.
373          let account = Account::new(rng).unwrap();
374          info!("Validator {}: {}", i, account.address());
375  
376          members.insert(account.address(), (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
377          accounts.push(account);
378      }
379      // Initialize the committee.
380      let committee = Committee::<CurrentNetwork>::new(0u64, members).unwrap();
381  
382      (accounts, committee)
383  }
384  
385  fn genesis_cache() -> &'static Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>> {
386      static CACHE: OnceLock<Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>>> = OnceLock::new();
387      CACHE.get_or_init(|| Mutex::new(HashMap::new()))
388  }
389  
390  pub fn genesis_block(
391      genesis_private_key: PrivateKey<CurrentNetwork>,
392      committee: Committee<CurrentNetwork>,
393      public_balances: IndexMap<Address<CurrentNetwork>, u64>,
394      bonded_balances: IndexMap<Address<CurrentNetwork>, (Address<CurrentNetwork>, Address<CurrentNetwork>, u64)>,
395      rng: &mut (impl Rng + CryptoRng),
396  ) -> Block<CurrentNetwork> {
397      // Initialize the store.
398      let store = ConsensusStore::<_, ConsensusMemory<_>>::open(StorageMode::new_test(None)).unwrap();
399      // Initialize a new VM.
400      let vm = VM::from(store).unwrap();
401      // Initialize the genesis block.
402      vm.genesis_quorum(&genesis_private_key, committee, public_balances, bonded_balances, rng).unwrap()
403  }
404  
405  pub fn genesis_ledger(
406      genesis_private_key: PrivateKey<CurrentNetwork>,
407      committee: Committee<CurrentNetwork>,
408      public_balances: IndexMap<Address<CurrentNetwork>, u64>,
409      bonded_balances: IndexMap<Address<CurrentNetwork>, (Address<CurrentNetwork>, Address<CurrentNetwork>, u64)>,
410      rng: &mut (impl Rng + CryptoRng),
411  ) -> CurrentLedger {
412      let cache_key =
413          to_bytes_le![genesis_private_key, committee, public_balances.iter().collect::<Vec<(_, _)>>()].unwrap();
414      // Initialize the genesis block on the first call; other callers
415      // will wait for it on the mutex.
416      let block = genesis_cache()
417          .lock()
418          .entry(cache_key.clone())
419          .or_insert_with(|| {
420              let hasher = BHP256::<CurrentNetwork>::setup("aleo.dev.block").unwrap();
421              let file_name = hasher.hash(&cache_key.to_bits_le()).unwrap().to_string() + ".genesis";
422              let file_path = std::env::temp_dir().join(file_name);
423              if file_path.exists() {
424                  let buffer = std::fs::read(file_path).unwrap();
425                  return Block::from_bytes_le(&buffer).unwrap();
426              }
427  
428              let block = genesis_block(genesis_private_key, committee, public_balances, bonded_balances, rng);
429              std::fs::write(&file_path, block.to_bytes_le().unwrap()).unwrap();
430              block
431          })
432          .clone();
433      // Initialize the ledger with the genesis block.
434      CurrentLedger::load(block, StorageMode::new_test(None)).unwrap()
435  }