/ node / bft / examples / simple_node.rs
simple_node.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  #[macro_use]
 17  extern crate tracing;
 18  
 19  #[cfg(feature = "metrics")]
 20  extern crate alphaos_node_metrics as metrics;
 21  
 22  use alphaos_account::Account;
 23  use alphaos_node_bft::{
 24      BFT,
 25      MEMORY_POOL_PORT,
 26      Primary,
 27      helpers::{ConsensusReceiver, PrimarySender, Storage, init_consensus_channels, init_primary_channels},
 28  };
 29  use alphaos_node_bft_ledger_service::TranslucentLedgerService;
 30  use alphaos_node_bft_storage_service::BFTMemoryService;
 31  use alphaos_node_sync::BlockSync;
 32  use alphaos_utilities::SimpleStoppable;
 33  
 34  use alpha_std::StorageMode;
 35  use alphavm::{
 36      console::{account::PrivateKey, algorithms::BHP256, types::Address},
 37      ledger::{
 38          Block,
 39          Ledger,
 40          block::Transaction,
 41          committee::{Committee, MIN_VALIDATOR_STAKE},
 42          narwhal::{BatchHeader, Data},
 43          puzzle::{Solution, SolutionID},
 44          store::{ConsensusStore, helpers::memory::ConsensusMemory},
 45      },
 46      prelude::{Field, Hash, Network, Uniform, VM},
 47      utilities::{FromBytes, TestRng, ToBits, ToBytes, to_bytes_le},
 48  };
 49  
 50  use ::bytes::Bytes;
 51  use anyhow::{Error, Result, anyhow, ensure};
 52  use axum::{
 53      Router,
 54      extract::{Path, State},
 55      http::StatusCode,
 56      response::{IntoResponse, Response},
 57      routing::get,
 58  };
 59  
 60  use axum_extra::response::ErasedJson;
 61  use clap::{Parser, ValueEnum};
 62  use indexmap::IndexMap;
 63  use rand::{CryptoRng, Rng, SeedableRng};
 64  use std::{
 65      collections::HashMap,
 66      net::{IpAddr, Ipv4Addr, SocketAddr},
 67      path::PathBuf,
 68      str::FromStr,
 69      sync::{Arc, Mutex, OnceLock},
 70  };
 71  use tokio::{net::TcpListener, sync::oneshot};
 72  use tracing_subscriber::{
 73      layer::{Layer, SubscriberExt},
 74      util::SubscriberInitExt,
 75  };
 76  
 77  type CurrentNetwork = alphavm::prelude::MainnetV0;
 78  
 79  /**************************************************************************************************/
 80  
 81  /// Initializes the logger.
 82  pub fn initialize_logger(verbosity: u8) {
 83      let verbosity_str = match verbosity {
 84          0 => "info",
 85          1 => "debug",
 86          2..=4 => "trace",
 87          _ => "info",
 88      };
 89  
 90      // Filter out undesirable logs. (unfortunately EnvFilter cannot be cloned)
 91      let filter = tracing_subscriber::EnvFilter::from_str(verbosity_str)
 92          .unwrap()
 93          .add_directive("mio=off".parse().unwrap())
 94          .add_directive("tokio_util=off".parse().unwrap())
 95          .add_directive("hyper=off".parse().unwrap())
 96          .add_directive("reqwest=off".parse().unwrap())
 97          .add_directive("want=off".parse().unwrap())
 98          .add_directive("h2=off".parse().unwrap());
 99  
100      let filter = if verbosity > 3 {
101          filter.add_directive("alphaos_node_bft::gateway=trace".parse().unwrap())
102      } else {
103          filter.add_directive("alphaos_node_bft::gateway=off".parse().unwrap())
104      };
105  
106      let filter = if verbosity > 4 {
107          filter.add_directive("alphaos_node_tcp=trace".parse().unwrap())
108      } else {
109          filter.add_directive("alphaos_node_tcp=off".parse().unwrap())
110      };
111  
112      // Initialize tracing.
113      let _ = tracing_subscriber::registry()
114          .with(tracing_subscriber::fmt::Layer::default().with_target(verbosity > 2).with_filter(filter))
115          .try_init();
116  }
117  
118  /**************************************************************************************************/
119  
120  /// Starts the BFT instance.
121  pub async fn start_bft(
122      node_id: u16,
123      num_nodes: u16,
124      peers: HashMap<u16, SocketAddr>,
125  ) -> Result<(BFT<CurrentNetwork>, PrimarySender<CurrentNetwork>)> {
126      // Initialize the primary channels.
127      let (sender, receiver) = init_primary_channels();
128      // Initialize the components.
129      let (committee, account) = initialize_components(node_id, num_nodes)?;
130      // Initialize the translucent ledger service.
131      let ledger = create_ledger(&account, num_nodes, committee, node_id);
132      // Initialize the storage.
133      let storage = Storage::new(
134          ledger.clone(),
135          Arc::new(BFTMemoryService::new()),
136          BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64,
137      );
138      // Initialize the gateway IP and storage mode.
139      let ip = match peers.get(&node_id) {
140          Some(ip) => Some(*ip),
141          None => Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + node_id)),
142      };
143      let storage_mode = StorageMode::new_test(None);
144      // Initialize the trusted validators.
145      let trusted_validators = trusted_validators(node_id, num_nodes, peers);
146      let trusted_peers_only = false;
147      // Initialize the consensus channels.
148      let (consensus_sender, consensus_receiver) = init_consensus_channels::<CurrentNetwork>();
149      // Initialize the consensus receiver handler.
150      consensus_handler(consensus_receiver);
151      // Initialize the BFT instance.
152      let block_sync = Arc::new(BlockSync::new(ledger.clone()));
153      let mut bft = BFT::<CurrentNetwork>::new(
154          account,
155          storage,
156          ledger,
157          block_sync,
158          ip,
159          &trusted_validators,
160          trusted_peers_only,
161          storage_mode,
162          None,
163      )?;
164      // Run the BFT instance.
165      bft.run(None, Some(consensus_sender), sender.clone(), receiver).await?;
166      // Retrieve the BFT's primary.
167      let primary = bft.primary();
168      // Handle OS signals.
169      handle_signals(primary);
170      // Return the BFT instance.
171      Ok((bft, sender))
172  }
173  
174  /// Starts the primary instance.
175  pub async fn start_primary(
176      node_id: u16,
177      num_nodes: u16,
178      peers: HashMap<u16, SocketAddr>,
179  ) -> Result<(Primary<CurrentNetwork>, PrimarySender<CurrentNetwork>)> {
180      // Initialize the primary channels.
181      let (sender, receiver) = init_primary_channels();
182      // Initialize the components.
183      let (committee, account) = initialize_components(node_id, num_nodes)?;
184      // Initialize the translucent ledger service.
185      let ledger = create_ledger(&account, num_nodes, committee, node_id);
186      // Initialize the storage.
187      let storage = Storage::new(
188          ledger.clone(),
189          Arc::new(BFTMemoryService::new()),
190          BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64,
191      );
192      // Initialize the gateway IP and storage mode.
193      let ip = match peers.get(&node_id) {
194          Some(ip) => Some(*ip),
195          None => Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + node_id)),
196      };
197      let storage_mode = StorageMode::new_test(None);
198      // Initialize the trusted validators.
199      let trusted_validators = trusted_validators(node_id, num_nodes, peers);
200      let trusted_peers_only = false;
201      // Initialize the primary instance.
202      let block_sync = Arc::new(BlockSync::new(ledger.clone()));
203      let mut primary = Primary::<CurrentNetwork>::new(
204          account,
205          storage,
206          ledger,
207          block_sync,
208          ip,
209          &trusted_validators,
210          trusted_peers_only,
211          storage_mode,
212          None,
213      )?;
214      // Run the primary instance.
215      primary.run(None, None, sender.clone(), receiver).await?;
216      // Handle OS signals.
217      handle_signals(&primary);
218      // Return the primary instance.
219      Ok((primary, sender))
220  }
221  
222  /// Initialize the translucent ledger service.
223  fn create_ledger(
224      account: &Account<CurrentNetwork>,
225      num_nodes: u16,
226      committee: Committee<alphavm::prelude::MainnetV0>,
227      node_id: u16,
228  ) -> Arc<TranslucentLedgerService<alphavm::prelude::MainnetV0, ConsensusMemory<alphavm::prelude::MainnetV0>>> {
229      let gen_key = account.private_key();
230      let public_balance_per_validator =
231          (CurrentNetwork::STARTING_SUPPLY - (num_nodes as u64) * MIN_VALIDATOR_STAKE) / (num_nodes as u64);
232      let mut balances = IndexMap::<Address<CurrentNetwork>, u64>::new();
233      for address in committee.members().keys() {
234          balances.insert(*address, public_balance_per_validator);
235      }
236      let mut rng = TestRng::default();
237      let gen_ledger = genesis_ledger(*gen_key, committee.clone(), balances.clone(), node_id, &mut rng);
238      Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new()))
239  }
240  
241  pub type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
242  
243  fn genesis_cache() -> &'static Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>> {
244      static CACHE: OnceLock<Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>>> = OnceLock::new();
245      CACHE.get_or_init(|| Mutex::new(HashMap::new()))
246  }
247  
248  fn genesis_block(
249      genesis_private_key: PrivateKey<CurrentNetwork>,
250      committee: Committee<CurrentNetwork>,
251      public_balances: IndexMap<Address<CurrentNetwork>, u64>,
252      rng: &mut (impl Rng + CryptoRng),
253  ) -> Block<CurrentNetwork> {
254      // Initialize the store.
255      let store = ConsensusStore::<_, ConsensusMemory<_>>::open(StorageMode::new_test(None)).unwrap();
256      // Initialize a new VM.
257      let vm = VM::from(store).unwrap();
258      // Initialize the genesis block.
259      let bonded_balances: IndexMap<_, _> =
260          committee.members().iter().map(|(address, (amount, _, _))| (*address, (*address, *address, *amount))).collect();
261      vm.genesis_quorum(&genesis_private_key, committee, public_balances, bonded_balances, rng).unwrap()
262  }
263  
264  fn genesis_ledger(
265      genesis_private_key: PrivateKey<CurrentNetwork>,
266      committee: Committee<CurrentNetwork>,
267      public_balances: IndexMap<Address<CurrentNetwork>, u64>,
268      node_id: u16,
269      rng: &mut (impl Rng + CryptoRng),
270  ) -> CurrentLedger {
271      let cache_key =
272          to_bytes_le![genesis_private_key, committee, public_balances.iter().collect::<Vec<(_, _)>>()].unwrap();
273      // Initialize the genesis block on the first call; other callers
274      // will wait for it on the mutex.
275      let block = genesis_cache()
276          .lock()
277          .unwrap()
278          .entry(cache_key.clone())
279          .or_insert_with(|| {
280              let hasher = BHP256::<CurrentNetwork>::setup("aleo.dev.block").unwrap();
281              let file_name = hasher.hash(&cache_key.to_bits_le()).unwrap().to_string() + ".genesis";
282              let file_path = std::env::temp_dir().join(file_name);
283              if file_path.exists() {
284                  let buffer = std::fs::read(file_path).unwrap();
285                  return Block::from_bytes_le(&buffer).unwrap();
286              }
287  
288              let block = genesis_block(genesis_private_key, committee, public_balances, rng);
289              std::fs::write(&file_path, block.to_bytes_le().unwrap()).unwrap();
290              block
291          })
292          .clone();
293      // Initialize the ledger with the genesis block.
294      CurrentLedger::load(block, alpha_std::StorageMode::Development(node_id)).unwrap()
295  }
296  
297  /// Initializes the components of the node.
298  fn initialize_components(node_id: u16, num_nodes: u16) -> Result<(Committee<CurrentNetwork>, Account<CurrentNetwork>)> {
299      // Ensure that the node ID is valid.
300      ensure!(node_id < num_nodes, "Node ID {node_id} must be less than {num_nodes}");
301  
302      // Sample a account.
303      let account = Account::new(&mut rand_chacha::ChaChaRng::seed_from_u64(node_id as u64))?;
304      println!("\n{account}\n");
305  
306      // Initialize a map for the committee members.
307      let mut members = IndexMap::with_capacity(num_nodes as usize);
308      // Add the validators as members.
309      for i in 0..num_nodes {
310          // Sample the account.
311          let account = Account::new(&mut rand_chacha::ChaChaRng::seed_from_u64(i as u64))?;
312          // Add the validator.
313          members.insert(account.address(), (MIN_VALIDATOR_STAKE, false, i as u8));
314          println!("  Validator {}: {}", i, account.address());
315      }
316      println!();
317  
318      // Initialize the committee.
319      let committee = Committee::<CurrentNetwork>::new(0u64, members)?;
320      // Return the committee and account.
321      Ok((committee, account))
322  }
323  
324  /// Handles the consensus receiver.
325  fn consensus_handler(receiver: ConsensusReceiver<CurrentNetwork>) {
326      let ConsensusReceiver { mut rx_consensus_subdag } = receiver;
327  
328      tokio::task::spawn(async move {
329          while let Some((subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
330              // Determine the amount of time to sleep for the subdag.
331              let subdag_ms = subdag.values().flatten().count();
332              // Determine the amount of time to sleep for the transmissions.
333              let transmissions_ms = transmissions.len() * 25;
334              // Add a constant delay.
335              let constant_ms = 100;
336              // Compute the total amount of time to sleep.
337              let sleep_ms = (subdag_ms + transmissions_ms + constant_ms) as u64;
338              // Sleep for the determined amount of time.
339              tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
340              // Call the callback.
341              callback.send(Ok(())).ok();
342          }
343      });
344  }
345  
346  /// Returns the trusted validators.
347  fn trusted_validators(node_id: u16, num_nodes: u16, peers: HashMap<u16, SocketAddr>) -> Vec<SocketAddr> {
348      // Initialize a vector for the trusted nodes.
349      let mut trusted = Vec::with_capacity(num_nodes as usize);
350      // Iterate through the nodes.
351      for i in 0..num_nodes {
352          // Initialize the gateway IP.
353          let ip = match peers.get(&i) {
354              Some(ip) => *ip,
355              None => SocketAddr::from_str(&format!("127.0.0.1:{}", MEMORY_POOL_PORT + i)).unwrap(),
356          };
357          // If the node is not the current node, add it to the trusted nodes.
358          if i != node_id {
359              trusted.push(ip);
360          }
361      }
362      // Return the trusted nodes.
363      trusted
364  }
365  
366  /// Handles OS signals for the node to intercept and perform a clean shutdown.
367  /// Note: Only Ctrl-C is supported; it should work on both Unix-family systems and Windows.
368  fn handle_signals(primary: &Primary<CurrentNetwork>) {
369      let node = primary.clone();
370      tokio::task::spawn(async move {
371          match tokio::signal::ctrl_c().await {
372              Ok(()) => {
373                  node.shut_down().await;
374                  std::process::exit(0);
375              }
376              Err(error) => error!("tokio::signal::ctrl_c encountered an error: {}", error),
377          }
378      });
379  }
380  
381  /**************************************************************************************************/
382  
383  /// Fires *fake* unconfirmed solutions at the node.
384  fn fire_unconfirmed_solutions(sender: &PrimarySender<CurrentNetwork>, node_id: u16, interval_ms: u64) {
385      let tx_unconfirmed_solution = sender.tx_unconfirmed_solution.clone();
386      tokio::task::spawn(async move {
387          // This RNG samples the *same* fake solutions for all nodes.
388          let mut shared_rng = rand_chacha::ChaChaRng::seed_from_u64(123456789);
389          // This RNG samples *different* fake solutions for each node.
390          let mut unique_rng = rand_chacha::ChaChaRng::seed_from_u64(node_id as u64);
391  
392          // A closure to generate a solution ID and solution.
393          fn sample(mut rng: impl Rng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
394              // Sample a random fake solution ID.
395              let solution_id = rng.r#gen::<u64>().into();
396              // Sample random fake solution bytes.
397              let solution = Data::Buffer(Bytes::from((0..1024).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
398              // Return the ID and solution.
399              (solution_id, solution)
400          }
401  
402          // Initialize a counter.
403          let mut counter = 0;
404  
405          loop {
406              // Sample a random fake solution ID and solution.
407              let (solution_id, solution) =
408                  if counter % 2 == 0 { sample(&mut shared_rng) } else { sample(&mut unique_rng) };
409              // Initialize a callback sender and receiver.
410              let (callback, callback_receiver) = oneshot::channel();
411              // Send the fake solution.
412              if let Err(e) = tx_unconfirmed_solution.send((solution_id, solution, callback)).await {
413                  error!("Failed to send unconfirmed solution: {e}");
414              }
415              let _ = callback_receiver.await;
416              // Increment the counter.
417              counter += 1;
418              // Sleep briefly.
419              tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
420          }
421      });
422  }
423  
424  /// Fires *fake* unconfirmed transactions at the node.
425  fn fire_unconfirmed_transactions(sender: &PrimarySender<CurrentNetwork>, node_id: u16, interval_ms: u64) {
426      let tx_unconfirmed_transaction = sender.tx_unconfirmed_transaction.clone();
427      tokio::task::spawn(async move {
428          // This RNG samples the *same* fake transactions for all nodes.
429          let mut shared_rng = rand_chacha::ChaChaRng::seed_from_u64(123456789);
430          // This RNG samples *different* fake transactions for each node.
431          let mut unique_rng = rand_chacha::ChaChaRng::seed_from_u64(node_id as u64);
432  
433          // A closure to generate an ID and transaction.
434          fn sample(
435              mut rng: impl Rng,
436          ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
437              // Sample a random fake transaction ID.
438              let id = Field::<CurrentNetwork>::rand(&mut rng).into();
439              // Sample random fake transaction bytes.
440              let transaction = Data::Buffer(Bytes::from((0..1024).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
441              // Return the ID and transaction.
442              (id, transaction)
443          }
444  
445          // Initialize a counter.
446          let mut counter = 0;
447  
448          loop {
449              // Sample a random fake transaction ID and transaction.
450              let (id, transaction) = if counter % 2 == 0 { sample(&mut shared_rng) } else { sample(&mut unique_rng) };
451              // Initialize a callback sender and receiver.
452              let (callback, callback_receiver) = oneshot::channel();
453              // Send the fake transaction.
454              if let Err(e) = tx_unconfirmed_transaction.send((id, transaction, callback)).await {
455                  error!("Failed to send unconfirmed transaction: {e}");
456              }
457              let _ = callback_receiver.await;
458              // Increment the counter.
459              counter += 1;
460              // Sleep briefly.
461              tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
462          }
463      });
464  }
465  
466  /**************************************************************************************************/
467  
468  /// An enum of error handlers for the REST API server.
469  pub struct RestError(pub String);
470  
471  impl IntoResponse for RestError {
472      fn into_response(self) -> Response {
473          (StatusCode::INTERNAL_SERVER_ERROR, format!("Something went wrong: {}", self.0)).into_response()
474      }
475  }
476  
477  #[derive(Clone)]
478  struct NodeState {
479      bft: Option<BFT<CurrentNetwork>>,
480      primary: Primary<CurrentNetwork>,
481  }
482  
483  /// Returns the leader of the previous round, if one was present.
484  async fn get_leader(State(node): State<NodeState>) -> Result<ErasedJson, RestError> {
485      match &node.bft {
486          Some(bft) => Ok(ErasedJson::pretty(bft.leader())),
487          None => Err(RestError("BFT is not enabled".into())),
488      }
489  }
490  
491  /// Returns the current round.
492  async fn get_current_round(State(node): State<NodeState>) -> Result<ErasedJson, RestError> {
493      Ok(ErasedJson::pretty(node.primary.current_round()))
494  }
495  
496  /// Returns the certificates for the given round.
497  async fn get_certificates_for_round(
498      State(node): State<NodeState>,
499      Path(round): Path<u64>,
500  ) -> Result<ErasedJson, RestError> {
501      Ok(ErasedJson::pretty(node.primary.storage().get_certificates_for_round(round)))
502  }
503  
504  /// Starts up a local server for monitoring the node.
505  async fn start_server(bft: Option<BFT<CurrentNetwork>>, primary: Primary<CurrentNetwork>, node_id: u16) {
506      // Initialize the routes.
507      let router = Router::new()
508          .route("/", get(|| async { "Hello, World!" }))
509          .route("/leader", get(get_leader))
510          .route("/round/current", get(get_current_round))
511          .route("/certificates/:round", get(get_certificates_for_round))
512          // Pass in the `NodeState` to access state.
513          .with_state(NodeState { bft, primary });
514  
515      // Construct the IP address and port.
516      let addr = format!("127.0.0.1:{}", 3000 + node_id);
517  
518      // Run the server.
519      info!("Starting the server at '{addr}'...");
520      let rest_addr: SocketAddr = addr.parse().unwrap();
521      let rest_listener = TcpListener::bind(rest_addr).await.unwrap();
522      axum::serve(rest_listener, router.into_make_service_with_connect_info::<SocketAddr>()).await.unwrap();
523  }
524  
525  /**************************************************************************************************/
526  
527  /// The operating mode of the node.
528  #[derive(Debug, Clone, ValueEnum)]
529  enum Mode {
530      /// Runs the node with the Narwhal memory pool protocol.
531      Narwhal,
532      /// Runs the node with the Bullshark BFT protocol (on top of Narwhal).
533      Bft,
534  }
535  
536  /// A simple CLI for the node.
537  #[derive(Parser, Debug)]
538  struct Args {
539      /// The mode to run the node in.
540      #[arg(long)]
541      mode: Mode,
542      /// The ID of the node.
543      #[arg(long, value_name = "ID")]
544      id: u16,
545      /// The number of nodes in the network.
546      #[arg(long, value_name = "N")]
547      num_nodes: u16,
548      /// If set, the path to the file containing the committee peers.
549      #[arg(long, value_name = "PATH")]
550      peers: Option<PathBuf>,
551      /// Enables the solution cannons, and optionally the interval in ms to run them on.
552      #[arg(long, value_name = "INTERVAL_MS")]
553      fire_solutions: Option<Option<u64>>,
554      /// Enables the transaction cannons, and optionally the interval in ms to run them on.
555      #[arg(long, value_name = "INTERVAL_MS")]
556      fire_transactions: Option<Option<u64>>,
557      /// Enables the solution and transaction cannons, and optionally the interval in ms to run them on.
558      #[arg(long, value_name = "INTERVAL_MS")]
559      fire_transmissions: Option<Option<u64>>,
560      /// Enables the metrics exporter.
561      #[clap(long, default_value = "false")]
562      metrics: bool,
563  }
564  
565  /// A helper method to parse the peers provided to the CLI.
566  fn parse_peers(peers_string: String) -> Result<HashMap<u16, SocketAddr>, Error> {
567      // Expect list of peers in the form of `node_id=ip:port`, one per line.
568      let mut peers = HashMap::new();
569      for peer in peers_string.lines() {
570          let mut split = peer.split('=');
571          let node_id = u16::from_str(split.next().ok_or_else(|| anyhow!("Bad Format"))?)?;
572          let addr: String = split.next().ok_or_else(|| anyhow!("Bad Format"))?.parse()?;
573          let ip = SocketAddr::from_str(addr.as_str())?;
574          peers.insert(node_id, ip);
575      }
576      Ok(peers)
577  }
578  
579  /**************************************************************************************************/
580  
581  #[tokio::main]
582  async fn main() -> Result<()> {
583      initialize_logger(1);
584  
585      let args = Args::parse();
586  
587      let peers = match args.peers {
588          Some(path) => parse_peers(std::fs::read_to_string(path)?)?,
589          None => Default::default(),
590      };
591  
592      // Initialize an optional BFT holder.
593      let mut bft_holder = None;
594  
595      // Start the node.
596      let (primary, sender) = match args.mode {
597          Mode::Bft => {
598              // Start the BFT.
599              let (bft, sender) = start_bft(args.id, args.num_nodes, peers).await?;
600              // Set the BFT holder.
601              bft_holder = Some(bft.clone());
602              // Return the primary and sender.
603              (bft.primary().clone(), sender)
604          }
605          Mode::Narwhal => start_primary(args.id, args.num_nodes, peers).await?,
606      };
607  
608      // The default interval to fire transmissions at.
609      const DEFAULT_INTERVAL_MS: u64 = 450; // ms
610  
611      // Fire unconfirmed solutions.
612      match (args.fire_transmissions, args.fire_solutions) {
613          // Note: We allow the user to overload the solutions rate, even when the 'fire-transmissions' flag is enabled.
614          (Some(rate), _) | (_, Some(rate)) => {
615              fire_unconfirmed_solutions(&sender, args.id, rate.unwrap_or(DEFAULT_INTERVAL_MS));
616          }
617          _ => (),
618      };
619  
620      // Fire unconfirmed transactions.
621      match (args.fire_transmissions, args.fire_transactions) {
622          // Note: We allow the user to overload the transactions rate, even when the 'fire-transmissions' flag is enabled.
623          (Some(rate), _) | (_, Some(rate)) => {
624              fire_unconfirmed_transactions(&sender, args.id, rate.unwrap_or(DEFAULT_INTERVAL_MS));
625          }
626          _ => (),
627      };
628  
629      // Initialize the metrics.
630      #[cfg(feature = "metrics")]
631      if args.metrics {
632          info!("Initializing metrics...");
633          metrics::initialize_metrics(SocketAddr::from_str(&format!("0.0.0.0:{}", 9000 + args.id)).ok());
634      }
635  
636      // Start the monitoring server.
637      start_server(bft_holder, primary, args.id).await;
638      // // Note: Do not move this.
639      // std::future::pending::<()>().await;
640      Ok(())
641  }
642  
643  #[cfg(test)]
644  mod tests {
645      use super::*;
646  
647      #[test]
648      fn parse_peers_empty() -> Result<(), Error> {
649          let peers = parse_peers("".to_owned())?;
650          assert_eq!(peers.len(), 0);
651          Ok(())
652      }
653  
654      #[test]
655      fn parse_peers_ok() -> Result<(), Error> {
656          let s = r#"0=192.168.1.176:5000
657  1=192.168.1.176:5001
658  2=192.168.1.176:5002
659  3=192.168.1.176:5003"#;
660          let peers = parse_peers(s.to_owned())?;
661          assert_eq!(peers.len(), 4);
662          Ok(())
663      }
664  
665      #[test]
666      fn parse_peers_bad_id() -> Result<(), Error> {
667          let s = "A=192.168.1.176:5000";
668          let peers = parse_peers(s.to_owned());
669          assert!(peers.is_err());
670          Ok(())
671      }
672  
673      #[test]
674      fn parse_peers_bad_format() -> Result<(), Error> {
675          let s = "foo";
676          let peers = parse_peers(s.to_owned());
677          assert!(peers.is_err());
678          Ok(())
679      }
680  }