simple_node.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 #[macro_use] 17 extern crate tracing; 18 19 #[cfg(feature = "metrics")] 20 extern crate alphaos_node_metrics as metrics; 21 22 use alphaos_account::Account; 23 use alphaos_node_bft::{ 24 BFT, 25 MEMORY_POOL_PORT, 26 Primary, 27 helpers::{ConsensusReceiver, PrimarySender, Storage, init_consensus_channels, init_primary_channels}, 28 }; 29 use alphaos_node_bft_ledger_service::TranslucentLedgerService; 30 use alphaos_node_bft_storage_service::BFTMemoryService; 31 use alphaos_node_sync::BlockSync; 32 use alphaos_utilities::SimpleStoppable; 33 34 use alpha_std::StorageMode; 35 use alphavm::{ 36 console::{account::PrivateKey, algorithms::BHP256, types::Address}, 37 ledger::{ 38 Block, 39 Ledger, 40 block::Transaction, 41 committee::{Committee, MIN_VALIDATOR_STAKE}, 42 narwhal::{BatchHeader, Data}, 43 puzzle::{Solution, SolutionID}, 44 store::{ConsensusStore, helpers::memory::ConsensusMemory}, 45 }, 46 prelude::{Field, Hash, Network, Uniform, VM}, 47 utilities::{FromBytes, TestRng, ToBits, ToBytes, to_bytes_le}, 48 }; 49 50 use ::bytes::Bytes; 51 use anyhow::{Error, Result, anyhow, ensure}; 52 use axum::{ 53 Router, 54 extract::{Path, State}, 55 http::StatusCode, 56 response::{IntoResponse, Response}, 57 routing::get, 58 }; 59 60 use axum_extra::response::ErasedJson; 61 use clap::{Parser, ValueEnum}; 62 use indexmap::IndexMap; 63 use rand::{CryptoRng, Rng, SeedableRng}; 64 use std::{ 65 collections::HashMap, 66 net::{IpAddr, Ipv4Addr, SocketAddr}, 67 path::PathBuf, 68 str::FromStr, 69 sync::{Arc, Mutex, OnceLock}, 70 }; 71 use tokio::{net::TcpListener, sync::oneshot}; 72 use tracing_subscriber::{ 73 layer::{Layer, SubscriberExt}, 74 util::SubscriberInitExt, 75 }; 76 77 type CurrentNetwork = alphavm::prelude::MainnetV0; 78 79 /**************************************************************************************************/ 80 81 /// Initializes the logger. 82 pub fn initialize_logger(verbosity: u8) { 83 let verbosity_str = match verbosity { 84 0 => "info", 85 1 => "debug", 86 2..=4 => "trace", 87 _ => "info", 88 }; 89 90 // Filter out undesirable logs. (unfortunately EnvFilter cannot be cloned) 91 let filter = tracing_subscriber::EnvFilter::from_str(verbosity_str) 92 .unwrap() 93 .add_directive("mio=off".parse().unwrap()) 94 .add_directive("tokio_util=off".parse().unwrap()) 95 .add_directive("hyper=off".parse().unwrap()) 96 .add_directive("reqwest=off".parse().unwrap()) 97 .add_directive("want=off".parse().unwrap()) 98 .add_directive("h2=off".parse().unwrap()); 99 100 let filter = if verbosity > 3 { 101 filter.add_directive("alphaos_node_bft::gateway=trace".parse().unwrap()) 102 } else { 103 filter.add_directive("alphaos_node_bft::gateway=off".parse().unwrap()) 104 }; 105 106 let filter = if verbosity > 4 { 107 filter.add_directive("alphaos_node_tcp=trace".parse().unwrap()) 108 } else { 109 filter.add_directive("alphaos_node_tcp=off".parse().unwrap()) 110 }; 111 112 // Initialize tracing. 113 let _ = tracing_subscriber::registry() 114 .with(tracing_subscriber::fmt::Layer::default().with_target(verbosity > 2).with_filter(filter)) 115 .try_init(); 116 } 117 118 /**************************************************************************************************/ 119 120 /// Starts the BFT instance. 121 pub async fn start_bft( 122 node_id: u16, 123 num_nodes: u16, 124 peers: HashMap<u16, SocketAddr>, 125 ) -> Result<(BFT<CurrentNetwork>, PrimarySender<CurrentNetwork>)> { 126 // Initialize the primary channels. 127 let (sender, receiver) = init_primary_channels(); 128 // Initialize the components. 129 let (committee, account) = initialize_components(node_id, num_nodes)?; 130 // Initialize the translucent ledger service. 131 let ledger = create_ledger(&account, num_nodes, committee, node_id); 132 // Initialize the storage. 133 let storage = Storage::new( 134 ledger.clone(), 135 Arc::new(BFTMemoryService::new()), 136 BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64, 137 ); 138 // Initialize the gateway IP and storage mode. 139 let ip = match peers.get(&node_id) { 140 Some(ip) => Some(*ip), 141 None => Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + node_id)), 142 }; 143 let storage_mode = StorageMode::new_test(None); 144 // Initialize the trusted validators. 145 let trusted_validators = trusted_validators(node_id, num_nodes, peers); 146 let trusted_peers_only = false; 147 // Initialize the consensus channels. 148 let (consensus_sender, consensus_receiver) = init_consensus_channels::<CurrentNetwork>(); 149 // Initialize the consensus receiver handler. 150 consensus_handler(consensus_receiver); 151 // Initialize the BFT instance. 152 let block_sync = Arc::new(BlockSync::new(ledger.clone())); 153 let mut bft = BFT::<CurrentNetwork>::new( 154 account, 155 storage, 156 ledger, 157 block_sync, 158 ip, 159 &trusted_validators, 160 trusted_peers_only, 161 storage_mode, 162 None, 163 )?; 164 // Run the BFT instance. 165 bft.run(None, Some(consensus_sender), sender.clone(), receiver).await?; 166 // Retrieve the BFT's primary. 167 let primary = bft.primary(); 168 // Handle OS signals. 169 handle_signals(primary); 170 // Return the BFT instance. 171 Ok((bft, sender)) 172 } 173 174 /// Starts the primary instance. 175 pub async fn start_primary( 176 node_id: u16, 177 num_nodes: u16, 178 peers: HashMap<u16, SocketAddr>, 179 ) -> Result<(Primary<CurrentNetwork>, PrimarySender<CurrentNetwork>)> { 180 // Initialize the primary channels. 181 let (sender, receiver) = init_primary_channels(); 182 // Initialize the components. 183 let (committee, account) = initialize_components(node_id, num_nodes)?; 184 // Initialize the translucent ledger service. 185 let ledger = create_ledger(&account, num_nodes, committee, node_id); 186 // Initialize the storage. 187 let storage = Storage::new( 188 ledger.clone(), 189 Arc::new(BFTMemoryService::new()), 190 BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64, 191 ); 192 // Initialize the gateway IP and storage mode. 193 let ip = match peers.get(&node_id) { 194 Some(ip) => Some(*ip), 195 None => Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + node_id)), 196 }; 197 let storage_mode = StorageMode::new_test(None); 198 // Initialize the trusted validators. 199 let trusted_validators = trusted_validators(node_id, num_nodes, peers); 200 let trusted_peers_only = false; 201 // Initialize the primary instance. 202 let block_sync = Arc::new(BlockSync::new(ledger.clone())); 203 let mut primary = Primary::<CurrentNetwork>::new( 204 account, 205 storage, 206 ledger, 207 block_sync, 208 ip, 209 &trusted_validators, 210 trusted_peers_only, 211 storage_mode, 212 None, 213 )?; 214 // Run the primary instance. 215 primary.run(None, None, sender.clone(), receiver).await?; 216 // Handle OS signals. 217 handle_signals(&primary); 218 // Return the primary instance. 219 Ok((primary, sender)) 220 } 221 222 /// Initialize the translucent ledger service. 223 fn create_ledger( 224 account: &Account<CurrentNetwork>, 225 num_nodes: u16, 226 committee: Committee<alphavm::prelude::MainnetV0>, 227 node_id: u16, 228 ) -> Arc<TranslucentLedgerService<alphavm::prelude::MainnetV0, ConsensusMemory<alphavm::prelude::MainnetV0>>> { 229 let gen_key = account.private_key(); 230 let public_balance_per_validator = 231 (CurrentNetwork::STARTING_SUPPLY - (num_nodes as u64) * MIN_VALIDATOR_STAKE) / (num_nodes as u64); 232 let mut balances = IndexMap::<Address<CurrentNetwork>, u64>::new(); 233 for address in committee.members().keys() { 234 balances.insert(*address, public_balance_per_validator); 235 } 236 let mut rng = TestRng::default(); 237 let gen_ledger = genesis_ledger(*gen_key, committee.clone(), balances.clone(), node_id, &mut rng); 238 Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new())) 239 } 240 241 pub type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>; 242 243 fn genesis_cache() -> &'static Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>> { 244 static CACHE: OnceLock<Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>>> = OnceLock::new(); 245 CACHE.get_or_init(|| Mutex::new(HashMap::new())) 246 } 247 248 fn genesis_block( 249 genesis_private_key: PrivateKey<CurrentNetwork>, 250 committee: Committee<CurrentNetwork>, 251 public_balances: IndexMap<Address<CurrentNetwork>, u64>, 252 rng: &mut (impl Rng + CryptoRng), 253 ) -> Block<CurrentNetwork> { 254 // Initialize the store. 255 let store = ConsensusStore::<_, ConsensusMemory<_>>::open(StorageMode::new_test(None)).unwrap(); 256 // Initialize a new VM. 257 let vm = VM::from(store).unwrap(); 258 // Initialize the genesis block. 259 let bonded_balances: IndexMap<_, _> = 260 committee.members().iter().map(|(address, (amount, _, _))| (*address, (*address, *address, *amount))).collect(); 261 vm.genesis_quorum(&genesis_private_key, committee, public_balances, bonded_balances, rng).unwrap() 262 } 263 264 fn genesis_ledger( 265 genesis_private_key: PrivateKey<CurrentNetwork>, 266 committee: Committee<CurrentNetwork>, 267 public_balances: IndexMap<Address<CurrentNetwork>, u64>, 268 node_id: u16, 269 rng: &mut (impl Rng + CryptoRng), 270 ) -> CurrentLedger { 271 let cache_key = 272 to_bytes_le![genesis_private_key, committee, public_balances.iter().collect::<Vec<(_, _)>>()].unwrap(); 273 // Initialize the genesis block on the first call; other callers 274 // will wait for it on the mutex. 275 let block = genesis_cache() 276 .lock() 277 .unwrap() 278 .entry(cache_key.clone()) 279 .or_insert_with(|| { 280 let hasher = BHP256::<CurrentNetwork>::setup("aleo.dev.block").unwrap(); 281 let file_name = hasher.hash(&cache_key.to_bits_le()).unwrap().to_string() + ".genesis"; 282 let file_path = std::env::temp_dir().join(file_name); 283 if file_path.exists() { 284 let buffer = std::fs::read(file_path).unwrap(); 285 return Block::from_bytes_le(&buffer).unwrap(); 286 } 287 288 let block = genesis_block(genesis_private_key, committee, public_balances, rng); 289 std::fs::write(&file_path, block.to_bytes_le().unwrap()).unwrap(); 290 block 291 }) 292 .clone(); 293 // Initialize the ledger with the genesis block. 294 CurrentLedger::load(block, alpha_std::StorageMode::Development(node_id)).unwrap() 295 } 296 297 /// Initializes the components of the node. 298 fn initialize_components(node_id: u16, num_nodes: u16) -> Result<(Committee<CurrentNetwork>, Account<CurrentNetwork>)> { 299 // Ensure that the node ID is valid. 300 ensure!(node_id < num_nodes, "Node ID {node_id} must be less than {num_nodes}"); 301 302 // Sample a account. 303 let account = Account::new(&mut rand_chacha::ChaChaRng::seed_from_u64(node_id as u64))?; 304 println!("\n{account}\n"); 305 306 // Initialize a map for the committee members. 307 let mut members = IndexMap::with_capacity(num_nodes as usize); 308 // Add the validators as members. 309 for i in 0..num_nodes { 310 // Sample the account. 311 let account = Account::new(&mut rand_chacha::ChaChaRng::seed_from_u64(i as u64))?; 312 // Add the validator. 313 members.insert(account.address(), (MIN_VALIDATOR_STAKE, false, i as u8)); 314 println!(" Validator {}: {}", i, account.address()); 315 } 316 println!(); 317 318 // Initialize the committee. 319 let committee = Committee::<CurrentNetwork>::new(0u64, members)?; 320 // Return the committee and account. 321 Ok((committee, account)) 322 } 323 324 /// Handles the consensus receiver. 325 fn consensus_handler(receiver: ConsensusReceiver<CurrentNetwork>) { 326 let ConsensusReceiver { mut rx_consensus_subdag } = receiver; 327 328 tokio::task::spawn(async move { 329 while let Some((subdag, transmissions, callback)) = rx_consensus_subdag.recv().await { 330 // Determine the amount of time to sleep for the subdag. 331 let subdag_ms = subdag.values().flatten().count(); 332 // Determine the amount of time to sleep for the transmissions. 333 let transmissions_ms = transmissions.len() * 25; 334 // Add a constant delay. 335 let constant_ms = 100; 336 // Compute the total amount of time to sleep. 337 let sleep_ms = (subdag_ms + transmissions_ms + constant_ms) as u64; 338 // Sleep for the determined amount of time. 339 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await; 340 // Call the callback. 341 callback.send(Ok(())).ok(); 342 } 343 }); 344 } 345 346 /// Returns the trusted validators. 347 fn trusted_validators(node_id: u16, num_nodes: u16, peers: HashMap<u16, SocketAddr>) -> Vec<SocketAddr> { 348 // Initialize a vector for the trusted nodes. 349 let mut trusted = Vec::with_capacity(num_nodes as usize); 350 // Iterate through the nodes. 351 for i in 0..num_nodes { 352 // Initialize the gateway IP. 353 let ip = match peers.get(&i) { 354 Some(ip) => *ip, 355 None => SocketAddr::from_str(&format!("127.0.0.1:{}", MEMORY_POOL_PORT + i)).unwrap(), 356 }; 357 // If the node is not the current node, add it to the trusted nodes. 358 if i != node_id { 359 trusted.push(ip); 360 } 361 } 362 // Return the trusted nodes. 363 trusted 364 } 365 366 /// Handles OS signals for the node to intercept and perform a clean shutdown. 367 /// Note: Only Ctrl-C is supported; it should work on both Unix-family systems and Windows. 368 fn handle_signals(primary: &Primary<CurrentNetwork>) { 369 let node = primary.clone(); 370 tokio::task::spawn(async move { 371 match tokio::signal::ctrl_c().await { 372 Ok(()) => { 373 node.shut_down().await; 374 std::process::exit(0); 375 } 376 Err(error) => error!("tokio::signal::ctrl_c encountered an error: {}", error), 377 } 378 }); 379 } 380 381 /**************************************************************************************************/ 382 383 /// Fires *fake* unconfirmed solutions at the node. 384 fn fire_unconfirmed_solutions(sender: &PrimarySender<CurrentNetwork>, node_id: u16, interval_ms: u64) { 385 let tx_unconfirmed_solution = sender.tx_unconfirmed_solution.clone(); 386 tokio::task::spawn(async move { 387 // This RNG samples the *same* fake solutions for all nodes. 388 let mut shared_rng = rand_chacha::ChaChaRng::seed_from_u64(123456789); 389 // This RNG samples *different* fake solutions for each node. 390 let mut unique_rng = rand_chacha::ChaChaRng::seed_from_u64(node_id as u64); 391 392 // A closure to generate a solution ID and solution. 393 fn sample(mut rng: impl Rng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) { 394 // Sample a random fake solution ID. 395 let solution_id = rng.r#gen::<u64>().into(); 396 // Sample random fake solution bytes. 397 let solution = Data::Buffer(Bytes::from((0..1024).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>())); 398 // Return the ID and solution. 399 (solution_id, solution) 400 } 401 402 // Initialize a counter. 403 let mut counter = 0; 404 405 loop { 406 // Sample a random fake solution ID and solution. 407 let (solution_id, solution) = 408 if counter % 2 == 0 { sample(&mut shared_rng) } else { sample(&mut unique_rng) }; 409 // Initialize a callback sender and receiver. 410 let (callback, callback_receiver) = oneshot::channel(); 411 // Send the fake solution. 412 if let Err(e) = tx_unconfirmed_solution.send((solution_id, solution, callback)).await { 413 error!("Failed to send unconfirmed solution: {e}"); 414 } 415 let _ = callback_receiver.await; 416 // Increment the counter. 417 counter += 1; 418 // Sleep briefly. 419 tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await; 420 } 421 }); 422 } 423 424 /// Fires *fake* unconfirmed transactions at the node. 425 fn fire_unconfirmed_transactions(sender: &PrimarySender<CurrentNetwork>, node_id: u16, interval_ms: u64) { 426 let tx_unconfirmed_transaction = sender.tx_unconfirmed_transaction.clone(); 427 tokio::task::spawn(async move { 428 // This RNG samples the *same* fake transactions for all nodes. 429 let mut shared_rng = rand_chacha::ChaChaRng::seed_from_u64(123456789); 430 // This RNG samples *different* fake transactions for each node. 431 let mut unique_rng = rand_chacha::ChaChaRng::seed_from_u64(node_id as u64); 432 433 // A closure to generate an ID and transaction. 434 fn sample( 435 mut rng: impl Rng, 436 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) { 437 // Sample a random fake transaction ID. 438 let id = Field::<CurrentNetwork>::rand(&mut rng).into(); 439 // Sample random fake transaction bytes. 440 let transaction = Data::Buffer(Bytes::from((0..1024).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>())); 441 // Return the ID and transaction. 442 (id, transaction) 443 } 444 445 // Initialize a counter. 446 let mut counter = 0; 447 448 loop { 449 // Sample a random fake transaction ID and transaction. 450 let (id, transaction) = if counter % 2 == 0 { sample(&mut shared_rng) } else { sample(&mut unique_rng) }; 451 // Initialize a callback sender and receiver. 452 let (callback, callback_receiver) = oneshot::channel(); 453 // Send the fake transaction. 454 if let Err(e) = tx_unconfirmed_transaction.send((id, transaction, callback)).await { 455 error!("Failed to send unconfirmed transaction: {e}"); 456 } 457 let _ = callback_receiver.await; 458 // Increment the counter. 459 counter += 1; 460 // Sleep briefly. 461 tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await; 462 } 463 }); 464 } 465 466 /**************************************************************************************************/ 467 468 /// An enum of error handlers for the REST API server. 469 pub struct RestError(pub String); 470 471 impl IntoResponse for RestError { 472 fn into_response(self) -> Response { 473 (StatusCode::INTERNAL_SERVER_ERROR, format!("Something went wrong: {}", self.0)).into_response() 474 } 475 } 476 477 #[derive(Clone)] 478 struct NodeState { 479 bft: Option<BFT<CurrentNetwork>>, 480 primary: Primary<CurrentNetwork>, 481 } 482 483 /// Returns the leader of the previous round, if one was present. 484 async fn get_leader(State(node): State<NodeState>) -> Result<ErasedJson, RestError> { 485 match &node.bft { 486 Some(bft) => Ok(ErasedJson::pretty(bft.leader())), 487 None => Err(RestError("BFT is not enabled".into())), 488 } 489 } 490 491 /// Returns the current round. 492 async fn get_current_round(State(node): State<NodeState>) -> Result<ErasedJson, RestError> { 493 Ok(ErasedJson::pretty(node.primary.current_round())) 494 } 495 496 /// Returns the certificates for the given round. 497 async fn get_certificates_for_round( 498 State(node): State<NodeState>, 499 Path(round): Path<u64>, 500 ) -> Result<ErasedJson, RestError> { 501 Ok(ErasedJson::pretty(node.primary.storage().get_certificates_for_round(round))) 502 } 503 504 /// Starts up a local server for monitoring the node. 505 async fn start_server(bft: Option<BFT<CurrentNetwork>>, primary: Primary<CurrentNetwork>, node_id: u16) { 506 // Initialize the routes. 507 let router = Router::new() 508 .route("/", get(|| async { "Hello, World!" })) 509 .route("/leader", get(get_leader)) 510 .route("/round/current", get(get_current_round)) 511 .route("/certificates/:round", get(get_certificates_for_round)) 512 // Pass in the `NodeState` to access state. 513 .with_state(NodeState { bft, primary }); 514 515 // Construct the IP address and port. 516 let addr = format!("127.0.0.1:{}", 3000 + node_id); 517 518 // Run the server. 519 info!("Starting the server at '{addr}'..."); 520 let rest_addr: SocketAddr = addr.parse().unwrap(); 521 let rest_listener = TcpListener::bind(rest_addr).await.unwrap(); 522 axum::serve(rest_listener, router.into_make_service_with_connect_info::<SocketAddr>()).await.unwrap(); 523 } 524 525 /**************************************************************************************************/ 526 527 /// The operating mode of the node. 528 #[derive(Debug, Clone, ValueEnum)] 529 enum Mode { 530 /// Runs the node with the Narwhal memory pool protocol. 531 Narwhal, 532 /// Runs the node with the Bullshark BFT protocol (on top of Narwhal). 533 Bft, 534 } 535 536 /// A simple CLI for the node. 537 #[derive(Parser, Debug)] 538 struct Args { 539 /// The mode to run the node in. 540 #[arg(long)] 541 mode: Mode, 542 /// The ID of the node. 543 #[arg(long, value_name = "ID")] 544 id: u16, 545 /// The number of nodes in the network. 546 #[arg(long, value_name = "N")] 547 num_nodes: u16, 548 /// If set, the path to the file containing the committee peers. 549 #[arg(long, value_name = "PATH")] 550 peers: Option<PathBuf>, 551 /// Enables the solution cannons, and optionally the interval in ms to run them on. 552 #[arg(long, value_name = "INTERVAL_MS")] 553 fire_solutions: Option<Option<u64>>, 554 /// Enables the transaction cannons, and optionally the interval in ms to run them on. 555 #[arg(long, value_name = "INTERVAL_MS")] 556 fire_transactions: Option<Option<u64>>, 557 /// Enables the solution and transaction cannons, and optionally the interval in ms to run them on. 558 #[arg(long, value_name = "INTERVAL_MS")] 559 fire_transmissions: Option<Option<u64>>, 560 /// Enables the metrics exporter. 561 #[clap(long, default_value = "false")] 562 metrics: bool, 563 } 564 565 /// A helper method to parse the peers provided to the CLI. 566 fn parse_peers(peers_string: String) -> Result<HashMap<u16, SocketAddr>, Error> { 567 // Expect list of peers in the form of `node_id=ip:port`, one per line. 568 let mut peers = HashMap::new(); 569 for peer in peers_string.lines() { 570 let mut split = peer.split('='); 571 let node_id = u16::from_str(split.next().ok_or_else(|| anyhow!("Bad Format"))?)?; 572 let addr: String = split.next().ok_or_else(|| anyhow!("Bad Format"))?.parse()?; 573 let ip = SocketAddr::from_str(addr.as_str())?; 574 peers.insert(node_id, ip); 575 } 576 Ok(peers) 577 } 578 579 /**************************************************************************************************/ 580 581 #[tokio::main] 582 async fn main() -> Result<()> { 583 initialize_logger(1); 584 585 let args = Args::parse(); 586 587 let peers = match args.peers { 588 Some(path) => parse_peers(std::fs::read_to_string(path)?)?, 589 None => Default::default(), 590 }; 591 592 // Initialize an optional BFT holder. 593 let mut bft_holder = None; 594 595 // Start the node. 596 let (primary, sender) = match args.mode { 597 Mode::Bft => { 598 // Start the BFT. 599 let (bft, sender) = start_bft(args.id, args.num_nodes, peers).await?; 600 // Set the BFT holder. 601 bft_holder = Some(bft.clone()); 602 // Return the primary and sender. 603 (bft.primary().clone(), sender) 604 } 605 Mode::Narwhal => start_primary(args.id, args.num_nodes, peers).await?, 606 }; 607 608 // The default interval to fire transmissions at. 609 const DEFAULT_INTERVAL_MS: u64 = 450; // ms 610 611 // Fire unconfirmed solutions. 612 match (args.fire_transmissions, args.fire_solutions) { 613 // Note: We allow the user to overload the solutions rate, even when the 'fire-transmissions' flag is enabled. 614 (Some(rate), _) | (_, Some(rate)) => { 615 fire_unconfirmed_solutions(&sender, args.id, rate.unwrap_or(DEFAULT_INTERVAL_MS)); 616 } 617 _ => (), 618 }; 619 620 // Fire unconfirmed transactions. 621 match (args.fire_transmissions, args.fire_transactions) { 622 // Note: We allow the user to overload the transactions rate, even when the 'fire-transmissions' flag is enabled. 623 (Some(rate), _) | (_, Some(rate)) => { 624 fire_unconfirmed_transactions(&sender, args.id, rate.unwrap_or(DEFAULT_INTERVAL_MS)); 625 } 626 _ => (), 627 }; 628 629 // Initialize the metrics. 630 #[cfg(feature = "metrics")] 631 if args.metrics { 632 info!("Initializing metrics..."); 633 metrics::initialize_metrics(SocketAddr::from_str(&format!("0.0.0.0:{}", 9000 + args.id)).ok()); 634 } 635 636 // Start the monitoring server. 637 start_server(bft_holder, primary, args.id).await; 638 // // Note: Do not move this. 639 // std::future::pending::<()>().await; 640 Ok(()) 641 } 642 643 #[cfg(test)] 644 mod tests { 645 use super::*; 646 647 #[test] 648 fn parse_peers_empty() -> Result<(), Error> { 649 let peers = parse_peers("".to_owned())?; 650 assert_eq!(peers.len(), 0); 651 Ok(()) 652 } 653 654 #[test] 655 fn parse_peers_ok() -> Result<(), Error> { 656 let s = r#"0=192.168.1.176:5000 657 1=192.168.1.176:5001 658 2=192.168.1.176:5002 659 3=192.168.1.176:5003"#; 660 let peers = parse_peers(s.to_owned())?; 661 assert_eq!(peers.len(), 4); 662 Ok(()) 663 } 664 665 #[test] 666 fn parse_peers_bad_id() -> Result<(), Error> { 667 let s = "A=192.168.1.176:5000"; 668 let peers = parse_peers(s.to_owned()); 669 assert!(peers.is_err()); 670 Ok(()) 671 } 672 673 #[test] 674 fn parse_peers_bad_format() -> Result<(), Error> { 675 let s = "foo"; 676 let peers = parse_peers(s.to_owned()); 677 assert!(peers.is_err()); 678 Ok(()) 679 } 680 }