// peering.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver, bootstrap_peers}; 17 18 use alpha_std::{StorageMode, alpha_ledger_dir}; 19 use alphaos_node_tcp::{P2P, is_bogon_ip, is_unspecified_or_broadcast_ip}; 20 use alphavm::prelude::{Address, Network}; 21 22 use anyhow::{Result, bail}; 23 #[cfg(feature = "locktick")] 24 use locktick::parking_lot::RwLock; 25 #[cfg(not(feature = "locktick"))] 26 use parking_lot::RwLock; 27 use std::{ 28 cmp, 29 collections::{ 30 HashSet, 31 hash_map::{Entry, HashMap}, 32 }, 33 fs, 34 io::{self, Write}, 35 net::{IpAddr, SocketAddr}, 36 str::FromStr, 37 time::Instant, 38 }; 39 use tokio::task; 40 use tracing::*; 41 42 pub trait PeerPoolHandling<N: Network>: P2P { 43 const OWNER: &str; 44 45 /// The maximum number of peers permitted to be stored in the peer pool. 46 const MAXIMUM_POOL_SIZE: usize; 47 48 /// The number of candidate peers to be removed from the pool once `MAXIMUM_POOL_SIZE` is reached. 49 /// It must be lower than `MAXIMUM_POOL_SIZE`. 50 const PEER_SLASHING_COUNT: usize; 51 52 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>; 53 54 fn resolver(&self) -> &RwLock<Resolver<N>>; 55 56 /// Returns `true` if the owning node is in development mode. 
57 fn is_dev(&self) -> bool; 58 59 /// Returns `true` if the node is in trusted peers only mode. 60 fn trusted_peers_only(&self) -> bool; 61 62 /// Returns the node type. 63 fn node_type(&self) -> NodeType; 64 65 /// Returns the listener address of this node. 66 fn local_ip(&self) -> SocketAddr { 67 self.tcp().listening_addr().expect("The TCP listener is not enabled") 68 } 69 70 /// Returns `true` if the given IP is this node. 71 fn is_local_ip(&self, addr: SocketAddr) -> bool { 72 addr == self.local_ip() 73 || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port() 74 } 75 76 /// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified. 77 fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool { 78 !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip()) 79 } 80 81 /// Returns the maximum number of connected peers. 82 fn max_connected_peers(&self) -> usize { 83 self.tcp().config().max_connections as usize 84 } 85 86 /// Ensure we are allowed to connect to the given listener address of a peer. 87 /// 88 /// # Return Values 89 /// - `Ok(true)` if already connected (or connecting) to the peer. 90 /// - `Ok(false)` if not connected to the peer but allowed to. 91 /// - `Err(err)` if not allowed to connect to the peer. 92 fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<bool> { 93 // Ensure the peer IP is not this node. 94 if self.is_local_ip(listener_addr) { 95 bail!("{} Dropping connection attempt to '{listener_addr}' (attempted to self-connect)", Self::OWNER); 96 } 97 // Ensure the node does not surpass the maximum number of peer connections. 98 if self.number_of_connected_peers() >= self.max_connected_peers() { 99 bail!("{} Dropping connection attempt to '{listener_addr}' (maximum peers reached)", Self::OWNER); 100 } 101 // Ensure the node is not already connected to this peer. 
102 if self.is_connected(listener_addr) { 103 debug!("{} Dropping connection attempt to '{listener_addr}' (already connected)", Self::OWNER); 104 return Ok(true); 105 } 106 // Ensure the node is not already connecting to this peer. 107 if self.is_connecting(listener_addr) { 108 debug!("{} Dropping connection attempt to '{listener_addr}' (already connecting)", Self::OWNER); 109 return Ok(true); 110 } 111 // If the IP is already banned, reject the attempt. 112 if self.is_ip_banned(listener_addr.ip()) { 113 bail!("{} Rejected a connection attempt to a banned IP '{}'", Self::OWNER, listener_addr.ip()); 114 } 115 // If the node is in trusted peers only mode, ensure the peer is trusted. 116 if self.trusted_peers_only() && !self.is_trusted(listener_addr) { 117 bail!("{} Dropping connection attempt to '{listener_addr}' (untrusted)", Self::OWNER); 118 } 119 Ok(false) 120 } 121 122 /// Attempts to connect to the given peer's listener address. 123 /// 124 /// Returns None if we are already connected to the peer or cannot connect. 125 /// Otherwise, it returns a handle to the tokio tasks that sets up the connection. 126 fn connect(&self, listener_addr: SocketAddr) -> Option<task::JoinHandle<bool>> { 127 // Return early if the attempt is against the protocol rules. 128 match self.check_connection_attempt(listener_addr) { 129 Ok(true) => return None, 130 Ok(false) => {} 131 Err(error) => { 132 warn!("{} {error}", Self::OWNER); 133 return None; 134 } 135 } 136 137 // Determine whether the peer is trusted or a bootstrap node in order to decide 138 // how problematic any potential connection issues are. 139 let is_trusted_or_bootstrap = 140 self.is_trusted(listener_addr) || bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr); 141 142 let tcp = self.tcp().clone(); 143 Some(tokio::spawn(async move { 144 debug!("{} Connecting to {listener_addr}...", Self::OWNER); 145 // Attempt to connect to the peer. 
146 match tcp.connect(listener_addr).await { 147 Ok(_) => true, 148 Err(error) => { 149 if is_trusted_or_bootstrap { 150 warn!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER); 151 } else { 152 debug!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER); 153 } 154 false 155 } 156 } 157 })) 158 } 159 160 /// Disconnects from the given peer IP, if the peer is connected. The returned boolean 161 /// indicates whether the peer was actually disconnected from, or if this was a noop. 162 fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> { 163 if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) { 164 let tcp = self.tcp().clone(); 165 tokio::spawn(async move { tcp.disconnect(connected_addr).await }) 166 } else { 167 tokio::spawn(async { false }) 168 } 169 } 170 171 /// Downgrades a connected peer to candidate status. 172 /// 173 /// Returns true if the peer was fully connected. 174 fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) -> bool { 175 let mut peer_pool = self.peer_pool().write(); 176 let Some(peer) = peer_pool.get_mut(&listener_addr) else { 177 trace!("{} Downgrade peer to candidate failed - peer not found", Self::OWNER); 178 return false; 179 }; 180 181 if let Peer::Connected(conn_peer) = peer { 182 // Exception: the BootstrapClient only has a single Resolver, 183 // so it may only map a validator's Aleo address once, for its 184 // Gateway-mode connection. This also means that the Router-mode 185 // connection may not remove that mapping. 
186 let aleo_addr = if self.node_type() == NodeType::BootstrapClient 187 && conn_peer.connection_mode == ConnectionMode::Router 188 { 189 None 190 } else { 191 Some(conn_peer.aleo_addr) 192 }; 193 self.resolver().write().remove_peer(conn_peer.connected_addr, aleo_addr); 194 peer.downgrade_to_candidate(listener_addr); 195 true 196 } else { 197 peer.downgrade_to_candidate(listener_addr); 198 false 199 } 200 } 201 202 /// Adds new candidate peers to the peer pool, ensuring their validity and following the 203 /// limit on the number of peers in the pool. The listener addresses may be paired with 204 /// the last known block height of the associated peer. 205 fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) { 206 let trusted_peers = self.trusted_peers(); 207 208 // Hold a write guard from now on, so as not to accidentally slash multiple times 209 // based on multiple batches of candidate peers, and to not overwrite any entries. 210 let mut peer_pool = self.peer_pool().write(); 211 212 // Perform filtering to ensure candidate validity. Also count how many entries are updates. 213 let mut num_updates: usize = 0; 214 listener_addrs.retain(|&(addr, height)| { 215 !self.is_ip_banned(addr.ip()) 216 && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) } 217 && peer_pool 218 .get(&addr) 219 .map(|peer| peer.is_candidate() && height.is_some()) 220 .inspect(|is_valid_update| { 221 if *is_valid_update { 222 num_updates += 1 223 } 224 }) 225 .unwrap_or(true) 226 }); 227 228 // If we've managed to filter out every entry, there's nothing to do. 229 if listener_addrs.is_empty() { 230 return; 231 } 232 233 // If we're about to exceed the peer pool size limit, apply candidate slashing. 234 if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE 235 && Self::PEER_SLASHING_COUNT != 0 236 { 237 // Collect the addresses of prospect peers. 
238 let mut peers_to_slash = peer_pool 239 .iter() 240 .filter_map(|(addr, peer)| { 241 (matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr) 242 }) 243 .collect::<Vec<_>>(); 244 245 // Get the low-level peer stats. 246 let known_peers = self.tcp().known_peers().snapshot(); 247 248 // Sort the list of candidate peers by failure count (descending) and timestamp (ascending). 249 let default_value = (0, Instant::now()); 250 peers_to_slash.sort_unstable_by_key(|addr| { 251 let (num_failures, last_seen) = known_peers 252 .get(&addr.ip()) 253 .map(|stats| (stats.failures(), stats.timestamp())) 254 .unwrap_or(default_value); 255 (cmp::Reverse(num_failures), last_seen) 256 }); 257 258 // Retain the candidate peers with the most failures and oldest timestamps. 259 peers_to_slash.truncate(Self::PEER_SLASHING_COUNT); 260 261 // Remove the peers to slash from the pool. 262 peer_pool.retain(|addr, _| !peers_to_slash.contains(addr)); 263 264 // Remove the peers to slash from the low-level list of known peers. 265 self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip())); 266 } 267 268 // Make sure that we won't breach the pool size limit in case the slashing didn't suffice. 269 listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len())); 270 271 // If we've managed to truncate to 0, exit. 272 if listener_addrs.is_empty() { 273 return; 274 } 275 276 // Insert or update the applicable candidate peers. 277 for (addr, height) in listener_addrs { 278 match peer_pool.entry(addr) { 279 Entry::Vacant(entry) => { 280 entry.insert(Peer::new_candidate(addr, false)); 281 } 282 Entry::Occupied(mut entry) => { 283 if let Peer::Candidate(peer) = entry.get_mut() { 284 peer.last_height_seen = height; 285 } 286 } 287 } 288 } 289 } 290 291 /// Completely removes an entry from the peer pool. 
292 fn remove_peer(&self, listener_addr: SocketAddr) { 293 self.peer_pool().write().remove(&listener_addr); 294 } 295 296 /// Returns the connected peer address from the listener IP address. 297 fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> { 298 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) { 299 Some(peer.connected_addr) 300 } else { 301 None 302 } 303 } 304 305 /// Returns the connected peer aleo address from the listener IP address. 306 fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> { 307 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) { 308 Some(peer.aleo_addr) 309 } else { 310 None 311 } 312 } 313 314 /// Returns `true` if the node is connecting to the given peer's listener address. 315 fn is_connecting(&self, listener_addr: SocketAddr) -> bool { 316 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting()) 317 } 318 319 /// Returns `true` if the node is connected to the given peer listener address. 320 fn is_connected(&self, listener_addr: SocketAddr) -> bool { 321 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected()) 322 } 323 324 /// Returns `true` if the node is connected to the given Aleo address. 325 fn is_connected_address(&self, aleo_address: Address<N>) -> bool { 326 // The resolver only contains data on connected peers. 327 self.resolver().read().get_peer_ip_for_address(aleo_address).is_some() 328 } 329 330 /// Returns `true` if the given listener address is trusted. 331 fn is_trusted(&self, listener_addr: SocketAddr) -> bool { 332 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted()) 333 } 334 335 /// Returns the number of all peers. 336 fn number_of_peers(&self) -> usize { 337 self.peer_pool().read().len() 338 } 339 340 /// Returns the number of connected peers. 
341 fn number_of_connected_peers(&self) -> usize { 342 self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count() 343 } 344 345 /// Returns the number of connecting peers. 346 fn number_of_connecting_peers(&self) -> usize { 347 self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count() 348 } 349 350 /// Returns the number of candidate peers. 351 fn number_of_candidate_peers(&self) -> usize { 352 self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count() 353 } 354 355 /// Returns the connected peer given the peer IP, if it exists. 356 fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> { 357 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) { 358 Some(peer.clone()) 359 } else { 360 None 361 } 362 } 363 364 /// Updates the connected peer - if it exists - given the peer IP and a closure. 365 /// The returned status indicates whether the update was successful, i.e. the peer had existed. 366 fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>( 367 &self, 368 listener_addr: &SocketAddr, 369 mut update_fn: F, 370 ) -> bool { 371 if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) { 372 update_fn(peer); 373 true 374 } else { 375 false 376 } 377 } 378 379 /// Returns the list of all peers (connected, connecting, and candidate). 380 fn get_peers(&self) -> Vec<Peer<N>> { 381 self.peer_pool().read().values().cloned().collect() 382 } 383 384 /// Returns all connected peers. 385 fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> { 386 self.filter_connected_peers(|_| true) 387 } 388 389 /// Returns an optionally bounded list of all connected peers sorted by their 390 /// block height (highest first) and failure count (lowest first). 391 fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> { 392 // Get a snapshot of the currently connected peers. 
393 let mut peers = self.get_connected_peers(); 394 // Get the low-level peer stats. 395 let known_peers = self.tcp().known_peers().snapshot(); 396 397 // Sort the prospect peers. 398 peers.sort_unstable_by_key(|peer| { 399 if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) { 400 // Prioritize greatest height, then lowest failure count. 401 (cmp::Reverse(peer.last_height_seen), peer_stats.failures()) 402 } else { 403 // Unreachable; use an else-compatible dummy. 404 (cmp::Reverse(peer.last_height_seen), 0) 405 } 406 }); 407 if let Some(max) = max_entries { 408 peers.truncate(max); 409 } 410 411 peers 412 } 413 414 /// Returns all connected peers that satisify the given predicate. 415 fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> { 416 self.peer_pool() 417 .read() 418 .values() 419 .filter_map(|p| { 420 if let Peer::Connected(peer) = p 421 && predicate(peer) 422 { 423 Some(peer) 424 } else { 425 None 426 } 427 }) 428 .cloned() 429 .collect() 430 } 431 432 /// Returns the list of connected peers. 433 fn connected_peers(&self) -> Vec<SocketAddr> { 434 self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect() 435 } 436 437 /// Returns the list of trusted peers. 438 fn trusted_peers(&self) -> Vec<SocketAddr> { 439 self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect() 440 } 441 442 /// Returns the list of candidate peers. 443 fn get_candidate_peers(&self) -> Vec<CandidatePeer> { 444 self.peer_pool() 445 .read() 446 .values() 447 .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None }) 448 .collect() 449 } 450 451 /// Returns the list of unconnected trusted peers. 
452 fn unconnected_trusted_peers(&self) -> HashSet<SocketAddr> { 453 self.peer_pool() 454 .read() 455 .iter() 456 .filter_map( 457 |(addr, peer)| if let Peer::Candidate(peer) = peer { peer.trusted.then_some(*addr) } else { None }, 458 ) 459 .collect() 460 } 461 462 /// Loads any previously cached peer addresses so they can be introduced as initial 463 /// candidate peers to connect to. 464 fn load_cached_peers(storage_mode: &StorageMode, filename: &str) -> Result<Vec<SocketAddr>> { 465 let mut peer_cache_path = alpha_ledger_dir(N::ID, storage_mode); 466 peer_cache_path.push(filename); 467 468 let peers = match fs::read_to_string(&peer_cache_path) { 469 Ok(cached_peers_str) => { 470 let mut cached_peers = Vec::new(); 471 for peer_addr_str in cached_peers_str.lines() { 472 match SocketAddr::from_str(peer_addr_str) { 473 Ok(addr) => cached_peers.push(addr), 474 Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"), 475 } 476 } 477 cached_peers 478 } 479 Err(error) if error.kind() == io::ErrorKind::NotFound => { 480 // Not an issue - the cache may not exist yet. 481 Vec::new() 482 } 483 Err(error) => { 484 warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, peer_cache_path.display()); 485 Vec::new() 486 } 487 }; 488 489 Ok(peers) 490 } 491 492 /// Preserve the peers who have the greatest known block heights, and the lowest 493 /// number of registered network failures. 494 fn save_best_peers(&self, storage_mode: &StorageMode, filename: &str, max_entries: Option<usize>) -> Result<()> { 495 // Collect all prospect peers. 496 let mut peers = self.get_peers(); 497 498 // Get the low-level peer stats. 499 let known_peers = self.tcp().known_peers().snapshot(); 500 501 // Sort the list of peers. 502 peers.sort_unstable_by_key(|peer| { 503 if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) { 504 // Prioritize greatest height, then lowest failure count. 
505 (cmp::Reverse(peer.last_height_seen()), peer_stats.failures()) 506 } else { 507 // Unreachable; use an else-compatible dummy. 508 (cmp::Reverse(peer.last_height_seen()), 0) 509 } 510 }); 511 if let Some(max) = max_entries { 512 peers.truncate(max); 513 } 514 515 // Dump the connected peers to a file. 516 let mut path = alpha_ledger_dir(N::ID, storage_mode); 517 path.push(filename); 518 let mut file = fs::File::create(path)?; 519 for peer in peers { 520 writeln!(file, "{}", peer.listener_addr())?; 521 } 522 523 Ok(()) 524 } 525 526 // Introduces a new connecting peer into the peer pool if unknown, or promotes 527 // a known candidate peer to a connecting one. The returned boolean indicates 528 // whether the peer has been added/promoted, or rejected due to already being 529 // shaken hands with or connected. 530 fn add_connecting_peer(&self, listener_addr: SocketAddr) -> bool { 531 match self.peer_pool().write().entry(listener_addr) { 532 Entry::Vacant(entry) => { 533 entry.insert(Peer::new_connecting(listener_addr, false)); 534 true 535 } 536 Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => { 537 entry.insert(Peer::new_connecting(listener_addr, entry.get().is_trusted())); 538 true 539 } 540 Entry::Occupied(_) => false, 541 } 542 } 543 544 /// Temporarily IP-ban and disconnect from the peer with the given listener address and an 545 /// optional reason for the ban. This also removes the peer from the candidate pool. 546 fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) { 547 // Ignore IP-banning if we are in dev mode. 548 if self.is_dev() { 549 return; 550 } 551 552 let ip = listener_addr.ip(); 553 debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default()); 554 555 // Insert/update the low-level IP ban list. 556 self.tcp().banned_peers().update_ip_ban(ip); 557 558 // Disconnect from the peer. 559 self.disconnect(listener_addr); 560 // Remove the peer from the pool. 
561 self.remove_peer(listener_addr); 562 } 563 564 /// Check whether the given IP address is currently banned. 565 fn is_ip_banned(&self, ip: IpAddr) -> bool { 566 self.tcp().banned_peers().is_ip_banned(&ip) 567 } 568 569 /// Insert or update a banned IP. 570 fn update_ip_ban(&self, ip: IpAddr) { 571 self.tcp().banned_peers().update_ip_ban(ip); 572 } 573 }