/ node / bft / tests / common / primary.rs
primary.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use crate::common::{
 20      utils::{fire_unconfirmed_solutions, fire_unconfirmed_transactions, initialize_logger},
 21      CurrentNetwork,
 22      TranslucentLedgerService,
 23  };
 24  
 25  use alphaos_account::Account;
 26  use alphaos_node_bft::{
 27      helpers::{init_primary_channels, PrimarySender, Storage},
 28      Primary,
 29      BFT,
 30      MAX_BATCH_DELAY_IN_MS,
 31      MEMORY_POOL_PORT,
 32  };
 33  use alphaos_node_bft_storage_service::BFTMemoryService;
 34  use alphaos_node_network::PeerPoolHandling;
 35  use alphaos_node_sync::BlockSync;
 36  use alphaos_utilities::SimpleStoppable;
 37  
 38  use alphavm::{
 39      console::{
 40          account::{Address, PrivateKey},
 41          algorithms::{Hash, BHP256},
 42          network::Network,
 43      },
 44      ledger::{
 45          block::Block,
 46          committee::{Committee, MIN_VALIDATOR_STAKE},
 47          narwhal::BatchHeader,
 48          store::{helpers::memory::ConsensusMemory, ConsensusStore},
 49          Ledger,
 50      },
 51      prelude::{CryptoRng, FromBytes, Rng, TestRng, ToBits, ToBytes, VM},
 52      utilities::to_bytes_le,
 53  };
 54  
 55  use alphastd::StorageMode;
 56  use indexmap::IndexMap;
 57  use itertools::Itertools;
 58  #[cfg(feature = "locktick")]
 59  use locktick::parking_lot::Mutex;
 60  #[cfg(not(feature = "locktick"))]
 61  use parking_lot::Mutex;
 62  use std::{
 63      collections::HashMap,
 64      net::{IpAddr, Ipv4Addr, SocketAddr},
 65      ops::RangeBounds,
 66      sync::{Arc, OnceLock},
 67      time::Duration,
 68  };
 69  use tokio::{task::JoinHandle, time::sleep};
 70  use tracing::*;
 71  
 72  /// The configuration for the test network.
 73  #[derive(Clone, Copy, Debug)]
 74  pub struct TestNetworkConfig {
 75      /// The number of nodes to spin up.
 76      pub num_nodes: u16,
 77      /// If this is set to `true`, the BFT protocol is started on top of Narwhal.
 78      pub bft: bool,
 79      /// If this is set to `true`, all nodes are connected to each other (when they're first
 80      /// started).
 81      pub connect_all: bool,
 82      /// If `Some(i)` is set, the cannons will fire every `i` milliseconds.
 83      pub fire_transmissions: Option<u64>,
 84      /// The log level to use for the test.
 85      pub log_level: Option<u8>,
 86      /// If this is set to `true`, the number of connections is logged every 5 seconds.
 87      pub log_connections: bool,
 88  }
 89  
 90  /// A test network.
 91  #[derive(Clone)]
 92  pub struct TestNetwork {
 93      /// The configuration for the test network.
 94      pub config: TestNetworkConfig,
 95      /// A map of node IDs to validators in the network.
 96      pub validators: HashMap<u16, TestValidator>,
 97  }
 98  
 99  /// A test validator.
