/ node / src / bootstrap_client / mod.rs
mod.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  mod codec;
 17  mod handshake;
 18  mod network;
 19  
 20  use crate::tcp::{self, Tcp};
 21  use alphaos_account::Account;
 22  use alphaos_node_network::{ConnectionMode, Peer, Resolver};
 23  use alphaos_node_tcp::{P2P, protocols::*};
 24  use alphaos_utilities::SignalHandler;
 25  use alphavm::{
 26      ledger::committee::Committee,
 27      prelude::{Address, Field, Header, Network, PrivateKey, ViewKey},
 28      synthesizer::Restrictions,
 29  };
 30  
 31  #[cfg(feature = "locktick")]
 32  use locktick::{
 33      parking_lot::{Mutex, RwLock},
 34      tokio::Mutex as TMutex,
 35  };
 36  #[cfg(not(feature = "locktick"))]
 37  use parking_lot::{Mutex, RwLock};
 38  use std::{
 39      collections::{HashMap, HashSet},
 40      net::SocketAddr,
 41      ops::Deref,
 42      str::FromStr,
 43      sync::Arc,
 44      time::{Duration, Instant},
 45  };
 46  #[cfg(not(feature = "locktick"))]
 47  use tokio::sync::Mutex as TMutex;
 48  use tokio::sync::oneshot;
 49  
 50  #[derive(Clone)]
 51  pub struct BootstrapClient<N: Network>(Arc<InnerBootstrapClient<N>>);
 52  
 53  impl<N: Network> Deref for BootstrapClient<N> {
 54      type Target = Arc<InnerBootstrapClient<N>>;
 55  
 56      fn deref(&self) -> &Self::Target {
 57          &self.0
 58      }
 59  }
 60  
 61  // A tuple holding the validator's Aleo address, and its connection mode.
 62  type KnownValidatorInfo<N> = (Address<N>, ConnectionMode);
 63  
 64  pub struct InnerBootstrapClient<N: Network> {
 65      tcp: Tcp,
 66      peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
 67      known_validators: RwLock<HashMap<SocketAddr, KnownValidatorInfo<N>>>,
 68      resolver: RwLock<Resolver<N>>,
 69      account: Account<N>,
 70      genesis_header: Header<N>,
 71      restrictions_id: Field<N>,
 72      http_client: reqwest::Client,
 73      latest_committee: TMutex<(HashSet<Address<N>>, Instant)>,
 74      dev: Option<u16>,
 75      shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
 76  }
 77  
 78  impl<N: Network> BootstrapClient<N> {
 79      // The interval for validator committee refreshes.
 80      const COMMITTEE_REFRESH_TIME: Duration = Duration::from_secs(20);
 81      // The maximum amount of time per connection.
 82      const CONNECTION_LIFETIME: Duration = Duration::from_secs(15);
 83      // The maximum number of connected peers.
 84      const MAX_PEERS: u16 = 1_000;
 85  
 86      pub async fn new(
 87          listener_addr: SocketAddr,
 88          account: Account<N>,
 89          genesis_header: Header<N>,
 90          dev: Option<u16>,
 91      ) -> anyhow::Result<Self> {
 92          // Initialize the TCP stack.
 93          let tcp = Tcp::new(tcp::Config::new(listener_addr, Self::MAX_PEERS));
 94          // Initialize the peer pool.
 95          let peer_pool = Default::default();
 96          // Initialize a collection of validators.
 97          let known_validators = Default::default();
 98          // Load the restrictions ID.
 99          let restrictions_id = Restrictions::load()?.restrictions_id();
100          // Create a resolver.
101          let resolver = Default::default();
102          // Create an HTTP client to obtain the current committee.
103          let http_client = reqwest::Client::new();
104          // Prepare a placeholder committee, ensuring that it's insta-outdated.
105          let latest_committee = TMutex::new((Default::default(), Instant::now() - Self::COMMITTEE_REFRESH_TIME));
106  
107          // Prepare the shutdown channel.
108          let (shutdown_tx, shutdown_rx) = oneshot::channel();
109          let shutdown_tx = Mutex::new(Some(shutdown_tx));
110  
111          // Construct and return the bootstrap client.
112          let inner = InnerBootstrapClient {
113              tcp,
114              peer_pool,
115              known_validators,
116              resolver,
117              account,
118              genesis_header,
119              restrictions_id,
120              http_client,
121              latest_committee,
122              dev,
123              shutdown_tx,
124          };
125          let node = BootstrapClient(Arc::new(inner));
126  
127          // Enable the TCP protocols.
128          node.enable_handshake().await;
129          node.enable_reading().await;
130          node.enable_writing().await;
131          node.enable_disconnect().await;
132          node.enable_on_connect().await;
133          // Enable the TCP listener. Note: This must be called after the above protocols.
134          node.tcp().enable_listener().await.expect("Failed to enable the TCP listener");
135  
136          // Await the shutdown signal.
137          let _ = shutdown_rx.await;
138  
139          Ok(node)
140      }
141  
142      /// Returns the account address of the node.
143      pub fn address(&self) -> Address<N> {
144          self.account.address()
145      }
146  
147      /// Returns the account private key of the node.
148      pub fn private_key(&self) -> &PrivateKey<N> {
149          self.account.private_key()
150      }
151  
152      /// Returns the account view key of the node.
153      pub fn view_key(&self) -> &ViewKey<N> {
154          self.account.view_key()
155      }
156  
157      /// Returns the listener IP address from the connected peer address.
158      pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
159          self.resolver.read().get_listener(connected_addr)
160      }
161  
162      /// Returns `true` if the node is in development mode.
163      pub fn is_dev(&self) -> bool {
164          self.dev.is_some()
165      }
166  
167      /// Returns the current validator committee or updates it from the explorer, if
168      /// we are capable of obtaining it from the network.
169      pub async fn get_or_update_committee(&self) -> anyhow::Result<Option<HashSet<Address<N>>>> {
170          // Development testing may include a list of committee Aleo addresses loaded from the environment.
171          if cfg!(feature = "test") || self.is_dev() {
172              match std::env::var("TEST_COMMITTEE_ADDRS") {
173                  Ok(aleo_addrs) => {
174                      let dev_committee =
175                          aleo_addrs.split(',').map(|addr| Address::<N>::from_str(addr).unwrap()).collect();
176                      return Ok(Some(dev_committee));
177                  }
178                  Err(err) => {
179                      warn!("Failed to load committee peers from environment: {err}");
180                      return Ok(None);
181                  }
182              }
183          }
184  
185          let now = Instant::now();
186          let (committee, timestamp) = &mut *self.latest_committee.lock().await;
187          if now - *timestamp >= Self::COMMITTEE_REFRESH_TIME {
188              debug!("Updating the validator committee");
189              *timestamp = now;
190              let committe_query_addr = format!("https://api.explorer.provable.com/v2/{}/committee/latest", N::NAME);
191              let response = self.http_client.get(committe_query_addr).send().await?;
192              let json = response.text().await?;
193              let full_committee = Committee::from_str(&json)?;
194              *committee = full_committee.members().keys().copied().collect();
195              debug!("The validator committee has {} members now", committee.len());
196  
197              Ok(Some(committee.clone()))
198          } else {
199              Ok(Some(committee.clone()))
200          }
201      }
202  
203      // Return the known addresses of current committee members, or all known
204      // validators if the committee info is unavailable.
205      pub async fn get_validator_addrs(&self) -> HashMap<SocketAddr, KnownValidatorInfo<N>> {
206          // First, collect info on all the validators we had connected to before.
207          let mut known_validators = self.known_validators.read().clone();
208          // If the committee info is available, prune non-committee members.
209          match self.get_or_update_committee().await {
210              Ok(Some(committee)) => {
211                  known_validators.retain(|_, (aleo_addr, _)| committee.contains(aleo_addr));
212                  known_validators
213              }
214              Ok(None) => known_validators,
215              Err(error) => {
216                  error!("Couldn't update the validator committee: {error}");
217                  known_validators
218              }
219          }
220      }
221  
222      /// Shuts down the bootstrap client.
223      pub async fn shut_down(&self) {
224          info!("Shutting down the bootstrap client...");
225  
226          // Shut down the low-level network features.
227          self.tcp.shut_down().await;
228  
229          // Shut down the node.
230          if let Some(shutdown_tx) = self.shutdown_tx.lock().take() {
231              let _ = shutdown_tx.send(());
232          }
233      }
234  
235      /// Blocks until a shutdown signal was received or manual shutdown was triggered.
236      pub async fn wait_for_signals(&self, handler: &SignalHandler) {
237          handler.wait_for_signals().await;
238  
239          // If the node is already initialized, then shut it down.
240          self.shut_down().await;
241      }
242  }