/ node / src / bootstrap_client / network.rs
network.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  use crate::{
 17      BootstrapClient,
 18      bft::{
 19          MAX_VALIDATORS_TO_SEND,
 20          events::{self, Event},
 21      },
 22      bootstrap_client::codec::BootstrapClientCodec,
 23      network::{ConnectionMode, NodeType, Peer, PeerPoolHandling, Resolver},
 24      router::{
 25          MAX_PEERS_TO_SEND,
 26          messages::{self, Message},
 27      },
 28      tcp::{ConnectionSide, P2P, Tcp, protocols::*},
 29  };
 30  use alphavm::prelude::Network;
 31  
 32  use indexmap::IndexMap;
 33  #[cfg(feature = "locktick")]
 34  use locktick::parking_lot::RwLock;
 35  #[cfg(not(feature = "locktick"))]
 36  use parking_lot::RwLock;
 37  use std::{collections::HashMap, io, net::SocketAddr};
 38  use tokio::time::sleep;
 39  use tokio_util::codec::Decoder;
 40  
 41  impl<N: Network> P2P for BootstrapClient<N> {
 42      fn tcp(&self) -> &Tcp {
 43          &self.tcp
 44      }
 45  }
 46  
 47  impl<N: Network> PeerPoolHandling<N> for BootstrapClient<N> {
 48      const MAXIMUM_POOL_SIZE: usize = 10_000;
 49      const OWNER: &'static str = "[Network]";
 50      const PEER_SLASHING_COUNT: usize = 200;
 51  
 52      fn is_dev(&self) -> bool {
 53          self.dev.is_some()
 54      }
 55  
 56      fn trusted_peers_only(&self) -> bool {
 57          false
 58      }
 59  
 60      fn node_type(&self) -> NodeType {
 61          NodeType::BootstrapClient
 62      }
 63  
 64      fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
 65          &self.peer_pool
 66      }
 67  
 68      fn resolver(&self) -> &RwLock<Resolver<N>> {
 69          &self.resolver
 70      }
 71  }
 72  
 73  /// The bootstrap client can handle both validator and non-validator messages.
 74  #[derive(Debug)]
 75  pub enum MessageOrEvent<N: Network> {
 76      Message(Message<N>),
 77      Event(Event<N>),
 78  }
 79  
 80  #[async_trait]
 81  impl<N: Network> OnConnect for BootstrapClient<N> {
 82      async fn on_connect(&self, peer_addr: SocketAddr) {
 83          // If the peer is connected in validator (Gateway) mode, save it to the collection
 84          // of known validators.
 85          if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
 86              if let Some(peer) = self.get_connected_peer(listener_addr) {
 87                  if peer.node_type == NodeType::Validator {
 88                      self.known_validators.write().insert(listener_addr, (peer.aleo_addr, peer.connection_mode));
 89                  }
 90              }
 91          }
 92          // The peers should only ask us for the peer list; spawn a task that will
 93          // terminate the connection after a while.
 94          let tcp = self.tcp().clone();
 95          tokio::spawn(async move {
 96              sleep(Self::CONNECTION_LIFETIME).await;
 97              tcp.disconnect(peer_addr).await;
 98          });
 99      }