100  #[derive(Clone)]
101  pub struct TestValidator {
102      /// The ID of the validator.
103      pub id: u16,
104      /// The primary instance. When the BFT is enabled this is a clone of the BFT primary.
105      pub primary: Primary<CurrentNetwork>,
106      /// The channel sender of the primary.
107      pub primary_sender: Option<PrimarySender<CurrentNetwork>>,
108      /// The BFT instance. This is only set if the BFT is enabled.
109      pub bft: OnceLock<BFT<CurrentNetwork>>,
110      /// The tokio handles of all long-running tasks associated with the validator (incl. cannons).
111      pub handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
112  }
113  
114  pub type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
115  
116  impl TestValidator {
117      pub fn fire_transmissions(&mut self, interval_ms: u64) {
118          let solution_handle = fire_unconfirmed_solutions(self.primary_sender.as_mut().unwrap(), self.id, interval_ms);
119          let transaction_handle =
120              fire_unconfirmed_transactions(self.primary_sender.as_mut().unwrap(), self.id, interval_ms);
121  
122          self.handles.lock().push(solution_handle);
123          self.handles.lock().push(transaction_handle);
124      }
125  
126      pub fn log_connections(&mut self) {
127          let self_clone = self.clone();
128          self.handles.lock().push(tokio::task::spawn(async move {
129              loop {
130                  let connections = self_clone.primary.gateway().connected_peers();
131                  info!("{} connections", connections.len());
132                  for connection in connections {
133                      debug!("  {}", connection);
134                  }
135                  sleep(Duration::from_secs(5)).await;
136              }
137          }));
138      }
139  }
140  
141  impl TestNetwork {
142      // Creates a new test network with the given configuration.
143      pub fn new(config: TestNetworkConfig) -> Self {
144          let mut rng = TestRng::default();
145  
146          if let Some(log_level) = config.log_level {
147              initialize_logger(log_level);
148          }
149  
150          let (accounts, committee) = new_test_committee(config.num_nodes, &mut rng);
151          let bonded_balances: IndexMap<_, _> = committee
152              .members()
153              .iter()
154              .map(|(address, (amount, _, _))| (*address, (*address, *address, *amount)))
155              .collect();
156          let gen_key = *accounts[0].private_key();
157          let public_balance_per_validator = (CurrentNetwork::STARTING_SUPPLY
158              - (config.num_nodes as u64) * MIN_VALIDATOR_STAKE)
159              / (config.num_nodes as u64);
160          let mut balances = IndexMap::<Address<CurrentNetwork>, u64>::new();
161          for account in accounts.iter() {
162              balances.insert(account.address(), public_balance_per_validator);
163          }
164  
165          let mut validators = HashMap::with_capacity(config.num_nodes as usize);
166          for (id, account) in accounts.into_iter().enumerate() {
167              let gen_ledger =
168                  genesis_ledger(gen_key, committee.clone(), balances.clone(), bonded_balances.clone(), &mut rng);
169              let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new()));
170              let storage = Storage::new(
171                  ledger.clone(),
172                  Arc::new(BFTMemoryService::new()),
173                  BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64,
174              );
175              // Initialize the block synchronization logic.
176              let block_sync = Arc::new(BlockSync::new(ledger.clone()));
177              let (primary, bft) = if config.bft {
178                  let bft = BFT::<CurrentNetwork>::new(
179                      account,
180                      storage,
181                      ledger,
182                      block_sync,
183                      Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)),
184                      &[],
185                      false,
186                      StorageMode::new_test(None),
187                      None,
188                  )
189                  .unwrap();
190                  (bft.primary().clone(), Some(bft))
191              } else {
192                  let primary = Primary::<CurrentNetwork>::new(
193                      account,
194                      storage,
195                      ledger,
196                      block_sync,
197                      Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)),
198                      &[],
199                      false,
200                      StorageMode::new_test(None),
201                      None,
202                  )
203                  .unwrap();
204                  (primary, None)
205              };
206  
207              let test_validator = TestValidator {
208                  id: id as u16,
209                  primary,
210                  primary_sender: None,
211                  bft: OnceLock::new(),
212                  handles: Default::default(),
213              };
214              if let Some(bft) = bft {
215                  assert!(test_validator.bft.set(bft).is_ok());
216              }
217              validators.insert(id as u16, test_validator);
218          }
219  
220          Self { config, validators }
221      }
222  
223      // Starts each node in the network.
224      pub async fn start(&mut self) {
225          for validator in self.validators.values_mut() {
226              let (primary_sender, primary_receiver) = init_primary_channels();
227              validator.primary_sender = Some(primary_sender.clone());
228  
229              // let ledger_service = validator.primary.ledger().clone();
230              // let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service);
231              // sync.try_block_sync(validator.primary.gateway()).await.unwrap();
232  
233              if let Some(bft) = validator.bft.get_mut() {
234                  // Setup the channels and start the bft.
235                  bft.run(None, None, primary_sender, primary_receiver).await.unwrap();
236              } else {
237                  // Setup the channels and start the primary.
238                  validator.primary.run(None, None, primary_sender, primary_receiver).await.unwrap();
239              }
240  
241              if let Some(interval_ms) = self.config.fire_transmissions {
242                  validator.fire_transmissions(interval_ms);
243              }
244  
245              if self.config.log_connections {
246                  validator.log_connections();
247              }
248          }
249  
250          if self.config.connect_all {
251              self.connect_all().await;
252          }
253      }
254  
255      // Starts the solution and transaction cannons for node.
256      pub fn fire_transmissions_at(&mut self, id: u16, interval_ms: u64) {
257          self.validators.get_mut(&id).unwrap().fire_transmissions(interval_ms);
258      }
259  
260      // Connects a node to another node.
261      pub async fn connect_validators(&self, first_id: u16, second_id: u16) {
262          let first_validator = self.validators.get(&first_id).unwrap();
263          let second_validator_ip = self.validators.get(&second_id).unwrap().primary.gateway().local_ip();
264          first_validator.primary.gateway().connect(second_validator_ip);
265          // Give the connection time to be established.
266          sleep(Duration::from_millis(100)).await;
267      }
268  
269      // Connects all nodes to each other.
270      pub async fn connect_all(&self) {
271          for (validator, other_validator) in self.validators.values().tuple_combinations() {
272              // Connect to the node.
273              let ip = other_validator.primary.gateway().local_ip();
274              validator.primary.gateway().connect(ip);
275              // Give the connection time to be established.
276              tokio::time::sleep(std::time::Duration::from_millis(10)).await;
277          }
278      }
279  
280      // Connects a specific node to all other nodes.
281      pub async fn connect_one(&self, id: u16) {
282          let target_validator = self.validators.get(&id).unwrap();
283          let target_ip = target_validator.primary.gateway().local_ip();
284          for validator in self.validators.values() {
285              if validator.id != id {
286                  // Connect to the node.
287                  validator.primary.gateway().connect(target_ip);
288                  // Give the connection time to be established.
289                  tokio::time::sleep(std::time::Duration::from_millis(10)).await;
290              }
291          }
292      }
293  
294      // Disconnects N nodes from all other nodes.
295      pub async fn disconnect(&self, num_nodes: u16) {
296          for validator in self.validators.values().take(num_nodes as usize) {
297              for peer_ip in validator.primary.gateway().connected_peers().iter() {
298                  validator.primary.gateway().disconnect(*peer_ip);
299              }
300          }
301  
302          // Give the connections time to be closed.
303          sleep(Duration::from_millis(100)).await;
304      }
305  
306      // Disconnects a specific node from all other nodes.
307      pub async fn disconnect_one(&self, id: u16) {
308          let target_validator = self.validators.get(&id).unwrap();
309          for peer_ip in target_validator.primary.gateway().connected_peers().iter() {
310              target_validator.primary.gateway().disconnect(*peer_ip);
311          }
312  
313          // Give the connections time to be closed.
314          sleep(Duration::from_millis(100)).await;
315      }
316  
317      // Checks if a Byzantine fault-tolerant quorum of validators has reached the given round.
318      // Assuming `N = 3f + 1 + k`, where `0 <= k < 3`, and '/' denotes integer division,
319      // then `N - (N-1)/3 = 2N/3 + 1 = 2f + 1 + (2k+2)/3 = 2f + 1 + k = N - f`.
320      pub fn is_round_reached(&self, round: u64) -> bool {
321          let quorum_threshold = self.validators.len() - (self.validators.len() - 1) / 3;
322          self.validators.values().filter(|v| v.primary.current_round() >= round).count() >= quorum_threshold
323      }
324  
325      // Checks if all the nodes have stopped progressing.
326      pub async fn is_halted(&self) -> bool {
327          let halt_round = self.validators.values().map(|v| v.primary.current_round()).max().unwrap();
328          sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS * 2)).await;
329          self.validators.values().all(|v| v.primary.current_round() <= halt_round)
330      }
331  
332      // Checks if the committee is coherent in storage for all nodes (not quorum) over a range of
333      // rounds.
334      pub fn is_committee_coherent<T>(&self, rounds_range: T) -> bool
335      where
336          T: RangeBounds<u64> + IntoIterator<Item = u64>,
337      {
338          for round in rounds_range.into_iter() {
339              let mut last: Option<Committee<CurrentNetwork>> = None;
340              for validator in self.validators.values() {
341                  // Round might be in future, in case validator didn't get to it.
342                  if let Ok(committee) = validator.primary.ledger().get_committee_for_round(round) {
343                      match last.clone() {
344                          None => last = Some(committee),
345                          Some(first) => {
346                              if first != committee {
347                                  return false;
348                              }
349                          }
350                      }
351                  }
352              }
353          }
354  
355          true
356      }
357  
358      // Checks if the certificates are coherent in storage for all nodes (not quorum) over a range
359      // of rounds.
360      pub fn is_certificate_round_coherent<T>(&self, rounds_range: T) -> bool
361      where
362          T: RangeBounds<u64> + IntoIterator<Item = u64>,
363      {
364          rounds_range.into_iter().all(|round| {
365              self.validators.values().map(|v| v.primary.storage().get_certificates_for_round(round)).dedup().count() == 1
366          })
367      }
368  }
369  
370  // Initializes a new test committee.
371  pub fn new_test_committee(n: u16, rng: &mut TestRng) -> (Vec<Account<CurrentNetwork>>, Committee<CurrentNetwork>) {
372      let mut accounts = Vec::with_capacity(n as usize);
373      let mut members = IndexMap::with_capacity(n as usize);
374      for i in 0..n {
375          // Sample the account.
376          let account = Account::new(rng).unwrap();
377          info!("Validator {}: {}", i, account.address());
378  
379          members.insert(account.address(), (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
380          accounts.push(account);
381      }
382      // Initialize the committee.
383      let committee = Committee::<CurrentNetwork>::new(0u64, members).unwrap();
384  
385      (accounts, committee)
386  }
387  
388  fn genesis_cache() -> &'static Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>> {
389      static CACHE: OnceLock<Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>>> = OnceLock::new();
390      CACHE.get_or_init(|| Mutex::new(HashMap::new()))
391  }
392  
393  pub fn genesis_block(
394      genesis_private_key: PrivateKey<CurrentNetwork>,
395      committee: Committee<CurrentNetwork>,
396      public_balances: IndexMap<Address<CurrentNetwork>, u64>,
397      bonded_balances: IndexMap<Address<CurrentNetwork>, (Address<CurrentNetwork>, Address<CurrentNetwork>, u64)>,
398      rng: &mut (impl Rng + CryptoRng),
399  ) -> Block<CurrentNetwork> {
400      // Initialize the store.
401      let store = ConsensusStore::<_, ConsensusMemory<_>>::open(StorageMode::new_test(None)).unwrap();
402      // Initialize a new VM.
403      let vm = VM::from(store).unwrap();
404      // Initialize the genesis block.
405      vm.genesis_quorum(&genesis_private_key, committee, public_balances, bonded_balances, rng).unwrap()
406  }
407  
408  pub fn genesis_ledger(
409      genesis_private_key: PrivateKey<CurrentNetwork>,
410      committee: Committee<CurrentNetwork>,
411      public_balances: IndexMap<Address<CurrentNetwork>, u64>,
412      bonded_balances: IndexMap<Address<CurrentNetwork>, (Address<CurrentNetwork>, Address<CurrentNetwork>, u64)>,
413      rng: &mut (impl Rng + CryptoRng),
414  ) -> CurrentLedger {
415      let cache_key =
416          to_bytes_le![genesis_private_key, committee, public_balances.iter().collect::<Vec<(_, _)>>()].unwrap();
417      // Initialize the genesis block on the first call; other callers
418      // will wait for it on the mutex.
419      let block = genesis_cache()
420          .lock()
421          .entry(cache_key.clone())
422          .or_insert_with(|| {
423              let hasher = BHP256::<CurrentNetwork>::setup("alpha.dev.block").unwrap();
424              let file_name = hasher.hash(&cache_key.to_bits_le()).unwrap().to_string() + ".genesis";
425              let file_path = std::env::temp_dir().join(file_name);
426              if file_path.exists() {
427                  let buffer = std::fs::read(file_path).unwrap();
428                  return Block::from_bytes_le(&buffer).unwrap();
429              }
430  
431              let block = genesis_block(genesis_private_key, committee, public_balances, bonded_balances, rng);
432              std::fs::write(&file_path, block.to_bytes_le().unwrap()).unwrap();
433              block
434          })
435          .clone();
436      // Initialize the ledger with the genesis block.
437      CurrentLedger::load(block, StorageMode::new_test(None)).unwrap()
438  }