/ node / network / src / peering.rs
peering.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver, bootstrap_peers};
 17  
 18  use alpha_std::{StorageMode, alpha_ledger_dir};
 19  use alphaos_node_tcp::{P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
 20  use alphavm::prelude::{Address, Network};
 21  
 22  use anyhow::{Result, bail};
 23  #[cfg(feature = "locktick")]
 24  use locktick::parking_lot::RwLock;
 25  #[cfg(not(feature = "locktick"))]
 26  use parking_lot::RwLock;
 27  use std::{
 28      cmp,
 29      collections::{
 30          HashSet,
 31          hash_map::{Entry, HashMap},
 32      },
 33      fs,
 34      io::{self, Write},
 35      net::{IpAddr, SocketAddr},
 36      str::FromStr,
 37      time::Instant,
 38  };
 39  use tokio::task;
 40  use tracing::*;
 41  
/// Peer-pool bookkeeping shared by nodes that sit on top of the [`P2P`] TCP stack.
///
/// Implementors supply access to the peer pool, the resolver and a handful of node-level
/// settings; the provided methods build connection checks, candidate tracking, peer
/// caching and IP banning on top of those accessors.
pub trait PeerPoolHandling<N: Network>: P2P {
    /// A label for the implementing component; the provided methods place it at the
    /// start of most of their log and error messages.
    const OWNER: &str;

    /// The maximum number of peers permitted to be stored in the peer pool.
    const MAXIMUM_POOL_SIZE: usize;

    /// The number of candidate peers to be removed from the pool once `MAXIMUM_POOL_SIZE` is reached.
    /// It must be lower than `MAXIMUM_POOL_SIZE`. A value of `0` disables slashing altogether.
    const PEER_SLASHING_COUNT: usize;

    /// Returns the lock-guarded peer pool, keyed by each peer's listener address.
    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;

    /// Returns the lock-guarded resolver, from which the mappings of connected peers
    /// (connected address and Aleo address) are looked up and removed.
    fn resolver(&self) -> &RwLock<Resolver<N>>;

    /// Returns `true` if the owning node is in development mode.
    fn is_dev(&self) -> bool;

    /// Returns `true` if the node is in trusted peers only mode.
    fn trusted_peers_only(&self) -> bool;

    /// Returns the node type.
    fn node_type(&self) -> NodeType;
 64  
 65      /// Returns the listener address of this node.
 66      fn local_ip(&self) -> SocketAddr {
 67          self.tcp().listening_addr().expect("The TCP listener is not enabled")
 68      }
 69  
 70      /// Returns `true` if the given IP is this node.
 71      fn is_local_ip(&self, addr: SocketAddr) -> bool {
 72          addr == self.local_ip()
 73              || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
 74      }
 75  
 76      /// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified.
 77      fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
 78          !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
 79      }
 80  
 81      /// Returns the maximum number of connected peers.
 82      fn max_connected_peers(&self) -> usize {
 83          self.tcp().config().max_connections as usize
 84      }
 85  
 86      /// Ensure we are allowed to connect to the given listener address of a peer.
 87      ///
 88      /// # Return Values
 89      /// - `Ok(true)` if already connected (or connecting) to the peer.
 90      /// - `Ok(false)` if not connected to the peer but allowed to.
 91      /// - `Err(err)` if not allowed to connect to the peer.
 92      fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<bool> {
 93          // Ensure the peer IP is not this node.
 94          if self.is_local_ip(listener_addr) {
 95              bail!("{} Dropping connection attempt to '{listener_addr}' (attempted to self-connect)", Self::OWNER);
 96          }
 97          // Ensure the node does not surpass the maximum number of peer connections.
 98          if self.number_of_connected_peers() >= self.max_connected_peers() {
 99              bail!("{} Dropping connection attempt to '{listener_addr}' (maximum peers reached)", Self::OWNER);
100          }
101          // Ensure the node is not already connected to this peer.
102          if self.is_connected(listener_addr) {
103              debug!("{} Dropping connection attempt to '{listener_addr}' (already connected)", Self::OWNER);
104              return Ok(true);
105          }
106          // Ensure the node is not already connecting to this peer.
107          if self.is_connecting(listener_addr) {
108              debug!("{} Dropping connection attempt to '{listener_addr}' (already connecting)", Self::OWNER);
109              return Ok(true);
110          }
111          // If the IP is already banned, reject the attempt.
112          if self.is_ip_banned(listener_addr.ip()) {
113              bail!("{} Rejected a connection attempt to a banned IP '{}'", Self::OWNER, listener_addr.ip());
114          }
115          // If the node is in trusted peers only mode, ensure the peer is trusted.
116          if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
117              bail!("{} Dropping connection attempt to '{listener_addr}' (untrusted)", Self::OWNER);
118          }
119          Ok(false)
120      }
121  
122      /// Attempts to connect to the given peer's listener address.
123      ///
124      /// Returns None if we are already connected to the peer or cannot connect.
125      /// Otherwise, it returns a handle to the tokio tasks that sets up the connection.
126      fn connect(&self, listener_addr: SocketAddr) -> Option<task::JoinHandle<bool>> {
127          // Return early if the attempt is against the protocol rules.
128          match self.check_connection_attempt(listener_addr) {
129              Ok(true) => return None,
130              Ok(false) => {}
131              Err(error) => {
132                  warn!("{} {error}", Self::OWNER);
133                  return None;
134              }
135          }
136  
137          // Determine whether the peer is trusted or a bootstrap node in order to decide
138          // how problematic any potential connection issues are.
139          let is_trusted_or_bootstrap =
140              self.is_trusted(listener_addr) || bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr);
141  
142          let tcp = self.tcp().clone();
143          Some(tokio::spawn(async move {
144              debug!("{} Connecting to {listener_addr}...", Self::OWNER);
145              // Attempt to connect to the peer.
146              match tcp.connect(listener_addr).await {
147                  Ok(_) => true,
148                  Err(error) => {
149                      if is_trusted_or_bootstrap {
150                          warn!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
151                      } else {
152                          debug!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
153                      }
154                      false
155                  }
156              }
157          }))
158      }
159  
160      /// Disconnects from the given peer IP, if the peer is connected. The returned boolean
161      /// indicates whether the peer was actually disconnected from, or if this was a noop.
162      fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
163          if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
164              let tcp = self.tcp().clone();
165              tokio::spawn(async move { tcp.disconnect(connected_addr).await })
166          } else {
167              tokio::spawn(async { false })
168          }
169      }
170  
171      /// Downgrades a connected peer to candidate status.
172      ///
173      /// Returns true if the peer was fully connected.
174      fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) -> bool {
175          let mut peer_pool = self.peer_pool().write();
176          let Some(peer) = peer_pool.get_mut(&listener_addr) else {
177              trace!("{} Downgrade peer to candidate failed - peer not found", Self::OWNER);
178              return false;
179          };
180  
181          if let Peer::Connected(conn_peer) = peer {
182              // Exception: the BootstrapClient only has a single Resolver,
183              // so it may only map a validator's Aleo address once, for its
184              // Gateway-mode connection. This also means that the Router-mode
185              // connection may not remove that mapping.
186              let aleo_addr = if self.node_type() == NodeType::BootstrapClient
187                  && conn_peer.connection_mode == ConnectionMode::Router
188              {
189                  None
190              } else {
191                  Some(conn_peer.aleo_addr)
192              };
193              self.resolver().write().remove_peer(conn_peer.connected_addr, aleo_addr);
194              peer.downgrade_to_candidate(listener_addr);
195              true
196          } else {
197              peer.downgrade_to_candidate(listener_addr);
198              false
199          }
200      }
201  
202      /// Adds new candidate peers to the peer pool, ensuring their validity and following the
203      /// limit on the number of peers in the pool. The listener addresses may be paired with
204      /// the last known block height of the associated peer.
205      fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
206          let trusted_peers = self.trusted_peers();
207  
208          // Hold a write guard from now on, so as not to accidentally slash multiple times
209          // based on multiple batches of candidate peers, and to not overwrite any entries.
210          let mut peer_pool = self.peer_pool().write();
211  
212          // Perform filtering to ensure candidate validity. Also count how many entries are updates.
213          let mut num_updates: usize = 0;
214          listener_addrs.retain(|&(addr, height)| {
215              !self.is_ip_banned(addr.ip())
216                  && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
217                  && peer_pool
218                      .get(&addr)
219                      .map(|peer| peer.is_candidate() && height.is_some())
220                      .inspect(|is_valid_update| {
221                          if *is_valid_update {
222                              num_updates += 1
223                          }
224                      })
225                      .unwrap_or(true)
226          });
227  
228          // If we've managed to filter out every entry, there's nothing to do.
229          if listener_addrs.is_empty() {
230              return;
231          }
232  
233          // If we're about to exceed the peer pool size limit, apply candidate slashing.
234          if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
235              && Self::PEER_SLASHING_COUNT != 0
236          {
237              // Collect the addresses of prospect peers.
238              let mut peers_to_slash = peer_pool
239                  .iter()
240                  .filter_map(|(addr, peer)| {
241                      (matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
242                  })
243                  .collect::<Vec<_>>();
244  
245              // Get the low-level peer stats.
246              let known_peers = self.tcp().known_peers().snapshot();
247  
248              // Sort the list of candidate peers by failure count (descending) and timestamp (ascending).
249              let default_value = (0, Instant::now());
250              peers_to_slash.sort_unstable_by_key(|addr| {
251                  let (num_failures, last_seen) = known_peers
252                      .get(&addr.ip())
253                      .map(|stats| (stats.failures(), stats.timestamp()))
254                      .unwrap_or(default_value);
255                  (cmp::Reverse(num_failures), last_seen)
256              });
257  
258              // Retain the candidate peers with the most failures and oldest timestamps.
259              peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
260  
261              // Remove the peers to slash from the pool.
262              peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
263  
264              // Remove the peers to slash from the low-level list of known peers.
265              self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
266          }
267  
268          // Make sure that we won't breach the pool size limit in case the slashing didn't suffice.
269          listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
270  
271          // If we've managed to truncate to 0, exit.
272          if listener_addrs.is_empty() {
273              return;
274          }
275  
276          // Insert or update the applicable candidate peers.
277          for (addr, height) in listener_addrs {
278              match peer_pool.entry(addr) {
279                  Entry::Vacant(entry) => {
280                      entry.insert(Peer::new_candidate(addr, false));
281                  }
282                  Entry::Occupied(mut entry) => {
283                      if let Peer::Candidate(peer) = entry.get_mut() {
284                          peer.last_height_seen = height;
285                      }
286                  }
287              }
288          }
289      }
290  
291      /// Completely removes an entry from the peer pool.
292      fn remove_peer(&self, listener_addr: SocketAddr) {
293          self.peer_pool().write().remove(&listener_addr);
294      }
295  
296      /// Returns the connected peer address from the listener IP address.
297      fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
298          if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
299              Some(peer.connected_addr)
300          } else {
301              None
302          }
303      }
304  
305      /// Returns the connected peer aleo address from the listener IP address.
306      fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
307          if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
308              Some(peer.aleo_addr)
309          } else {
310              None
311          }
312      }
313  
314      /// Returns `true` if the node is connecting to the given peer's listener address.
315      fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
316          self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
317      }
318  
319      /// Returns `true` if the node is connected to the given peer listener address.
320      fn is_connected(&self, listener_addr: SocketAddr) -> bool {
321          self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
322      }
323  
324      /// Returns `true` if the node is connected to the given Aleo address.
325      fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
326          // The resolver only contains data on connected peers.
327          self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
328      }
329  
330      /// Returns `true` if the given listener address is trusted.
331      fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
332          self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
333      }
334  
335      /// Returns the number of all peers.
336      fn number_of_peers(&self) -> usize {
337          self.peer_pool().read().len()
338      }
339  
340      /// Returns the number of connected peers.
341      fn number_of_connected_peers(&self) -> usize {
342          self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count()
343      }
344  
345      /// Returns the number of connecting peers.
346      fn number_of_connecting_peers(&self) -> usize {
347          self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count()
348      }
349  
350      /// Returns the number of candidate peers.
351      fn number_of_candidate_peers(&self) -> usize {
352          self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
353      }
354  
355      /// Returns the connected peer given the peer IP, if it exists.
356      fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
357          if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
358              Some(peer.clone())
359          } else {
360              None
361          }
362      }
363  
364      /// Updates the connected peer - if it exists -  given the peer IP and a closure.
365      /// The returned status indicates whether the update was successful, i.e. the peer had existed.
366      fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
367          &self,
368          listener_addr: &SocketAddr,
369          mut update_fn: F,
370      ) -> bool {
371          if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
372              update_fn(peer);
373              true
374          } else {
375              false
376          }
377      }
378  
379      /// Returns the list of all peers (connected, connecting, and candidate).
380      fn get_peers(&self) -> Vec<Peer<N>> {
381          self.peer_pool().read().values().cloned().collect()
382      }
383  
384      /// Returns all connected peers.
385      fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
386          self.filter_connected_peers(|_| true)
387      }
388  
389      /// Returns an optionally bounded list of all connected peers sorted by their
390      /// block height (highest first) and failure count (lowest first).
391      fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
392          // Get a snapshot of the currently connected peers.
393          let mut peers = self.get_connected_peers();
394          // Get the low-level peer stats.
395          let known_peers = self.tcp().known_peers().snapshot();
396  
397          // Sort the prospect peers.
398          peers.sort_unstable_by_key(|peer| {
399              if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
400                  // Prioritize greatest height, then lowest failure count.
401                  (cmp::Reverse(peer.last_height_seen), peer_stats.failures())
402              } else {
403                  // Unreachable; use an else-compatible dummy.
404                  (cmp::Reverse(peer.last_height_seen), 0)
405              }
406          });
407          if let Some(max) = max_entries {
408              peers.truncate(max);
409          }
410  
411          peers
412      }
413  
414      /// Returns all connected peers that satisify the given predicate.
415      fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
416          self.peer_pool()
417              .read()
418              .values()
419              .filter_map(|p| {
420                  if let Peer::Connected(peer) = p
421                      && predicate(peer)
422                  {
423                      Some(peer)
424                  } else {
425                      None
426                  }
427              })
428              .cloned()
429              .collect()
430      }
431  
432      /// Returns the list of connected peers.
433      fn connected_peers(&self) -> Vec<SocketAddr> {
434          self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
435      }
436  
437      /// Returns the list of trusted peers.
438      fn trusted_peers(&self) -> Vec<SocketAddr> {
439          self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
440      }
441  
442      /// Returns the list of candidate peers.
443      fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
444          self.peer_pool()
445              .read()
446              .values()
447              .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
448              .collect()
449      }
450  
451      /// Returns the list of unconnected trusted peers.
452      fn unconnected_trusted_peers(&self) -> HashSet<SocketAddr> {
453          self.peer_pool()
454              .read()
455              .iter()
456              .filter_map(
457                  |(addr, peer)| if let Peer::Candidate(peer) = peer { peer.trusted.then_some(*addr) } else { None },
458              )
459              .collect()
460      }
461  
462      /// Loads any previously cached peer addresses so they can be introduced as initial
463      /// candidate peers to connect to.
464      fn load_cached_peers(storage_mode: &StorageMode, filename: &str) -> Result<Vec<SocketAddr>> {
465          let mut peer_cache_path = alpha_ledger_dir(N::ID, storage_mode);
466          peer_cache_path.push(filename);
467  
468          let peers = match fs::read_to_string(&peer_cache_path) {
469              Ok(cached_peers_str) => {
470                  let mut cached_peers = Vec::new();
471                  for peer_addr_str in cached_peers_str.lines() {
472                      match SocketAddr::from_str(peer_addr_str) {
473                          Ok(addr) => cached_peers.push(addr),
474                          Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
475                      }
476                  }
477                  cached_peers
478              }
479              Err(error) if error.kind() == io::ErrorKind::NotFound => {
480                  // Not an issue - the cache may not exist yet.
481                  Vec::new()
482              }
483              Err(error) => {
484                  warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, peer_cache_path.display());
485                  Vec::new()
486              }
487          };
488  
489          Ok(peers)
490      }
491  
492      /// Preserve the peers who have the greatest known block heights, and the lowest
493      /// number of registered network failures.
494      fn save_best_peers(&self, storage_mode: &StorageMode, filename: &str, max_entries: Option<usize>) -> Result<()> {
495          // Collect all prospect peers.
496          let mut peers = self.get_peers();
497  
498          // Get the low-level peer stats.
499          let known_peers = self.tcp().known_peers().snapshot();
500  
501          // Sort the list of peers.
502          peers.sort_unstable_by_key(|peer| {
503              if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
504                  // Prioritize greatest height, then lowest failure count.
505                  (cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
506              } else {
507                  // Unreachable; use an else-compatible dummy.
508                  (cmp::Reverse(peer.last_height_seen()), 0)
509              }
510          });
511          if let Some(max) = max_entries {
512              peers.truncate(max);
513          }
514  
515          // Dump the connected peers to a file.
516          let mut path = alpha_ledger_dir(N::ID, storage_mode);
517          path.push(filename);
518          let mut file = fs::File::create(path)?;
519          for peer in peers {
520              writeln!(file, "{}", peer.listener_addr())?;
521          }
522  
523          Ok(())
524      }
525  
526      // Introduces a new connecting peer into the peer pool if unknown, or promotes
527      // a known candidate peer to a connecting one. The returned boolean indicates
528      // whether the peer has been added/promoted, or rejected due to already being
529      // shaken hands with or connected.
530      fn add_connecting_peer(&self, listener_addr: SocketAddr) -> bool {
531          match self.peer_pool().write().entry(listener_addr) {
532              Entry::Vacant(entry) => {
533                  entry.insert(Peer::new_connecting(listener_addr, false));
534                  true
535              }
536              Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => {
537                  entry.insert(Peer::new_connecting(listener_addr, entry.get().is_trusted()));
538                  true
539              }
540              Entry::Occupied(_) => false,
541          }
542      }
543  
544      /// Temporarily IP-ban and disconnect from the peer with the given listener address and an
545      /// optional reason for the ban. This also removes the peer from the candidate pool.
546      fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
547          // Ignore IP-banning if we are in dev mode.
548          if self.is_dev() {
549              return;
550          }
551  
552          let ip = listener_addr.ip();
553          debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
554  
555          // Insert/update the low-level IP ban list.
556          self.tcp().banned_peers().update_ip_ban(ip);
557  
558          // Disconnect from the peer.
559          self.disconnect(listener_addr);
560          // Remove the peer from the pool.
561          self.remove_peer(listener_addr);
562      }
563  
564      /// Check whether the given IP address is currently banned.
565      fn is_ip_banned(&self, ip: IpAddr) -> bool {
566          self.tcp().banned_peers().is_ip_banned(&ip)
567      }
568  
569      /// Insert or update a banned IP.
570      fn update_ip_ban(&self, ip: IpAddr) {
571          self.tcp().banned_peers().update_ip_ban(ip);
572      }
573  }