mod.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 mod codec; 17 mod handshake; 18 mod network; 19 20 use crate::tcp::{self, Tcp}; 21 use alphaos_account::Account; 22 use alphaos_node_network::{ConnectionMode, Peer, Resolver}; 23 use alphaos_node_tcp::{P2P, protocols::*}; 24 use alphaos_utilities::SignalHandler; 25 use alphavm::{ 26 ledger::committee::Committee, 27 prelude::{Address, Field, Header, Network, PrivateKey, ViewKey}, 28 synthesizer::Restrictions, 29 }; 30 31 #[cfg(feature = "locktick")] 32 use locktick::{ 33 parking_lot::{Mutex, RwLock}, 34 tokio::Mutex as TMutex, 35 }; 36 #[cfg(not(feature = "locktick"))] 37 use parking_lot::{Mutex, RwLock}; 38 use std::{ 39 collections::{HashMap, HashSet}, 40 net::SocketAddr, 41 ops::Deref, 42 str::FromStr, 43 sync::Arc, 44 time::{Duration, Instant}, 45 }; 46 #[cfg(not(feature = "locktick"))] 47 use tokio::sync::Mutex as TMutex; 48 use tokio::sync::oneshot; 49 50 #[derive(Clone)] 51 pub struct BootstrapClient<N: Network>(Arc<InnerBootstrapClient<N>>); 52 53 impl<N: Network> Deref for BootstrapClient<N> { 54 type Target = Arc<InnerBootstrapClient<N>>; 55 56 fn deref(&self) -> &Self::Target { 57 &self.0 58 } 59 } 60 61 // A tuple holding the validator's Aleo address, and its connection mode. 62 type KnownValidatorInfo<N> = (Address<N>, ConnectionMode); 63 64 pub struct InnerBootstrapClient<N: Network> { 65 tcp: Tcp, 66 peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>, 67 known_validators: RwLock<HashMap<SocketAddr, KnownValidatorInfo<N>>>, 68 resolver: RwLock<Resolver<N>>, 69 account: Account<N>, 70 genesis_header: Header<N>, 71 restrictions_id: Field<N>, 72 http_client: reqwest::Client, 73 latest_committee: TMutex<(HashSet<Address<N>>, Instant)>, 74 dev: Option<u16>, 75 shutdown_tx: Mutex<Option<oneshot::Sender<()>>>, 76 } 77 78 impl<N: Network> BootstrapClient<N> { 79 // The interval for validator committee refreshes. 80 const COMMITTEE_REFRESH_TIME: Duration = Duration::from_secs(20); 81 // The maximum amount of time per connection. 82 const CONNECTION_LIFETIME: Duration = Duration::from_secs(15); 83 // The maximum number of connected peers. 84 const MAX_PEERS: u16 = 1_000; 85 86 pub async fn new( 87 listener_addr: SocketAddr, 88 account: Account<N>, 89 genesis_header: Header<N>, 90 dev: Option<u16>, 91 ) -> anyhow::Result<Self> { 92 // Initialize the TCP stack. 93 let tcp = Tcp::new(tcp::Config::new(listener_addr, Self::MAX_PEERS)); 94 // Initialize the peer pool. 95 let peer_pool = Default::default(); 96 // Initialize a collection of validators. 97 let known_validators = Default::default(); 98 // Load the restrictions ID. 99 let restrictions_id = Restrictions::load()?.restrictions_id(); 100 // Create a resolver. 101 let resolver = Default::default(); 102 // Create an HTTP client to obtain the current committee. 103 let http_client = reqwest::Client::new(); 104 // Prepare a placeholder committee, ensuring that it's insta-outdated. 105 let latest_committee = TMutex::new((Default::default(), Instant::now() - Self::COMMITTEE_REFRESH_TIME)); 106 107 // Prepare the shutdown channel. 108 let (shutdown_tx, shutdown_rx) = oneshot::channel(); 109 let shutdown_tx = Mutex::new(Some(shutdown_tx)); 110 111 // Construct and return the bootstrap client. 112 let inner = InnerBootstrapClient { 113 tcp, 114 peer_pool, 115 known_validators, 116 resolver, 117 account, 118 genesis_header, 119 restrictions_id, 120 http_client, 121 latest_committee, 122 dev, 123 shutdown_tx, 124 }; 125 let node = BootstrapClient(Arc::new(inner)); 126 127 // Enable the TCP protocols. 128 node.enable_handshake().await; 129 node.enable_reading().await; 130 node.enable_writing().await; 131 node.enable_disconnect().await; 132 node.enable_on_connect().await; 133 // Enable the TCP listener. Note: This must be called after the above protocols. 134 node.tcp().enable_listener().await.expect("Failed to enable the TCP listener"); 135 136 // Await the shutdown signal. 137 let _ = shutdown_rx.await; 138 139 Ok(node) 140 } 141 142 /// Returns the account address of the node. 143 pub fn address(&self) -> Address<N> { 144 self.account.address() 145 } 146 147 /// Returns the account private key of the node. 148 pub fn private_key(&self) -> &PrivateKey<N> { 149 self.account.private_key() 150 } 151 152 /// Returns the account view key of the node. 153 pub fn view_key(&self) -> &ViewKey<N> { 154 self.account.view_key() 155 } 156 157 /// Returns the listener IP address from the connected peer address. 158 pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> { 159 self.resolver.read().get_listener(connected_addr) 160 } 161 162 /// Returns `true` if the node is in development mode. 163 pub fn is_dev(&self) -> bool { 164 self.dev.is_some() 165 } 166 167 /// Returns the current validator committee or updates it from the explorer, if 168 /// we are capable of obtaining it from the network. 169 pub async fn get_or_update_committee(&self) -> anyhow::Result<Option<HashSet<Address<N>>>> { 170 // Development testing may include a list of committee Aleo addresses loaded from the environment. 171 if cfg!(feature = "test") || self.is_dev() { 172 match std::env::var("TEST_COMMITTEE_ADDRS") { 173 Ok(aleo_addrs) => { 174 let dev_committee = 175 aleo_addrs.split(',').map(|addr| Address::<N>::from_str(addr).unwrap()).collect(); 176 return Ok(Some(dev_committee)); 177 } 178 Err(err) => { 179 warn!("Failed to load committee peers from environment: {err}"); 180 return Ok(None); 181 } 182 } 183 } 184 185 let now = Instant::now(); 186 let (committee, timestamp) = &mut *self.latest_committee.lock().await; 187 if now - *timestamp >= Self::COMMITTEE_REFRESH_TIME { 188 debug!("Updating the validator committee"); 189 *timestamp = now; 190 let committe_query_addr = format!("https://api.explorer.provable.com/v2/{}/committee/latest", N::NAME); 191 let response = self.http_client.get(committe_query_addr).send().await?; 192 let json = response.text().await?; 193 let full_committee = Committee::from_str(&json)?; 194 *committee = full_committee.members().keys().copied().collect(); 195 debug!("The validator committee has {} members now", committee.len()); 196 197 Ok(Some(committee.clone())) 198 } else { 199 Ok(Some(committee.clone())) 200 } 201 } 202 203 // Return the known addresses of current committee members, or all known 204 // validators if the committee info is unavailable. 205 pub async fn get_validator_addrs(&self) -> HashMap<SocketAddr, KnownValidatorInfo<N>> { 206 // First, collect info on all the validators we had connected to before. 207 let mut known_validators = self.known_validators.read().clone(); 208 // If the committee info is available, prune non-committee members. 209 match self.get_or_update_committee().await { 210 Ok(Some(committee)) => { 211 known_validators.retain(|_, (aleo_addr, _)| committee.contains(aleo_addr)); 212 known_validators 213 } 214 Ok(None) => known_validators, 215 Err(error) => { 216 error!("Couldn't update the validator committee: {error}"); 217 known_validators 218 } 219 } 220 } 221 222 /// Shuts down the bootstrap client. 223 pub async fn shut_down(&self) { 224 info!("Shutting down the bootstrap client..."); 225 226 // Shut down the low-level network features. 227 self.tcp.shut_down().await; 228 229 // Shut down the node. 230 if let Some(shutdown_tx) = self.shutdown_tx.lock().take() { 231 let _ = shutdown_tx.send(()); 232 } 233 } 234 235 /// Blocks until a shutdown signal was received or manual shutdown was triggered. 236 pub async fn wait_for_signals(&self, handler: &SignalHandler) { 237 handler.wait_for_signals().await; 238 239 // If the node is already initialized, then shut it down. 240 self.shut_down().await; 241 } 242 }