heartbeat.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use crate::{ 17 ConnectedPeer, 18 NodeType, 19 Outbound, 20 PeerPoolHandling, 21 Router, 22 bootstrap_peers, 23 messages::{DisconnectReason, Message, PeerRequest}, 24 }; 25 use alphavm::prelude::Network; 26 27 use alphaos_node_tcp::P2P; 28 29 use colored::Colorize; 30 use rand::{prelude::IteratorRandom, rngs::OsRng}; 31 32 /// A helper function to compute the maximum of two numbers. 33 /// See Rust issue 92391: https://github.com/rust-lang/rust/issues/92391. 34 pub const fn max(a: usize, b: usize) -> usize { 35 match a > b { 36 true => a, 37 false => b, 38 } 39 } 40 41 #[async_trait] 42 pub trait Heartbeat<N: Network>: Outbound<N> { 43 /// The duration in seconds to sleep in between heartbeat executions. 44 const HEARTBEAT_IN_SECS: u64 = 25; // 25 seconds 45 /// The minimum number of peers required to maintain connections with. 46 const MINIMUM_NUMBER_OF_PEERS: usize = 3; 47 /// The median number of peers to maintain connections with. 48 const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS); 49 /// The maximum number of peers permitted to maintain connections with. 50 const MAXIMUM_NUMBER_OF_PEERS: usize = 21; 51 /// The maximum number of provers to maintain connections with. 52 const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4; 53 /// The amount of time an IP address is prohibited from connecting. 54 const IP_BAN_TIME_IN_SECS: u64 = 300; 55 56 /// Handles the heartbeat request. 57 async fn heartbeat(&self) { 58 self.safety_check_minimum_number_of_peers(); 59 self.log_connected_peers(); 60 61 // Remove any stale connected peers. 62 self.remove_stale_connected_peers(); 63 // Remove the oldest connected peer. 64 self.remove_oldest_connected_peer(); 65 // Keep the number of connected peers within the allowed range. 66 self.handle_connected_peers(); 67 // Keep the bootstrap peers within the allowed range. 68 self.handle_bootstrap_peers().await; 69 // Keep the trusted peers connected. 70 self.handle_trusted_peers().await; 71 // Keep the puzzle request up to date. 72 self.handle_puzzle_request(); 73 // Unban any addresses whose ban time has expired. 74 self.handle_banned_ips(); 75 } 76 77 /// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers. 78 /// This function performs safety checks on the setting for the minimum number of peers. 79 fn safety_check_minimum_number_of_peers(&self) { 80 // Perform basic sanity checks on the configuration for the number of peers. 81 assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1."); 82 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS); 83 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS); 84 assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS); 85 assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS); 86 } 87 88 /// This function logs the connected peers. 89 fn log_connected_peers(&self) { 90 // Log the connected peers. 91 let connected_peers = self.router().connected_peers(); 92 let connected_peers_fmt = format!("{connected_peers:?}").dimmed(); 93 match connected_peers.len() { 94 0 => warn!("No connected peers"), 95 1 => debug!("Connected to 1 peer: {connected_peers_fmt}"), 96 num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"), 97 } 98 } 99 100 /// This function removes any connected peers that have not communicated within the predefined time. 101 fn remove_stale_connected_peers(&self) { 102 // Check if any connected peer is stale. 103 for peer in self.router().get_connected_peers() { 104 // Disconnect if the peer has not communicated back within the predefined time. 105 let elapsed = peer.last_seen.elapsed(); 106 if elapsed > Router::<N>::MAX_RADIO_SILENCE { 107 warn!("Peer {} has not communicated in {elapsed:?}", peer.listener_addr); 108 // Disconnect from this peer. 109 self.router().disconnect(peer.listener_addr); 110 } 111 } 112 } 113 114 /// Returns a sorted vector of network addresses of all removable connected peers 115 /// where the first entry has the lowest priority and the last one the highest. 116 /// 117 /// Rules: 118 /// - Trusted peers and bootstrap nodes are not removable. 119 /// - Peers that we are currently syncing with are not removable. 120 /// - Connections that have not been seen in a while are considered lower priority. 121 fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> { 122 // Are we synced already? (cache this here, so it does not need to be recomputed) 123 let is_block_synced = self.is_block_synced(); 124 125 // Sort by priority, where lowest priority will be at the beginning 126 // of the vector. 127 // Note, that this gives equal priority to clients and provers, which 128 // we might want to change in the future. 129 let mut peers = self.router().filter_connected_peers(|peer| { 130 !peer.trusted 131 && peer.node_type != NodeType::BootstrapClient 132 && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) // This peer is currently syncing from us. 133 && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) // We are currently syncing from this peer. 134 }); 135 peers.sort_by_key(|peer| peer.last_seen); 136 137 peers 138 } 139 140 /// This function removes the peer that we have not heard from the longest, 141 /// to keep the connections fresh. 142 /// It only triggers if the router is above the minimum number of connected peers. 143 fn remove_oldest_connected_peer(&self) { 144 // Skip if the node is not requesting peers. 145 if self.router().trusted_peers_only() { 146 return; 147 } 148 149 // Skip if the router is at or below the minimum number of connected peers. 150 if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS { 151 return; 152 } 153 154 // Disconnect from the oldest connected peer, which is the first entry in the list 155 // of removable peers. 156 // Do nothing, if the list is empty. 157 if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) { 158 info!("Disconnecting from '{oldest}' (periodic refresh of peers)"); 159 let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into())); 160 self.router().disconnect(oldest); 161 } 162 } 163 164 /// This function keeps the number of connected peers within the allowed range. 165 fn handle_connected_peers(&self) { 166 // Initialize an RNG. 167 let rng = &mut OsRng; 168 169 // Obtain the number of connected peers. 170 let num_connected = self.router().number_of_connected_peers(); 171 // Obtain the number of connected provers. 172 let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len(); 173 174 // Determine the maximum number of peers and provers to keep. 175 let (max_peers, max_provers) = (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS); 176 177 // Compute the number of surplus peers. 178 let num_surplus_peers = num_connected.saturating_sub(max_peers); 179 // Compute the number of surplus provers. 180 let num_surplus_provers = num_connected_provers.saturating_sub(max_provers); 181 // Compute the number of provers remaining connected. 182 let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers); 183 // Compute the number of surplus clients and validators. 184 let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers); 185 186 if num_surplus_provers > 0 || num_surplus_clients_validators > 0 { 187 debug!( 188 "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers" 189 ); 190 191 // Determine the provers to disconnect from. 192 let provers_to_disconnect = self 193 .router() 194 .filter_connected_peers(|peer| peer.node_type.is_prover() && !peer.trusted) 195 .into_iter() 196 .choose_multiple(rng, num_surplus_provers); 197 198 // Determine the clients and validators to disconnect from. 199 let peers_to_disconnect = self 200 .get_removable_peers() 201 .into_iter() 202 .filter(|peer| !peer.node_type.is_prover()) // remove provers as those are handled separately 203 .take(num_surplus_clients_validators); 204 205 // Proceed to send disconnect requests to these peers. 206 for peer in peers_to_disconnect.chain(provers_to_disconnect) { 207 // TODO (howardwu): Remove this after specializing this function. 208 if self.router().node_type().is_prover() { 209 continue; 210 } 211 212 let peer_addr = peer.listener_addr; 213 info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)"); 214 self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into())); 215 // Disconnect from this peer. 216 self.router().disconnect(peer_addr); 217 } 218 } 219 220 // Obtain the number of connected peers. 221 let num_connected = self.router().number_of_connected_peers(); 222 // Compute the number of deficit peers. 223 let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected); 224 225 if num_deficient > 0 { 226 // Initialize an RNG. 227 let rng = &mut OsRng; 228 229 // Attempt to connect to more peers, separately choosing from those at a greater block 230 // height, and those whose height is lower or unknown to us. 231 let own_height = self.router().ledger.latest_block_height(); 232 let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self 233 .router() 234 .get_candidate_peers() 235 .into_iter() 236 .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false)); 237 // We may not know of half of `num_deficient` candidates; account for it using `min`. 238 let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len()); 239 for peer in higher_peers.into_iter().choose_multiple(rng, num_higher_peers) { 240 self.router().connect(peer.listener_addr); 241 } 242 for peer in other_peers.into_iter().choose_multiple(rng, num_deficient - num_higher_peers) { 243 self.router().connect(peer.listener_addr); 244 } 245 246 if !self.router().trusted_peers_only() { 247 // Request more peers from the connected peers. 248 for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) { 249 self.router().send(peer_ip, Message::PeerRequest(PeerRequest)); 250 } 251 } 252 } 253 } 254 255 /// This function keeps the number of bootstrap peers within the allowed range. 256 async fn handle_bootstrap_peers(&self) { 257 // Return early if we are in trusted peers only mode. 258 if self.router().trusted_peers_only() { 259 return; 260 } 261 // Split the bootstrap peers into connected and candidate lists. 262 let mut candidate_bootstrap = Vec::new(); 263 let connected_bootstrap = 264 self.router().filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient); 265 for bootstrap_ip in bootstrap_peers::<N>(self.router().is_dev()) { 266 if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) { 267 candidate_bootstrap.push(bootstrap_ip); 268 } 269 } 270 // If there are not enough connected bootstrap peers, connect to more. 271 if connected_bootstrap.is_empty() { 272 // Initialize an RNG. 273 let rng = &mut OsRng; 274 // Attempt to connect to a bootstrap peer. 275 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) { 276 match self.router().connect(peer_ip) { 277 Some(hdl) => { 278 let result = hdl.await; 279 if let Err(err) = result { 280 warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}"); 281 } 282 } 283 None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"), 284 } 285 } 286 } 287 // Determine if the node is connected to more bootstrap peers than allowed. 288 let num_surplus = connected_bootstrap.len().saturating_sub(1); 289 if num_surplus > 0 { 290 // Initialize an RNG. 291 let rng = &mut OsRng; 292 // Proceed to send disconnect requests to these bootstrap peers. 293 for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) { 294 info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr); 295 self.router().send(peer.listener_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into())); 296 // Disconnect from this peer. 297 self.router().disconnect(peer.listener_addr); 298 } 299 } 300 } 301 302 /// This function attempts to connect to any disconnected trusted peers. 303 async fn handle_trusted_peers(&self) { 304 // Ensure that the trusted nodes are connected. 305 let handles: Vec<_> = self 306 .router() 307 .unconnected_trusted_peers() 308 .iter() 309 .filter_map(|listener_addr| { 310 debug!("Attempting to (re-)connect to trusted peer `{listener_addr}`"); 311 let hdl = self.router().connect(*listener_addr); 312 if hdl.is_none() { 313 warn!("Could not initiate connection to trusted peer at `{listener_addr}`"); 314 } 315 hdl 316 }) 317 .collect(); 318 319 for result in futures::future::join_all(handles).await { 320 if let Err(err) = result { 321 warn!("Could not connect to trusted peer: {err}"); 322 } 323 } 324 } 325 326 /// This function updates the puzzle if network has updated. 327 fn handle_puzzle_request(&self) { 328 // No-op 329 } 330 331 // Remove addresses whose ban time has expired. 332 fn handle_banned_ips(&self) { 333 self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS); 334 } 335 }