network.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use crate::{ 17 BootstrapClient, 18 bft::{ 19 MAX_VALIDATORS_TO_SEND, 20 events::{self, Event}, 21 }, 22 bootstrap_client::codec::BootstrapClientCodec, 23 network::{ConnectionMode, NodeType, Peer, PeerPoolHandling, Resolver}, 24 router::{ 25 MAX_PEERS_TO_SEND, 26 messages::{self, Message}, 27 }, 28 tcp::{ConnectionSide, P2P, Tcp, protocols::*}, 29 }; 30 use alphavm::prelude::Network; 31 32 use indexmap::IndexMap; 33 #[cfg(feature = "locktick")] 34 use locktick::parking_lot::RwLock; 35 #[cfg(not(feature = "locktick"))] 36 use parking_lot::RwLock; 37 use std::{collections::HashMap, io, net::SocketAddr}; 38 use tokio::time::sleep; 39 use tokio_util::codec::Decoder; 40 41 impl<N: Network> P2P for BootstrapClient<N> { 42 fn tcp(&self) -> &Tcp { 43 &self.tcp 44 } 45 } 46 47 impl<N: Network> PeerPoolHandling<N> for BootstrapClient<N> { 48 const MAXIMUM_POOL_SIZE: usize = 10_000; 49 const OWNER: &'static str = "[Network]"; 50 const PEER_SLASHING_COUNT: usize = 200; 51 52 fn is_dev(&self) -> bool { 53 self.dev.is_some() 54 } 55 56 fn trusted_peers_only(&self) -> bool { 57 false 58 } 59 60 fn node_type(&self) -> NodeType { 61 NodeType::BootstrapClient 62 } 63 64 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> { 65 &self.peer_pool 66 } 67 68 fn resolver(&self) -> &RwLock<Resolver<N>> { 69 &self.resolver 70 } 71 } 72 73 /// The bootstrap client can handle both validator and non-validator messages. 74 #[derive(Debug)] 75 pub enum MessageOrEvent<N: Network> { 76 Message(Message<N>), 77 Event(Event<N>), 78 } 79 80 #[async_trait] 81 impl<N: Network> OnConnect for BootstrapClient<N> { 82 async fn on_connect(&self, peer_addr: SocketAddr) { 83 // If the peer is connected in validator (Gateway) mode, save it to the collection 84 // of known validators. 85 if let Some(listener_addr) = self.resolve_to_listener(peer_addr) { 86 if let Some(peer) = self.get_connected_peer(listener_addr) { 87 if peer.node_type == NodeType::Validator { 88 self.known_validators.write().insert(listener_addr, (peer.aleo_addr, peer.connection_mode)); 89 } 90 } 91 } 92 // The peers should only ask us for the peer list; spawn a task that will 93 // terminate the connection after a while. 94 let tcp = self.tcp().clone(); 95 tokio::spawn(async move { 96 sleep(Self::CONNECTION_LIFETIME).await; 97 tcp.disconnect(peer_addr).await; 98 }); 99 } 100 } 101 102 #[async_trait] 103 impl<N: Network> Disconnect for BootstrapClient<N> { 104 /// Any extra operations to be performed during a disconnect. 105 async fn handle_disconnect(&self, peer_addr: SocketAddr) { 106 if let Some(listener_addr) = self.resolve_to_listener(peer_addr) { 107 self.downgrade_peer_to_candidate(listener_addr); 108 } 109 } 110 } 111 112 #[async_trait] 113 impl<N: Network> Reading for BootstrapClient<N> { 114 type Codec = BootstrapClientCodec<N>; 115 type Message = <BootstrapClientCodec<N> as Decoder>::Item; 116 117 /// Creates a [`Decoder`] used to interpret messages from the network. 118 /// The `side` param indicates the connection side **from the node's perspective**. 119 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 120 Default::default() 121 } 122 123 /// Processes a message received from the network. 124 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { 125 // Identify the connected peer. 126 let Some(listener_addr) = self.resolve_to_listener(peer_addr) else { 127 // Already disconnecting, ignore. 128 return Ok(()); 129 }; 130 131 // Handle the right peer request. 132 match message { 133 MessageOrEvent::Message(Message::PeerRequest(_)) => { 134 debug!("Received a PeerRequest from '{listener_addr}'"); 135 let mut peers = self.get_candidate_peers(); 136 137 // In order to filter out validators properly, we'll need the 138 // peer's node type and the list of validators. 139 let Some(peer) = self.get_connected_peer(listener_addr) else { 140 return Ok(()); 141 }; 142 let validators = self.get_validator_addrs().await; 143 144 if peer.node_type == NodeType::Validator { 145 // Filter out Gateway addresses. 146 peers.retain(|peer| { 147 validators 148 .get(&peer.listener_addr) 149 .map(|(_, connection_mode)| *connection_mode != ConnectionMode::Gateway) 150 .unwrap_or(true) 151 }); 152 } else { 153 // Filter out all validator addresses. 154 peers.retain(|peer| !validators.contains_key(&peer.listener_addr)); 155 } 156 peers.truncate(MAX_PEERS_TO_SEND); 157 let peers = peers.into_iter().map(|peer| (peer.listener_addr, None)).collect::<Vec<_>>(); 158 159 debug!("Sending {} peer address(es) to '{listener_addr}'", peers.len()); 160 let msg = MessageOrEvent::Message(Message::PeerResponse(messages::PeerResponse { peers })); 161 if let Err(err) = self.unicast(peer_addr, msg)?.await { 162 warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting"); 163 } else { 164 debug!("Disconnecting from '{listener_addr}' - peers provided"); 165 } 166 167 self.tcp().disconnect(peer_addr).await; 168 } 169 MessageOrEvent::Event(Event::ValidatorsRequest(_)) => { 170 debug!("Received a ValidatorsRequest from '{listener_addr}'"); 171 172 // Procure a list of applicable validator addresses. 173 let validators = self.get_validator_addrs().await; 174 let validators = validators 175 .into_iter() 176 .filter_map(|(listener_addr, (aleo_addr, connection_mode))| { 177 // Only pick addresses connected in Gateway mode. 178 (connection_mode == ConnectionMode::Gateway).then_some((listener_addr, aleo_addr)) 179 }) 180 .take(MAX_VALIDATORS_TO_SEND) 181 .collect::<IndexMap<_, _>>(); 182 183 debug!("Sending {} validator address(es) to '{listener_addr}'", validators.len()); 184 let msg = MessageOrEvent::Event(Event::ValidatorsResponse(events::ValidatorsResponse { validators })); 185 if let Err(err) = self.unicast(peer_addr, msg)?.await { 186 warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting"); 187 } else { 188 debug!("Disconnecting from '{listener_addr}' - peers provided"); 189 } 190 191 self.tcp().disconnect(peer_addr).await; 192 } 193 msg => { 194 let name = match msg { 195 MessageOrEvent::Message(msg) => msg.name(), 196 MessageOrEvent::Event(msg) => msg.name(), 197 }; 198 trace!("Ignoring an unhandled message ({name}) from {listener_addr}"); 199 } 200 } 201 202 Ok(()) 203 } 204 } 205 206 #[async_trait] 207 impl<N: Network> Writing for BootstrapClient<N> { 208 type Codec = BootstrapClientCodec<N>; 209 type Message = MessageOrEvent<N>; 210 211 /// Creates an [`Encoder`] used to write the outbound messages to the target stream. 212 /// The `side` parameter indicates the connection side **from the node's perspective**. 213 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 214 Default::default() 215 } 216 }