/ node / router / src / heartbeat.rs
heartbeat.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  use crate::{
 17      ConnectedPeer,
 18      NodeType,
 19      Outbound,
 20      PeerPoolHandling,
 21      Router,
 22      bootstrap_peers,
 23      messages::{DisconnectReason, Message, PeerRequest},
 24  };
 25  use alphavm::prelude::Network;
 26  
 27  use alphaos_node_tcp::P2P;
 28  
 29  use colored::Colorize;
 30  use rand::{prelude::IteratorRandom, rngs::OsRng};
 31  
 32  /// A helper function to compute the maximum of two numbers.
 33  /// See Rust issue 92391: https://github.com/rust-lang/rust/issues/92391.
 34  pub const fn max(a: usize, b: usize) -> usize {
 35      match a > b {
 36          true => a,
 37          false => b,
 38      }
 39  }
 40  
 41  #[async_trait]
 42  pub trait Heartbeat<N: Network>: Outbound<N> {
    /// The duration in seconds to sleep in between heartbeat executions.
    const HEARTBEAT_IN_SECS: u64 = 25;
    /// The minimum number of peers required to maintain connections with.
    /// The periodic peer refresh is skipped while the router is at or below this number.
    const MINIMUM_NUMBER_OF_PEERS: usize = 3;
    /// The median number of peers to maintain connections with.
    /// This is half of the maximum (integer division), but never less than the minimum;
    /// with the default values it evaluates to 10. New connections are attempted while
    /// the router is below this number.
    const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
    /// The maximum number of peers permitted to maintain connections with.
    const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
    /// The maximum number of provers to maintain connections with.
    /// This is a quarter of the maximum number of peers (integer division);
    /// with the default values it evaluates to 5.
    const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
    /// The amount of time, in seconds, an IP address is prohibited from connecting.
    const IP_BAN_TIME_IN_SECS: u64 = 300;
 55  
    /// Handles the heartbeat request.
    ///
    /// Runs one maintenance pass over the router's peers: it first sanity-checks the
    /// peer-count constants and logs the connected peers, then performs the steps below
    /// in the order listed. Only the bootstrap and trusted peer steps are awaited.
    async fn heartbeat(&self) {
        self.safety_check_minimum_number_of_peers();
        self.log_connected_peers();

        // Remove any stale connected peers.
        self.remove_stale_connected_peers();
        // Remove the oldest connected peer.
        self.remove_oldest_connected_peer();
        // Keep the number of connected peers within the allowed range.
        self.handle_connected_peers();
        // Keep the bootstrap peers within the allowed range.
        self.handle_bootstrap_peers().await;
        // Keep the trusted peers connected.
        self.handle_trusted_peers().await;
        // Keep the puzzle request up to date.
        self.handle_puzzle_request();
        // Unban any addresses whose ban time has expired.
        self.handle_banned_ips();
    }
 76  
    /// This function performs safety checks on the setting for the minimum number of peers.
    ///
    /// # Panics
    ///
    /// Panics if the peer-count constants are inconsistent, i.e. unless
    /// `1 <= MINIMUM_NUMBER_OF_PEERS <= MEDIAN_NUMBER_OF_PEERS <= MAXIMUM_NUMBER_OF_PEERS`
    /// and `MAXIMUM_NUMBER_OF_PROVERS <= MAXIMUM_NUMBER_OF_PEERS`.
    // TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
    fn safety_check_minimum_number_of_peers(&self) {
        // Perform basic sanity checks on the configuration for the number of peers.
        assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
        assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
        assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
    }
 87  
 88      /// This function logs the connected peers.
 89      fn log_connected_peers(&self) {
 90          // Log the connected peers.
 91          let connected_peers = self.router().connected_peers();
 92          let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
 93          match connected_peers.len() {
 94              0 => warn!("No connected peers"),
 95              1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
 96              num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
 97          }
 98      }
 99  
100      /// This function removes any connected peers that have not communicated within the predefined time.
101      fn remove_stale_connected_peers(&self) {
102          // Check if any connected peer is stale.
103          for peer in self.router().get_connected_peers() {
104              // Disconnect if the peer has not communicated back within the predefined time.
105              let elapsed = peer.last_seen.elapsed();
106              if elapsed > Router::<N>::MAX_RADIO_SILENCE {
107                  warn!("Peer {} has not communicated in {elapsed:?}", peer.listener_addr);
108                  // Disconnect from this peer.
109                  self.router().disconnect(peer.listener_addr);
110              }
111          }
112      }
113  
114      /// Returns a sorted vector of network addresses of all removable connected peers
115      /// where the first entry has the lowest priority and the last one the highest.
116      ///
117      /// Rules:
118      ///     - Trusted peers and bootstrap nodes are not removable.
119      ///     - Peers that we are currently syncing with are not removable.
120      ///     - Connections that have not been seen in a while are considered lower priority.
121      fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
122          // Are we synced already? (cache this here, so it does not need to be recomputed)
123          let is_block_synced = self.is_block_synced();
124  
125          // Sort by priority, where lowest priority will be at the beginning
126          // of the vector.
127          // Note, that this gives equal priority to clients and provers, which
128          // we might want to change in the future.
129          let mut peers = self.router().filter_connected_peers(|peer| {
130              !peer.trusted
131                  && peer.node_type != NodeType::BootstrapClient
132                  && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) // This peer is currently syncing from us.
133                  && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) // We are currently syncing from this peer.
134          });
135          peers.sort_by_key(|peer| peer.last_seen);
136  
137          peers
138      }
139  
140      /// This function removes the peer that we have not heard from the longest,
141      /// to keep the connections fresh.
142      /// It only triggers if the router is above the minimum number of connected peers.
143      fn remove_oldest_connected_peer(&self) {
144          // Skip if the node is not requesting peers.
145          if self.router().trusted_peers_only() {
146              return;
147          }
148  
149          // Skip if the router is at or below the minimum number of connected peers.
150          if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
151              return;
152          }
153  
154          // Disconnect from the oldest connected peer, which is the first entry in the list
155          // of removable peers.
156          // Do nothing, if the list is empty.
157          if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
158              info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
159              let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
160              self.router().disconnect(oldest);
161          }
162      }
163  
164      /// This function keeps the number of connected peers within the allowed range.
165      fn handle_connected_peers(&self) {
166          // Initialize an RNG.
167          let rng = &mut OsRng;
168  
169          // Obtain the number of connected peers.
170          let num_connected = self.router().number_of_connected_peers();
171          // Obtain the number of connected provers.
172          let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();
173  
174          // Determine the maximum number of peers and provers to keep.
175          let (max_peers, max_provers) = (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS);
176  
177          // Compute the number of surplus peers.
178          let num_surplus_peers = num_connected.saturating_sub(max_peers);
179          // Compute the number of surplus provers.
180          let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
181          // Compute the number of provers remaining connected.
182          let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
183          // Compute the number of surplus clients and validators.
184          let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
185  
186          if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
187              debug!(
188                  "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
189              );
190  
191              // Determine the provers to disconnect from.
192              let provers_to_disconnect = self
193                  .router()
194                  .filter_connected_peers(|peer| peer.node_type.is_prover() && !peer.trusted)
195                  .into_iter()
196                  .choose_multiple(rng, num_surplus_provers);
197  
198              // Determine the clients and validators to disconnect from.
199              let peers_to_disconnect = self
200                  .get_removable_peers()
201                  .into_iter()
202                  .filter(|peer| !peer.node_type.is_prover()) // remove provers as those are handled separately
203                  .take(num_surplus_clients_validators);
204  
205              // Proceed to send disconnect requests to these peers.
206              for peer in peers_to_disconnect.chain(provers_to_disconnect) {
207                  // TODO (howardwu): Remove this after specializing this function.
208                  if self.router().node_type().is_prover() {
209                      continue;
210                  }
211  
212                  let peer_addr = peer.listener_addr;
213                  info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
214                  self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
215                  // Disconnect from this peer.
216                  self.router().disconnect(peer_addr);
217              }
218          }
219  
220          // Obtain the number of connected peers.
221          let num_connected = self.router().number_of_connected_peers();
222          // Compute the number of deficit peers.
223          let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
224  
225          if num_deficient > 0 {
226              // Initialize an RNG.
227              let rng = &mut OsRng;
228  
229              // Attempt to connect to more peers, separately choosing from those at a greater block
230              // height, and those whose height is lower or unknown to us.
231              let own_height = self.router().ledger.latest_block_height();
232              let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
233                  .router()
234                  .get_candidate_peers()
235                  .into_iter()
236                  .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
237              // We may not know of half of `num_deficient` candidates; account for it using `min`.
238              let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
239              for peer in higher_peers.into_iter().choose_multiple(rng, num_higher_peers) {
240                  self.router().connect(peer.listener_addr);
241              }
242              for peer in other_peers.into_iter().choose_multiple(rng, num_deficient - num_higher_peers) {
243                  self.router().connect(peer.listener_addr);
244              }
245  
246              if !self.router().trusted_peers_only() {
247                  // Request more peers from the connected peers.
248                  for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
249                      self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
250                  }
251              }
252          }
253      }
254  
255      /// This function keeps the number of bootstrap peers within the allowed range.
256      async fn handle_bootstrap_peers(&self) {
257          // Return early if we are in trusted peers only mode.
258          if self.router().trusted_peers_only() {
259              return;
260          }
261          // Split the bootstrap peers into connected and candidate lists.
262          let mut candidate_bootstrap = Vec::new();
263          let connected_bootstrap =
264              self.router().filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
265          for bootstrap_ip in bootstrap_peers::<N>(self.router().is_dev()) {
266              if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
267                  candidate_bootstrap.push(bootstrap_ip);
268              }
269          }
270          // If there are not enough connected bootstrap peers, connect to more.
271          if connected_bootstrap.is_empty() {
272              // Initialize an RNG.
273              let rng = &mut OsRng;
274              // Attempt to connect to a bootstrap peer.
275              if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
276                  match self.router().connect(peer_ip) {
277                      Some(hdl) => {
278                          let result = hdl.await;
279                          if let Err(err) = result {
280                              warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
281                          }
282                      }
283                      None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
284                  }
285              }
286          }
287          // Determine if the node is connected to more bootstrap peers than allowed.
288          let num_surplus = connected_bootstrap.len().saturating_sub(1);
289          if num_surplus > 0 {
290              // Initialize an RNG.
291              let rng = &mut OsRng;
292              // Proceed to send disconnect requests to these bootstrap peers.
293              for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
294                  info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
295                  self.router().send(peer.listener_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
296                  // Disconnect from this peer.
297                  self.router().disconnect(peer.listener_addr);
298              }
299          }
300      }
301  
302      /// This function attempts to connect to any disconnected trusted peers.
303      async fn handle_trusted_peers(&self) {
304          // Ensure that the trusted nodes are connected.
305          let handles: Vec<_> = self
306              .router()
307              .unconnected_trusted_peers()
308              .iter()
309              .filter_map(|listener_addr| {
310                  debug!("Attempting to (re-)connect to trusted peer `{listener_addr}`");
311                  let hdl = self.router().connect(*listener_addr);
312                  if hdl.is_none() {
313                      warn!("Could not initiate connection to trusted peer at `{listener_addr}`");
314                  }
315                  hdl
316              })
317              .collect();
318  
319          for result in futures::future::join_all(handles).await {
320              if let Err(err) = result {
321                  warn!("Could not connect to trusted peer: {err}");
322              }
323          }
324      }
325  
    /// Hook for keeping the puzzle request up to date; it is invoked on every heartbeat.
    ///
    /// The default implementation does nothing. Implementors that need to react to
    /// puzzle updates can override this method.
    fn handle_puzzle_request(&self) {
        // No-op
    }
330  
    /// Remove addresses whose ban time has expired.
    ///
    /// Passes `IP_BAN_TIME_IN_SECS` to the TCP layer's `remove_old_bans` as the ban duration.
    fn handle_banned_ips(&self) {
        self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
    }
335  }