100  }
101  
102  #[async_trait]
103  impl<N: Network> Disconnect for BootstrapClient<N> {
104      /// Any extra operations to be performed during a disconnect.
105      async fn handle_disconnect(&self, peer_addr: SocketAddr) {
106          if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
107              self.downgrade_peer_to_candidate(listener_addr);
108          }
109      }
110  }
111  
112  #[async_trait]
113  impl<N: Network> Reading for BootstrapClient<N> {
114      type Codec = BootstrapClientCodec<N>;
115      type Message = <BootstrapClientCodec<N> as Decoder>::Item;
116  
117      /// Creates a [`Decoder`] used to interpret messages from the network.
118      /// The `side` param indicates the connection side **from the node's perspective**.
119      fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
120          Default::default()
121      }
122  
123      /// Processes a message received from the network.
124      async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
125          // Identify the connected peer.
126          let Some(listener_addr) = self.resolve_to_listener(peer_addr) else {
127              // Already disconnecting, ignore.
128              return Ok(());
129          };
130  
131          // Handle the right peer request.
132          match message {
133              MessageOrEvent::Message(Message::PeerRequest(_)) => {
134                  debug!("Received a PeerRequest from '{listener_addr}'");
135                  let mut peers = self.get_candidate_peers();
136  
137                  // In order to filter out validators properly, we'll need the
138                  // peer's node type and the list of validators.
139                  let Some(peer) = self.get_connected_peer(listener_addr) else {
140                      return Ok(());
141                  };
142                  let validators = self.get_validator_addrs().await;
143  
144                  if peer.node_type == NodeType::Validator {
145                      // Filter out Gateway addresses.
146                      peers.retain(|peer| {
147                          validators
148                              .get(&peer.listener_addr)
149                              .map(|(_, connection_mode)| *connection_mode != ConnectionMode::Gateway)
150                              .unwrap_or(true)
151                      });
152                  } else {
153                      // Filter out all validator addresses.
154                      peers.retain(|peer| !validators.contains_key(&peer.listener_addr));
155                  }
156                  peers.truncate(MAX_PEERS_TO_SEND);
157                  let peers = peers.into_iter().map(|peer| (peer.listener_addr, None)).collect::<Vec<_>>();
158  
159                  debug!("Sending {} peer address(es) to '{listener_addr}'", peers.len());
160                  let msg = MessageOrEvent::Message(Message::PeerResponse(messages::PeerResponse { peers }));
161                  if let Err(err) = self.unicast(peer_addr, msg)?.await {
162                      warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
163                  } else {
164                      debug!("Disconnecting from '{listener_addr}' - peers provided");
165                  }
166  
167                  self.tcp().disconnect(peer_addr).await;
168              }
169              MessageOrEvent::Event(Event::ValidatorsRequest(_)) => {
170                  debug!("Received a ValidatorsRequest from '{listener_addr}'");
171  
172                  // Procure a list of applicable validator addresses.
173                  let validators = self.get_validator_addrs().await;
174                  let validators = validators
175                      .into_iter()
176                      .filter_map(|(listener_addr, (aleo_addr, connection_mode))| {
177                          // Only pick addresses connected in Gateway mode.
178                          (connection_mode == ConnectionMode::Gateway).then_some((listener_addr, aleo_addr))
179                      })
180                      .take(MAX_VALIDATORS_TO_SEND)
181                      .collect::<IndexMap<_, _>>();
182  
183                  debug!("Sending {} validator address(es) to '{listener_addr}'", validators.len());
184                  let msg = MessageOrEvent::Event(Event::ValidatorsResponse(events::ValidatorsResponse { validators }));
185                  if let Err(err) = self.unicast(peer_addr, msg)?.await {
186                      warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
187                  } else {
188                      debug!("Disconnecting from '{listener_addr}' - peers provided");
189                  }
190  
191                  self.tcp().disconnect(peer_addr).await;
192              }
193              msg => {
194                  let name = match msg {
195                      MessageOrEvent::Message(msg) => msg.name(),
196                      MessageOrEvent::Event(msg) => msg.name(),
197                  };
198                  trace!("Ignoring an unhandled message ({name}) from {listener_addr}");
199              }
200          }
201  
202          Ok(())
203      }
204  }
205  
206  #[async_trait]
207  impl<N: Network> Writing for BootstrapClient<N> {
208      type Codec = BootstrapClientCodec<N>;
209      type Message = MessageOrEvent<N>;
210  
211      /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
212      /// The `side` parameter indicates the connection side **from the node's perspective**.
213      fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
214          Default::default()
215      }
216  }