gateway.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 #[cfg(feature = "telemetry")] 20 use crate::helpers::Telemetry; 21 use crate::{ 22 events::{Disconnect as DisconnectEvent, DisconnectReason, EventCodec, PrimaryPing}, 23 helpers::{assign_to_worker, Cache, PrimarySender, Storage, SyncSender, WorkerSender}, 24 spawn_blocking, 25 Worker, 26 CONTEXT, 27 MAX_BATCH_DELAY_IN_MS, 28 MEMORY_POOL_PORT, 29 }; 30 use alphaos_account::Account; 31 use alphaos_node_bft_events::{ 32 BlockRequest, 33 BlockResponse, 34 CertificateRequest, 35 CertificateResponse, 36 ChallengeRequest, 37 ChallengeResponse, 38 DataBlocks, 39 Event, 40 EventTrait, 41 TransmissionRequest, 42 TransmissionResponse, 43 ValidatorsRequest, 44 ValidatorsResponse, 45 }; 46 use alphaos_node_bft_ledger_service::LedgerService; 47 use alphaos_node_network::{ 48 bootstrap_peers, 49 get_repo_commit_hash, 50 log_repo_sha_comparison, 51 ConnectionMode, 52 NodeType, 53 Peer, 54 PeerPoolHandling, 55 Resolver, 56 }; 57 use alphaos_node_sync::{communication_service::CommunicationService, MAX_BLOCKS_BEHIND}; 58 use alphaos_node_tcp::{ 59 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing}, 60 Config, 61 
Connection, 62 ConnectionSide, 63 Tcp, 64 P2P, 65 }; 66 use alphastd::StorageMode; 67 use alphavm::{ 68 console::prelude::*, 69 ledger::{ 70 committee::Committee, 71 narwhal::{BatchHeader, Data}, 72 }, 73 prelude::{Address, Field}, 74 }; 75 76 use colored::Colorize; 77 use futures::SinkExt; 78 use indexmap::IndexMap; 79 #[cfg(feature = "locktick")] 80 use locktick::parking_lot::{Mutex, RwLock}; 81 #[cfg(not(feature = "locktick"))] 82 use parking_lot::{Mutex, RwLock}; 83 use rand::{ 84 rngs::OsRng, 85 seq::{IteratorRandom, SliceRandom}, 86 }; 87 use std::{ 88 collections::{HashMap, HashSet}, 89 future::Future, 90 io, 91 net::{Ipv4Addr, SocketAddr, SocketAddrV4}, 92 sync::Arc, 93 time::{Duration, Instant}, 94 }; 95 use tokio::{ 96 net::TcpStream, 97 sync::{oneshot, OnceCell}, 98 task::{self, JoinHandle}, 99 }; 100 use tokio_stream::StreamExt; 101 use tokio_util::codec::Framed; 102 103 /// The maximum interval of events to cache. 104 const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds 105 /// The maximum interval of requests to cache. 106 const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds 107 108 /// The maximum number of connection attempts in an interval. 109 const MAX_CONNECTION_ATTEMPTS: usize = 10; 110 /// The maximum interval to restrict a peer. 111 const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds 112 113 /// The maximum number of validators to send in a validators response event. 114 pub const MAX_VALIDATORS_TO_SEND: usize = 200; 115 116 /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious. 117 #[cfg(not(any(test)))] 118 const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10; 119 /// The amount of time an IP address is prohibited from connecting. 120 const IP_BAN_TIME_IN_SECS: u64 = 300; 121 122 /// The name of the file containing cached validators. 
123 const VALIDATOR_CACHE_FILENAME: &str = "cached_gateway_peers"; 124 125 /// The name of the file containing the dynamic validator whitelist. 126 const VALIDATOR_WHITELIST_FILENAME: &str = "dynamic_validator_whitelist"; 127 128 /// Part of the Gateway API that deals with networking. 129 /// This is a separate trait to allow for easier testing/mocking. 130 #[async_trait] 131 pub trait Transport<N: Network>: Send + Sync { 132 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>; 133 fn broadcast(&self, event: Event<N>); 134 } 135 136 /// The gateway maintains connections to other validators. 137 /// For connections with clients and provers, the Router logic is used. 138 #[derive(Clone)] 139 pub struct Gateway<N: Network>(Arc<InnerGateway<N>>); 140 141 impl<N: Network> Deref for Gateway<N> { 142 type Target = Arc<InnerGateway<N>>; 143 144 fn deref(&self) -> &Self::Target { 145 &self.0 146 } 147 } 148 149 pub struct InnerGateway<N: Network> { 150 /// The account of the node. 151 account: Account<N>, 152 /// The storage. 153 storage: Storage<N>, 154 /// The ledger service. 155 ledger: Arc<dyn LedgerService<N>>, 156 /// The TCP stack. 157 tcp: Tcp, 158 /// The cache. 159 cache: Cache<N>, 160 /// The resolver. 161 resolver: RwLock<Resolver<N>>, 162 /// The collection of both candidate and connected peers. 163 peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>, 164 #[cfg(feature = "telemetry")] 165 validator_telemetry: Telemetry<N>, 166 /// The primary sender. 167 primary_sender: OnceCell<PrimarySender<N>>, 168 /// The worker senders. 169 worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>, 170 /// The sync sender. 171 sync_sender: OnceCell<SyncSender<N>>, 172 /// The spawned handles. 173 handles: Mutex<Vec<JoinHandle<()>>>, 174 /// The storage mode. 175 storage_mode: StorageMode, 176 /// If the flag is set, the node will only connect to trusted peers. 177 trusted_peers_only: bool, 178 /// The development mode. 
179 dev: Option<u16>, 180 } 181 182 impl<N: Network> PeerPoolHandling<N> for Gateway<N> { 183 const MAXIMUM_POOL_SIZE: usize = 200; 184 const OWNER: &str = CONTEXT; 185 const PEER_SLASHING_COUNT: usize = 20; 186 187 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> { 188 &self.peer_pool 189 } 190 191 fn resolver(&self) -> &RwLock<Resolver<N>> { 192 &self.resolver 193 } 194 195 fn is_dev(&self) -> bool { 196 self.dev.is_some() 197 } 198 199 fn trusted_peers_only(&self) -> bool { 200 self.trusted_peers_only 201 } 202 203 fn node_type(&self) -> NodeType { 204 NodeType::Validator 205 } 206 } 207 208 impl<N: Network> Gateway<N> { 209 /// Initializes a new gateway. 210 #[allow(clippy::too_many_arguments)] 211 pub fn new( 212 account: Account<N>, 213 storage: Storage<N>, 214 ledger: Arc<dyn LedgerService<N>>, 215 ip: Option<SocketAddr>, 216 trusted_validators: &[SocketAddr], 217 trusted_peers_only: bool, 218 storage_mode: StorageMode, 219 dev: Option<u16>, 220 ) -> Result<Self> { 221 // Initialize the gateway IP. 222 let ip = match (ip, dev) { 223 (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)), 224 (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)), 225 (Some(ip), _) => ip, 226 }; 227 // Initialize the TCP stack. 228 let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?)); 229 230 // Prepare the collection of the initial peers. 231 let mut initial_peers = HashMap::new(); 232 233 // Load entries from the validator cache (if present and if we are not in trusted peers only mode). 234 if !trusted_peers_only { 235 let cached_peers = Self::load_cached_peers(&storage_mode, VALIDATOR_CACHE_FILENAME)?; 236 for addr in cached_peers { 237 initial_peers.insert(addr, Peer::new_candidate(addr, false)); 238 } 239 } 240 241 // Add the trusted peers to the list of the initial peers; this may promote 242 // some of the cached validators to trusted ones. 
243 initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true)))); 244 245 // Return the gateway. 246 Ok(Self(Arc::new(InnerGateway { 247 account, 248 storage, 249 ledger, 250 tcp, 251 cache: Default::default(), 252 resolver: Default::default(), 253 peer_pool: RwLock::new(initial_peers), 254 #[cfg(feature = "telemetry")] 255 validator_telemetry: Default::default(), 256 primary_sender: Default::default(), 257 worker_senders: Default::default(), 258 sync_sender: Default::default(), 259 handles: Default::default(), 260 storage_mode, 261 trusted_peers_only, 262 dev, 263 }))) 264 } 265 266 /// Run the gateway. 267 pub async fn run( 268 &self, 269 primary_sender: PrimarySender<N>, 270 worker_senders: IndexMap<u8, WorkerSender<N>>, 271 sync_sender: Option<SyncSender<N>>, 272 ) { 273 debug!("Starting the gateway for the memory pool..."); 274 275 // Set the primary sender. 276 self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway"); 277 278 // Set the worker senders. 279 self.worker_senders.set(worker_senders).expect("The worker senders are already set"); 280 281 // If the sync sender was provided, set the sync sender. 282 if let Some(sync_sender) = sync_sender { 283 self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway"); 284 } 285 286 // Enable the TCP protocols. 287 self.enable_handshake().await; 288 self.enable_reading().await; 289 self.enable_writing().await; 290 self.enable_disconnect().await; 291 self.enable_on_connect().await; 292 293 // Enable the TCP listener. Note: This must be called after the above protocols. 294 let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener"); 295 debug!("Listening for validator connections at address {listen_addr:?}"); 296 297 // Initialize the heartbeat. 
298 self.initialize_heartbeat(); 299 300 info!("Started the gateway for the memory pool at '{}'", self.local_ip()); 301 } 302 } 303 304 // Dynamic rate limiting. 305 impl<N: Network> Gateway<N> { 306 /// The current maximum committee size. 307 fn max_committee_size(&self) -> usize { 308 self.ledger.current_committee().map_or_else( 309 |_e| Committee::<N>::max_committee_size().unwrap() as usize, 310 |committee| committee.num_members(), 311 ) 312 } 313 314 /// The maximum number of events to cache. 315 fn max_cache_events(&self) -> usize { 316 self.max_cache_transmissions() 317 } 318 319 /// The maximum number of certificate requests to cache. 320 fn max_cache_certificates(&self) -> usize { 321 2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size() 322 } 323 324 /// The maximum number of transmission requests to cache. 325 fn max_cache_transmissions(&self) -> usize { 326 self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH 327 } 328 329 /// The maximum number of duplicates for any particular request. 330 fn max_cache_duplicates(&self) -> usize { 331 self.max_committee_size().pow(2) 332 } 333 } 334 335 #[async_trait] 336 impl<N: Network> CommunicationService for Gateway<N> { 337 /// The message type. 338 type Message = Event<N>; 339 340 /// Prepares a block request to be sent. 341 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message { 342 debug_assert!(start_height < end_height, "Invalid block request format"); 343 Event::BlockRequest(BlockRequest { start_height, end_height }) 344 } 345 346 /// Sends the given message to specified peer. 347 /// 348 /// This function returns as soon as the message is queued to be sent, 349 /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`] 350 /// which can be used to determine when and whether the message has been delivered. 
351 async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> { 352 Transport::send(self, peer_ip, message).await 353 } 354 } 355 356 impl<N: Network> Gateway<N> { 357 /// Returns the account of the node. 358 pub fn account(&self) -> &Account<N> { 359 &self.account 360 } 361 362 /// Returns the dev identifier of the node. 363 pub fn dev(&self) -> Option<u16> { 364 self.dev 365 } 366 367 /// Returns the resolver. 368 pub fn resolver(&self) -> &RwLock<Resolver<N>> { 369 &self.resolver 370 } 371 372 /// Returns the listener IP address from the (ambiguous) peer address. 373 pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> { 374 self.resolver.read().get_listener(*connected_addr) 375 } 376 377 /// Returns the validator telemetry. 378 #[cfg(feature = "telemetry")] 379 pub fn validator_telemetry(&self) -> &Telemetry<N> { 380 &self.validator_telemetry 381 } 382 383 /// Returns the primary sender. 384 pub fn primary_sender(&self) -> &PrimarySender<N> { 385 self.primary_sender.get().expect("Primary sender not set in gateway") 386 } 387 388 /// Returns the number of workers. 389 pub fn num_workers(&self) -> u8 { 390 u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len()) 391 .expect("Too many workers") 392 } 393 394 /// Returns the worker sender for the given worker ID. 395 pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> { 396 self.worker_senders.get().and_then(|senders| senders.get(&worker_id)) 397 } 398 399 /// Returns `true` if the given peer IP is an authorized validator. 400 pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool { 401 // If the peer IP is in the trusted validators, return early. 402 if self.trusted_peers().contains(&ip) { 403 return true; 404 } 405 // Retrieve the Alpha address of the peer IP. 406 match self.resolve_to_alpha_addr(ip) { 407 // Determine if the peer IP is an authorized validator. 
408 Some(address) => self.is_authorized_validator_address(address), 409 None => false, 410 } 411 } 412 413 /// Returns `true` if the given address is an authorized validator. 414 pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool { 415 // Determine if the validator address is a member of the committee lookback, 416 // the current committee, or the previous committee lookbacks. 417 // We allow leniency in this validation check in order to accommodate these two scenarios: 418 // 1. New validators should be able to connect immediately once bonded as a committee member. 419 // 2. Existing validators must remain connected until they are no longer bonded as a committee member. 420 // (i.e. meaning they must stay online until the next block has been produced) 421 422 // Determine if the validator is in the current committee with lookback. 423 if self 424 .ledger 425 .get_committee_lookback_for_round(self.storage.current_round()) 426 .is_ok_and(|committee| committee.is_committee_member(validator_address)) 427 { 428 return true; 429 } 430 431 // Determine if the validator is in the latest committee on the ledger. 432 if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) { 433 return true; 434 } 435 436 // Retrieve the previous block height to consider from the sync tolerance. 437 let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND); 438 // Determine if the validator is in any of the previous committee lookbacks. 439 match self.ledger.get_block_round(previous_block_height) { 440 Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| { 441 self.ledger 442 .get_committee_lookback_for_round(round) 443 .is_ok_and(|committee| committee.is_committee_member(validator_address)) 444 }), 445 Err(_) => false, 446 } 447 } 448 449 /// Returns the list of connected addresses. 
450 pub fn connected_addresses(&self) -> HashSet<Address<N>> { 451 self.get_connected_peers().into_iter().map(|peer| peer.alpha_addr).collect() 452 } 453 454 /// Ensure the peer is allowed to connect. 455 fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> { 456 // Ensure the peer IP is not this node. 457 if self.is_local_ip(listener_addr) { 458 bail!("{CONTEXT} Dropping connection request from '{listener_addr}' (attempted to self-connect)"); 459 } 460 // Ensure the peer is not spamming connection attempts. 461 if !listener_addr.ip().is_loopback() { 462 // Add this connection attempt and retrieve the number of attempts. 463 let num_attempts = self.cache.insert_inbound_connection(listener_addr.ip(), RESTRICTED_INTERVAL); 464 // Ensure the connecting peer has not surpassed the connection attempt limit. 465 if num_attempts > MAX_CONNECTION_ATTEMPTS { 466 bail!("Dropping connection request from '{listener_addr}' (tried {num_attempts} times)"); 467 } 468 } 469 Ok(()) 470 } 471 472 #[cfg(feature = "metrics")] 473 fn update_metrics(&self) { 474 metrics::gauge(metrics::bft::CONNECTED, self.number_of_connected_peers() as f64); 475 metrics::gauge(metrics::bft::CONNECTING, self.number_of_connecting_peers() as f64); 476 } 477 478 /// Inserts the given peer into the connected peers. This is only used in testing. 479 #[cfg(test)] 480 pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) { 481 // Adds a bidirectional map between the listener address and (ambiguous) peer address. 482 self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address)); 483 // Add a transmission for this peer in the connected peers. 
484 self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false)); 485 if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { 486 peer.upgrade_to_connected( 487 peer_addr, 488 peer_ip.port(), 489 address, 490 NodeType::Validator, 491 0, 492 ConnectionMode::Gateway, 493 ); 494 } 495 } 496 497 /// Sends the given event to specified peer. 498 /// 499 /// This function returns as soon as the event is queued to be sent, 500 /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`] 501 /// which can be used to determine when and whether the event has been delivered. 502 fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> { 503 // Resolve the listener IP to the (ambiguous) peer address. 504 let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else { 505 warn!("Unable to resolve the listener IP address '{peer_ip}'"); 506 return None; 507 }; 508 // Retrieve the event name. 509 let name = event.name(); 510 // Send the event to the peer. 511 trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'"); 512 let result = self.unicast(peer_addr, event); 513 // If the event was unable to be sent, disconnect. 514 if let Err(e) = &result { 515 warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {e}"); 516 debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)"); 517 self.disconnect(peer_ip); 518 } 519 result.ok() 520 } 521 522 /// Handles the inbound event from the peer. The returned value indicates whether 523 /// the connection is still active, and errors cause a disconnect once they are 524 /// propagated to the caller. 525 async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<bool> { 526 // Retrieve the listener IP for the peer. 527 let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else { 528 // No longer connected to the peer. 
529 trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name()); 530 return Ok(false); 531 }; 532 // Ensure that the peer is an authorized committee member or a bootstrapper. 533 if !(self.is_authorized_validator_ip(peer_ip) 534 || self 535 .get_connected_peer(peer_ip) 536 .map(|peer| peer.node_type == NodeType::BootstrapClient) 537 .unwrap_or(false)) 538 { 539 bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name()) 540 } 541 // Drop the peer, if they have exceeded the rate limit (i.e. they are requesting too much from us). 542 let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL); 543 if num_events >= self.max_cache_events() { 544 bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})") 545 } 546 // Rate limit for duplicate requests. 547 match event { 548 Event::CertificateRequest(_) | Event::CertificateResponse(_) => { 549 // Retrieve the certificate ID. 550 let certificate_id = match &event { 551 Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id, 552 Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(), 553 _ => unreachable!(), 554 }; 555 // Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate). 556 let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL); 557 if num_events >= self.max_cache_duplicates() { 558 return Ok(true); 559 } 560 } 561 Event::TransmissionRequest(TransmissionRequest { transmission_id }) 562 | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => { 563 // Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate). 
564 let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL); 565 if num_events >= self.max_cache_duplicates() { 566 return Ok(true); 567 } 568 } 569 Event::BlockRequest(_) => { 570 let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL); 571 if num_events >= self.max_cache_duplicates() { 572 return Ok(true); 573 } 574 } 575 _ => {} 576 } 577 trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name()); 578 579 // This match statement handles the inbound event by deserializing the event, 580 // checking the event is valid, and then calling the appropriate (trait) handler. 581 match event { 582 Event::BatchPropose(batch_propose) => { 583 // Send the batch propose to the primary. 584 let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await; 585 Ok(true) 586 } 587 Event::BatchSignature(batch_signature) => { 588 // Send the batch signature to the primary. 589 let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await; 590 Ok(true) 591 } 592 Event::BatchCertified(batch_certified) => { 593 // Send the batch certificate to the primary. 594 let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await; 595 Ok(true) 596 } 597 Event::BlockRequest(block_request) => { 598 let BlockRequest { start_height, end_height } = block_request; 599 600 // Ensure the block request is well-formed. 601 if start_height >= end_height { 602 bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})") 603 } 604 // Ensure that the block request is within the allowed bounds. 605 if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 { 606 bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})") 607 } 608 609 // End height is exclusive. 
610 let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?; 611 612 let self_ = self.clone(); 613 let blocks = match task::spawn_blocking(move || { 614 // Retrieve the blocks within the requested range. 615 match self_.ledger.get_blocks(start_height..end_height) { 616 Ok(blocks) => Ok(DataBlocks(blocks)), 617 Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"), 618 } 619 }) 620 .await 621 { 622 Ok(Ok(blocks)) => blocks, 623 Ok(Err(error)) => return Err(error), 624 Err(error) => return Err(anyhow!("[BlockRequest] {error}")), 625 }; 626 627 let self_ = self.clone(); 628 tokio::spawn(async move { 629 // Send the `BlockResponse` message to the peer. 630 let event = 631 Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version)); 632 Transport::send(&self_, peer_ip, event).await; 633 }); 634 Ok(true) 635 } 636 Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => { 637 // Process the block response. Except for some tests, there is always a sync sender. 638 if let Some(sync_sender) = self.sync_sender.get() { 639 // Check the response corresponds to a request. 640 if !self.cache.remove_outbound_block_request(peer_ip, &request) { 641 bail!("Unsolicited block response from '{peer_ip}'") 642 } 643 644 // Perform the deferred non-blocking deserialization of the blocks. 645 // The deserialization can take a long time (minutes). We should not be running 646 // this on a blocking task, but on a rayon thread pool. 
647 let (send, recv) = tokio::sync::oneshot::channel(); 648 rayon::spawn_fifo(move || { 649 let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}")); 650 let _ = send.send(blocks); 651 }); 652 let blocks = match recv.await { 653 Ok(Ok(blocks)) => blocks, 654 Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"), 655 Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"), 656 }; 657 658 // Ensure the block response is well-formed. 659 blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?; 660 // Send the blocks to the sync module. 661 if let Err(err) = 662 sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await 663 { 664 warn!("Unable to process block response from '{peer_ip}' - {err}"); 665 } 666 } 667 Ok(true) 668 } 669 Event::CertificateRequest(certificate_request) => { 670 // Send the certificate request to the sync module. 671 // Except for some tests, there is always a sync sender. 672 if let Some(sync_sender) = self.sync_sender.get() { 673 // Send the certificate request to the sync module. 674 let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await; 675 } 676 Ok(true) 677 } 678 Event::CertificateResponse(certificate_response) => { 679 // Send the certificate response to the sync module. 680 // Except for some tests, there is always a sync sender. 681 if let Some(sync_sender) = self.sync_sender.get() { 682 // Send the certificate response to the sync module. 683 let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await; 684 } 685 Ok(true) 686 } 687 Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => { 688 // Disconnect as the peer is not following the protocol. 689 bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol") 690 } 691 Event::Disconnect(message) => { 692 // The peer informs us that they had disconnected. 
Disconnect from them too. 693 debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason); 694 self.disconnect(peer_ip); 695 Ok(false) 696 } 697 Event::PrimaryPing(ping) => { 698 let PrimaryPing { version, block_locators, primary_certificate } = ping; 699 700 // Ensure the event version is not outdated. 701 if version < Event::<N>::VERSION { 702 bail!("Dropping '{peer_ip}' on event version {version} (outdated)"); 703 } 704 705 // Log the validator's height. 706 debug!("Validator '{peer_ip}' is at height {}", block_locators.latest_locator_height()); 707 708 // Update the peer locators. Except for some tests, there is always a sync sender. 709 if let Some(sync_sender) = self.sync_sender.get() { 710 // Check the block locators are valid, and update the validators in the sync module. 711 if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await { 712 bail!("Validator '{peer_ip}' sent invalid block locators - {error}"); 713 } 714 } 715 716 // Send the batch certificates to the primary. 717 let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await; 718 Ok(true) 719 } 720 Event::TransmissionRequest(request) => { 721 // TODO (howardwu): Add rate limiting checks on this event, on a per-peer basis. 722 // Determine the worker ID. 723 let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else { 724 warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id); 725 return Ok(true); 726 }; 727 // Send the transmission request to the worker. 728 if let Some(sender) = self.get_worker_sender(worker_id) { 729 // Send the transmission request to the worker. 730 let _ = sender.tx_transmission_request.send((peer_ip, request)).await; 731 } 732 Ok(true) 733 } 734 Event::TransmissionResponse(response) => { 735 // Determine the worker ID. 
                let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
                    return Ok(true);
                };
                // Forward the transmission response to the assigned worker.
                if let Some(sender) = self.get_worker_sender(worker_id) {
                    // NOTE: a send failure (closed channel) is deliberately ignored.
                    let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
                }
                Ok(true)
            }
            Event::ValidatorsRequest(_) => {
                // Shuffle the best-connected peers so repeated requests don't always see the same subset.
                let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
                connected_peers.shuffle(&mut rand::thread_rng());

                let self_ = self.clone();
                tokio::spawn(async move {
                    // Initialize the validators.
                    let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
                    // Iterate over the validators.
                    for validator in connected_peers.into_iter() {
                        // Add the validator to the list of validators.
                        validators.insert(validator.listener_addr, validator.alpha_addr);
                    }
                    // Send the validators response to the peer.
                    let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
                    Transport::send(&self_, peer_ip, event).await;
                });
                Ok(true)
            }
            Event::ValidatorsResponse(response) => {
                // In trusted-peers-only mode, peer discovery responses are rejected outright.
                if self.trusted_peers_only {
                    bail!("{CONTEXT} Not accepting validators response from '{peer_ip}' (trusted peers only)");
                }
                let ValidatorsResponse { validators } = response;
                // Ensure the number of validators is not too large.
                ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
                // Ensure the cache contains a validators request for this peer (unsolicited responses are a protocol violation).
                if !self.cache.contains_outbound_validators_request(peer_ip) {
                    bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
                }
                // Decrement the number of validators requests for this peer.
                self.cache.decrement_outbound_validators_requests(peer_ip);

                // Add valid validators as candidates to the peer pool; only validator-related
                // filters need to be applied, the rest is handled by `PeerPoolHandling`.
                let valid_addrs = validators
                    .into_iter()
                    .filter_map(|(listener_addr, alpha_addr)| {
                        // Keep entries that are not ourselves, not already connected, and are authorized validators.
                        (self.account.address() != alpha_addr
                            && !self.is_connected_address(alpha_addr)
                            && self.is_authorized_validator_address(alpha_addr))
                            .then_some((listener_addr, None))
                    })
                    .collect::<Vec<_>>();
                if !valid_addrs.is_empty() {
                    self.insert_candidate_peers(valid_addrs);
                }

                #[cfg(feature = "metrics")]
                self.update_metrics();

                Ok(true)
            }
            Event::WorkerPing(ping) => {
                // Ensure the number of transmissions is not too large.
                ensure!(
                    ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
                    "{CONTEXT} Received too many transmissions"
                );
                // Retrieve the number of workers.
                let num_workers = self.num_workers();
                // Iterate over the transmission IDs.
                for transmission_id in ping.transmission_ids.into_iter() {
                    // Determine the worker ID.
                    let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
                        warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
                        continue;
                    };
                    // Forward the transmission ID to the assigned worker.
                    if let Some(sender) = self.get_worker_sender(worker_id) {
                        // NOTE: a send failure (closed channel) is deliberately ignored.
                        let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
                    }
                }
                Ok(true)
            }
        }
    }

    /// Initialize a new instance of the heartbeat.
    fn initialize_heartbeat(&self) {
        let self_clone = self.clone();
        self.spawn(async move {
            let start = Instant::now();
            // Sleep briefly to ensure the other nodes are ready to connect.
            tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
            info!("Starting the heartbeat of the gateway...");
            loop {
                // Process a heartbeat in the gateway, passing the elapsed uptime along.
                let uptime = start.elapsed();
                self_clone.heartbeat(uptime).await;
                // Sleep for the heartbeat interval.
                tokio::time::sleep(Duration::from_secs(15)).await;
            }
        });
    }

    /// Spawns a task with the given future; it should only be used for long-running tasks.
    #[allow(dead_code)]
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the gateway.
    pub async fn shut_down(&self) {
        info!("Shutting down the gateway...");
        // Save the best peers for future use (best effort; failure is only logged).
        if let Err(e) = self.save_best_peers(&self.storage_mode, VALIDATOR_CACHE_FILENAME, None, true) {
            warn!("Failed to persist best validators to disk: {e}");
        }
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
        // Close the listener.
        self.tcp.shut_down().await;
    }
}

impl<N: Network> Gateway<N> {
    /// The uptime after which nodes log a warning about missing validator connections.
    const MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD: Duration = Duration::from_secs(60);

    /// Handles the heartbeat request.
    async fn heartbeat(&self, uptime: Duration) {
        // Log the connected validators.
        self.log_connected_validators(uptime);
        // Log the validator participation scores.
        #[cfg(feature = "telemetry")]
        self.log_participation_scores();
        // Keep the trusted validators connected.
        self.handle_trusted_validators();
        // Keep the bootstrap peers within the allowed range.
        self.handle_bootstrap_peers().await;
        // Removes any validators that are not in the current committee.
        self.handle_unauthorized_validators();
        // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
        self.handle_min_connected_validators();
        // Unban any addresses whose ban time has expired.
        self.handle_banned_ips();
        // Update the dynamic validator whitelist.
        self.update_validator_whitelist();
    }

    /// Logs the connected validators.
    fn log_connected_validators(&self, uptime: Duration) {
        // Retrieve the connected validators and current committee.
        let connected_validators = self.connected_peers();
        let committee = match self.ledger.current_committee() {
            Ok(c) => c,
            Err(err) => {
                error!("Failed to get current committee: {err}");
                return;
            }
        };

        // Resolve the total number of connectable validators (excluding ourselves).
        let validators_total = committee.num_members().saturating_sub(1);
        // Format the total validators message.
        let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
        // Construct the connections message.
        let connections_msg = match connected_validators.len() {
            0 => "No connected validators".to_string(),
            num_connected => format!("Connected to {num_connected} validators {total_validators}"),
        };
        info!("{connections_msg}");

        // Collect the connected validator addresses and stake.
        let mut connected_validator_addresses = HashSet::with_capacity(connected_validators.len());
        // Include our own address.
        connected_validator_addresses.insert(self.account.address());
        // Include and log the connected validators.
        for peer_ip in &connected_validators {
            let address = self.resolve_to_alpha_addr(*peer_ip).map_or("Unknown".to_string(), |a| {
                connected_validator_addresses.insert(a);
                a.to_string()
            });
            debug!("{}", format!(" Connected to: {peer_ip} - {address}").dimmed());
        }

        // Log the validators that are not connected.
        let num_not_connected = validators_total.saturating_sub(connected_validators.len());
        if num_not_connected > 0 {
            // Cache the total stake for computing percentages.
            let total_stake = committee.total_stake();
            let total_stake_f64 = total_stake as f64;

            // Collect the committee members.
            let committee_members: HashSet<_> =
                self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();

            let not_connected_stake: u64 = committee_members
                .difference(&connected_validator_addresses)
                .map(|address| {
                    let address_stake = committee.get_stake(*address);
                    // Guard against division by zero when the committee has no stake.
                    let address_stake_as_percentage =
                        if total_stake == 0 { 0.0 } else { address_stake as f64 / total_stake_f64 * 100.0 };
                    debug!(
                        "{}",
                        format!(" Not connected to {address} ({address_stake_as_percentage:.2}% of total stake)")
                            .dimmed()
                    );
                    address_stake
                })
                .sum();

            let not_connected_stake_as_percentage =
                if total_stake == 0 { 0.0 } else { not_connected_stake as f64 / total_stake_f64 * 100.0 };
            warn!(
                "Not connected to {num_not_connected} validators {total_validators} ({not_connected_stake_as_percentage:.2}% of total stake not connected)"
            );
            #[cfg(feature = "metrics")]
            {
                let connected_stake_as_percentage = 100.0 - not_connected_stake_as_percentage;
                metrics::gauge(metrics::bft::CONNECTED_STAKE, connected_stake_as_percentage);
            }
        } else {
            #[cfg(feature = "metrics")]
            metrics::gauge(metrics::bft::CONNECTED_STAKE, 100.0);
        };

        if !committee.is_quorum_threshold_reached(&connected_validator_addresses) {
            // Not being connected to a quorum of validators is benign during startup.
            if uptime > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
                error!("Not connected to a quorum of validators");
            } else {
                debug!("Not connected to a quorum of validators");
            }
        }
    }

    // Logs the validator participation scores.
977 #[cfg(feature = "telemetry")] 978 fn log_participation_scores(&self) { 979 if let Ok(current_committee) = self.ledger.current_committee() { 980 // Retrieve the participation scores. 981 let participation_scores = self.validator_telemetry().get_participation_scores(¤t_committee); 982 // Log the participation scores. 983 debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds()); 984 for (address, score) in participation_scores { 985 debug!("{}", format!(" {address} - {score:.2}%").dimmed()); 986 } 987 } 988 } 989 990 /// This function attempts to connect to any disconnected trusted validators. 991 fn handle_trusted_validators(&self) { 992 // Ensure that the trusted nodes are connected. 993 for validator_ip in &self.trusted_peers() { 994 // Attempt to connect to the trusted validator. 995 self.connect(*validator_ip); 996 } 997 } 998 999 /// This function keeps the number of bootstrap peers within the allowed range. 1000 async fn handle_bootstrap_peers(&self) { 1001 // Return early if we are in trusted peers only mode. 1002 if self.trusted_peers_only { 1003 return; 1004 } 1005 // Split the bootstrap peers into connected and candidate lists. 1006 let mut candidate_bootstrap = Vec::new(); 1007 let connected_bootstrap = self.filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient); 1008 for bootstrap_ip in bootstrap_peers::<N>(self.is_dev()) { 1009 if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) { 1010 candidate_bootstrap.push(bootstrap_ip); 1011 } 1012 } 1013 // If there are not enough connected bootstrap peers, connect to more. 1014 if connected_bootstrap.is_empty() { 1015 // Initialize an RNG. 1016 let rng = &mut OsRng; 1017 // Attempt to connect to a bootstrap peer. 
            if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
                match self.connect(peer_ip) {
                    Some(hdl) => {
                        // Await the connection attempt so a failure can be reported.
                        let result = hdl.await;
                        if let Err(err) = result {
                            warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
                        }
                    }
                    None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
                }
            }
        }
        // Determine if the node is connected to more bootstrap peers than allowed (at most one is kept).
        let num_surplus = connected_bootstrap.len().saturating_sub(1);
        if num_surplus > 0 {
            // Initialize an RNG.
            let rng = &mut OsRng;
            // Proceed to send disconnect requests to these bootstrap peers.
            for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
                info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
                <Self as Transport<N>>::send(
                    self,
                    peer.listener_addr,
                    Event::Disconnect(DisconnectReason::NoReasonGiven.into()),
                )
                .await;
                // Disconnect from this peer.
                self.disconnect(peer.listener_addr);
            }
        }
    }

    /// This function attempts to disconnect any validators that are not in the current committee.
    fn handle_unauthorized_validators(&self) {
        // The checks and disconnects run in a spawned task so the heartbeat is not blocked.
        let self_ = self.clone();
        tokio::spawn(async move {
            // Retrieve the connected validators.
            let validators = self_.get_connected_peers();
            // Iterate over the validator IPs.
            for peer in validators {
                // Skip bootstrapper peers.
                if peer.node_type == NodeType::BootstrapClient {
                    continue;
                }
                // Disconnect any validator that is not in the current committee.
                if !self_.is_authorized_validator_ip(peer.listener_addr) {
                    warn!(
                        "{CONTEXT} Disconnecting from '{}' - Validator is not in the current committee",
                        peer.listener_addr
                    );
                    Transport::send(&self_, peer.listener_addr, DisconnectReason::ProtocolViolation.into()).await;
                    // Disconnect from this peer.
                    self_.disconnect(peer.listener_addr);
                }
            }
        });
    }

    /// This function sends a `ValidatorsRequest` to a random validator,
    /// if the number of connected validators is less than the minimum.
    /// It also attempts to connect to known unconnected validators.
    fn handle_min_connected_validators(&self) {
        // Attempt to connect to untrusted validators we're not connected to yet.
        // The trusted ones are already handled by `handle_trusted_validators`.
        let trusted_validators = self.trusted_peers();
        if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
            for peer in self.get_candidate_peers() {
                if !trusted_validators.contains(&peer.listener_addr) {
                    // Attempt to connect to unconnected validators.
                    self.connect(peer.listener_addr);
                }
            }

            // Retrieve the connected validators.
            let validators = self.connected_peers();
            // If there are no validator IPs to connect to, return early.
            if validators.is_empty() {
                return;
            }
            // Select a random validator IP.
            if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
                let self_ = self.clone();
                tokio::spawn(async move {
                    // Increment the number of outbound validators requests for this validator.
                    self_.cache.increment_outbound_validators_requests(validator_ip);
                    // Send a `ValidatorsRequest` to the validator.
                    let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
                });
            }
        }
    }

    /// Processes a message received from the network.
    async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
        // Process the message. Disconnect if the peer violated the protocol.
        if let Err(error) = self.inbound(peer_addr, message).await {
            if let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) {
                warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
                let self_ = self.clone();
                tokio::spawn(async move {
                    Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
                    // Disconnect from this peer.
                    self_.disconnect(peer_ip);
                });
            }
        }
    }

    // Remove addresses whose ban time has expired.
    fn handle_banned_ips(&self) {
        self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
    }

    // Update the dynamic validator whitelist by persisting the current best peers.
    fn update_validator_whitelist(&self) {
        if let Err(e) =
            self.save_best_peers(&self.storage_mode, VALIDATOR_WHITELIST_FILENAME, Some(MAX_VALIDATORS_TO_SEND), false)
        {
            warn!("Couldn't update the validator whitelist: {e}");
        }
    }
}

#[async_trait]
impl<N: Network> Transport<N> for Gateway<N> {
    /// Sends the given event to specified peer.
    ///
    /// This method is rate limited to prevent spamming the peer.
    ///
    /// This function returns as soon as the event is queued to be sent,
    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
    /// which can be used to determine when and whether the event has been delivered.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
        macro_rules! send {
            ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
                // Rate limit the number of requests sent to the peer by busy-waiting
                // (in 10ms steps) until the per-peer cache drops below the allowed frequency.
                while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
                    // Sleep for a short period of time to allow the cache to clear.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                // Send the event to the peer.
                $self.send_inner(peer_ip, event)
            }};
        }

        // Increment the cache for certificate, transmission and block events.
        match event {
            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Send the event to the peer.
                send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
            }
            Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Send the event to the peer.
                send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
            }
            Event::BlockRequest(request) => {
                // Insert the outbound request so we can match it to responses.
                self.cache.insert_outbound_block_request(peer_ip, request);
                // Send the event to the peer and update the outbound event cache, use the general rate limit.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
            _ => {
                // Send the event to the peer, use the general rate limit.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
        }
    }

    /// Broadcasts the given event to all connected peers.
    // TODO(ljedrz): the event should be checked for the presence of Data::Object, and
    // serialized in advance if it's there.
    fn broadcast(&self, event: Event<N>) {
        // Ensure there are connected peers.
        if self.number_of_connected_peers() > 0 {
            let self_ = self.clone();
            let connected_peers = self.connected_peers();
            tokio::spawn(async move {
                // Iterate through all connected peers.
                for peer_ip in connected_peers {
                    // Send the event to the peer (delivery receipts are intentionally discarded).
                    let _ = Transport::send(&self_, peer_ip, event.clone()).await;
                }
            });
        }
    }
}

impl<N: Network> P2P for Gateway<N> {
    /// Returns a reference to the TCP instance.
    fn tcp(&self) -> &Tcp {
        &self.tcp
    }
}

#[async_trait]
impl<N: Network> Reading for Gateway<N> {
    type Codec = EventCodec<N>;
    type Message = Event<N>;

    /// Creates a [`Decoder`] used to interpret messages from the network.
    /// The `side` param indicates the connection side **from the node's perspective**.
    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
        Default::default()
    }

    /// Processes a message received from the network.
    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
        if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
            let self_ = self.clone();
            // Handle BlockRequest and BlockResponse messages in a separate task to not block the
            // inbound queue.
            tokio::spawn(async move {
                self_.process_message_inner(peer_addr, message).await;
            });
        } else {
            self.process_message_inner(peer_addr, message).await;
        }
        Ok(())
    }

    /// Computes the depth of per-connection queues used to process inbound messages, sufficient to process the maximum expected load at any given moment.
    /// The greater it is, the more inbound messages the node can enqueue, but a too large value can make the node more susceptible to DoS attacks.
    fn message_queue_depth(&self) -> usize {
        2 * BatchHeader::<N>::MAX_GC_ROUNDS
            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
    }
}

#[async_trait]
impl<N: Network> Writing for Gateway<N> {
    type Codec = EventCodec<N>;
    type Message = Event<N>;

    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
    /// The `side` parameter indicates the connection side **from the node's perspective**.
    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
        Default::default()
    }

    /// Computes the depth of per-connection queues used to send outbound messages, sufficient to process the maximum expected load at any given moment.
    /// The greater it is, the more outbound messages the node can enqueue. A too large value might obscure potential issues with your implementation
    /// (like slow serialization) or network.
    fn message_queue_depth(&self) -> usize {
        2 * BatchHeader::<N>::MAX_GC_ROUNDS
            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
    }
}

#[async_trait]
impl<N: Network> Disconnect for Gateway<N> {
    /// Any extra operations to be performed during a disconnect.
    async fn handle_disconnect(&self, peer_addr: SocketAddr) {
        if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
            self.downgrade_peer_to_candidate(peer_ip);
            // Remove the peer from the sync module. Except for some tests, there is always a sync sender.
            if let Some(sync_sender) = self.sync_sender.get() {
                let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
                tokio::spawn(async move {
                    if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
                        warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
                    }
                });
            }
            // We don't clear this map based on time but only on peer disconnect.
            // This is sufficient to avoid infinite growth as the committee has a fixed number
            // of members.
            self.cache.clear_outbound_validators_requests(peer_ip);
            self.cache.clear_outbound_block_requests(peer_ip);
            #[cfg(feature = "metrics")]
            self.update_metrics();
        }
    }
}

#[async_trait]
impl<N: Network> OnConnect for Gateway<N> {
    async fn on_connect(&self, peer_addr: SocketAddr) {
        if let Some(listener_addr) = self.resolve_to_listener(&peer_addr) {
            if let Some(peer) = self.get_connected_peer(listener_addr) {
                // Bootstrap peers are immediately queried for further validators.
                if peer.node_type == NodeType::BootstrapClient {
                    let _ =
                        <Self as Transport<N>>::send(self, listener_addr, Event::ValidatorsRequest(ValidatorsRequest))
                            .await;
                }
            }
        }
    }
}

#[async_trait]
impl<N: Network> Handshake for Gateway<N> {
    /// Performs the handshake protocol.
    async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
        // Perform the handshake.
        let peer_addr = connection.addr();
        let peer_side = connection.side();

        // Check (or impose) IP-level bans.
        #[cfg(not(any(test)))]
        if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
            // If the IP is already banned reject the connection.
            if self.is_ip_banned(peer_addr.ip()) {
                trace!("{CONTEXT} Rejected a connection request from banned IP '{}'", peer_addr.ip());
                return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
            }

            let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);

            debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
            if num_attempts > MAX_CONNECTION_ATTEMPTS {
                self.update_ip_ban(peer_addr.ip());
                trace!("{CONTEXT} Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
                return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
            }
        }

        let stream = self.borrow_stream(&mut connection);

        // If this is an inbound connection, we log it, but don't know the listening address yet.
        // Otherwise, we can immediately register the listening address.
        let mut listener_addr = if peer_side == ConnectionSide::Initiator {
            debug!("{CONTEXT} Received a connection request from '{peer_addr}'");
            None
        } else {
            debug!("{CONTEXT} Shaking hands with {peer_addr}...");
            Some(peer_addr)
        };

        // Retrieve the restrictions ID.
        let restrictions_id = self.ledger.latest_restrictions_id();

        // Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
1356 let handshake_result = if peer_side == ConnectionSide::Responder { 1357 self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await 1358 } else { 1359 self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await 1360 }; 1361 1362 if let Some(addr) = listener_addr { 1363 match handshake_result { 1364 Ok(Some(ref cr)) => { 1365 let node_type = if bootstrap_peers::<N>(self.is_dev()).contains(&addr) { 1366 NodeType::BootstrapClient 1367 } else { 1368 NodeType::Validator 1369 }; 1370 if let Some(peer) = self.peer_pool.write().get_mut(&addr) { 1371 self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address)); 1372 peer.upgrade_to_connected( 1373 peer_addr, 1374 cr.listener_port, 1375 cr.address, 1376 node_type, 1377 cr.version, 1378 ConnectionMode::Gateway, 1379 ); 1380 } 1381 #[cfg(feature = "metrics")] 1382 self.update_metrics(); 1383 info!("{CONTEXT} Connected to '{addr}'"); 1384 } 1385 Ok(None) => { 1386 return Err(error(format!("Duplicate handshake attempt with '{addr}'"))); 1387 } 1388 Err(error) => { 1389 if let Some(peer) = self.peer_pool.write().get_mut(&addr) { 1390 // The peer may only be downgraded if it's a ConnectingPeer. 1391 if peer.is_connecting() { 1392 peer.downgrade_to_candidate(addr); 1393 } 1394 } 1395 // This error needs to be "repackaged" in order to conform to the return type. 1396 return Err(error); 1397 } 1398 } 1399 } 1400 1401 Ok(connection) 1402 } 1403 } 1404 1405 /// A macro unwrapping the expected handshake event or returning an error for unexpected events. 1406 macro_rules! expect_event { 1407 ($event_ty:path, $framed:expr, $peer_addr:expr) => { 1408 match $framed.try_next().await? { 1409 // Received the expected event, proceed. 1410 Some($event_ty(data)) => { 1411 trace!("{CONTEXT} Received '{}' from '{}'", data.name(), $peer_addr); 1412 data 1413 } 1414 // Received a disconnect event, abort. 
1415 Some(Event::Disconnect(DisconnectEvent { reason })) => { 1416 return Err(error(format!("{CONTEXT} '{}' disconnected: {reason}", $peer_addr))); 1417 } 1418 // Received an unexpected event, abort. 1419 Some(ty) => { 1420 return Err(error(format!( 1421 "{CONTEXT} '{}' did not follow the handshake protocol: received {:?} instead of {}", 1422 $peer_addr, 1423 ty.name(), 1424 stringify!($event_ty), 1425 ))) 1426 } 1427 // Received nothing. 1428 None => { 1429 return Err(error(format!( 1430 "{CONTEXT} the peer disconnected before sending {:?}, likely due to peer saturation or shutdown", 1431 stringify!($event_ty) 1432 ))) 1433 } 1434 } 1435 }; 1436 } 1437 1438 /// Send the given message to the peer. 1439 async fn send_event<N: Network>( 1440 framed: &mut Framed<&mut TcpStream, EventCodec<N>>, 1441 peer_addr: SocketAddr, 1442 event: Event<N>, 1443 ) -> io::Result<()> { 1444 trace!("{CONTEXT} Sending '{}' to '{peer_addr}'", event.name()); 1445 framed.send(event).await 1446 } 1447 1448 impl<N: Network> Gateway<N> { 1449 /// The connection initiator side of the handshake. 1450 async fn handshake_inner_initiator<'a>( 1451 &'a self, 1452 peer_addr: SocketAddr, 1453 restrictions_id: Field<N>, 1454 stream: &'a mut TcpStream, 1455 ) -> io::Result<Option<ChallengeRequest<N>>> { 1456 // Introduce the peer into the peer pool. 1457 if !self.add_connecting_peer(peer_addr) { 1458 return Ok(None); 1459 } 1460 1461 // Construct the stream. 1462 let mut framed = Framed::new(stream, EventCodec::<N>::handshake()); 1463 1464 // Initialize an RNG. 1465 let rng = &mut rand::rngs::OsRng; 1466 1467 /* Step 1: Send the challenge request. */ 1468 1469 // Sample a random nonce. 1470 let our_nonce = rng.r#gen(); 1471 // Determine the alphaos SHA to send to the peer. 
1472 let current_block_height = self.ledger.latest_block_height(); 1473 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap(); 1474 let alphaos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) { 1475 (true, Some(sha)) => Some(sha), 1476 _ => None, 1477 }; 1478 // Send a challenge request to the peer. 1479 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, alphaos_sha); 1480 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?; 1481 1482 /* Step 2: Receive the peer's challenge response followed by the challenge request. */ 1483 1484 // Listen for the challenge response message. 1485 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr); 1486 // Listen for the challenge request message. 1487 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr); 1488 1489 // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort. 1490 if let Some(reason) = self 1491 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce) 1492 .await 1493 { 1494 send_event(&mut framed, peer_addr, reason.into()).await?; 1495 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason}"))); 1496 } 1497 // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort. 1498 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) { 1499 send_event(&mut framed, peer_addr, reason.into()).await?; 1500 if reason == DisconnectReason::NoReasonGiven { 1501 // The Alpha address is already connected; no reason to return an error. 1502 return Ok(None); 1503 } else { 1504 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason}"))); 1505 } 1506 } 1507 1508 /* Step 3: Send the challenge response. */ 1509 1510 // Sign the counterparty nonce. 
1511 let response_nonce: u64 = rng.r#gen(); 1512 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat(); 1513 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else { 1514 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'"))); 1515 }; 1516 // Send the challenge response. 1517 let our_response = 1518 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce }; 1519 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?; 1520 1521 Ok(Some(peer_request)) 1522 } 1523 1524 /// The connection responder side of the handshake. 1525 async fn handshake_inner_responder<'a>( 1526 &'a self, 1527 peer_addr: SocketAddr, 1528 peer_ip: &mut Option<SocketAddr>, 1529 restrictions_id: Field<N>, 1530 stream: &'a mut TcpStream, 1531 ) -> io::Result<Option<ChallengeRequest<N>>> { 1532 // Construct the stream. 1533 let mut framed = Framed::new(stream, EventCodec::<N>::handshake()); 1534 1535 /* Step 1: Receive the challenge request. */ 1536 1537 // Listen for the challenge request message. 1538 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr); 1539 1540 // Ensure the address is not the same as this node. 1541 if self.account.address() == peer_request.address { 1542 return Err(error("Skipping request to connect to self".to_string())); 1543 } 1544 1545 // Obtain the peer's listening address. 1546 *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port)); 1547 let peer_ip = peer_ip.unwrap(); 1548 1549 // Knowing the peer's listening address, ensure it is allowed to connect. 1550 if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) { 1551 return Err(error(format!("{forbidden_message}"))); 1552 } 1553 1554 // Introduce the peer into the peer pool. 1555 if !self.add_connecting_peer(peer_ip) { 1556 return Ok(None); 1557 } 1558 1559 // Verify the challenge request. 
If a disconnect reason was returned, send the disconnect message and abort.
        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
            // Tell the peer why it is being dropped before closing the connection.
            send_event(&mut framed, peer_addr, reason.into()).await?;
            if reason == DisconnectReason::NoReasonGiven {
                // The Alpha address is already connected; no reason to return an error.
                return Ok(None);
            } else {
                return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
            }
        }

        /* Step 2: Send the challenge response followed by own challenge request. */

        // Initialize an RNG.
        let rng = &mut rand::rngs::OsRng;

        // Sign the counterparty nonce together with a fresh response nonce,
        // binding this signature to both sides of the handshake.
        let response_nonce: u64 = rng.r#gen();
        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
        };
        // Send the challenge response.
        let our_response =
            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;

        // Sample a random nonce for the peer to sign in its challenge response.
        let our_nonce = rng.r#gen();
        // Determine the alphaos SHA to send to the peer: only shared once the chain
        // is at ConsensusVersion::V12 or later AND a commit hash is available.
        let current_block_height = self.ledger.latest_block_height();
        // NOTE(review): unwrap assumes CONSENSUS_VERSION is defined for every height — confirm.
        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
        let alphaos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
            (true, Some(sha)) => Some(sha),
            _ => None,
        };
        // Send the challenge request.
        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, alphaos_sha);
        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;

        /* Step 3: Receive the challenge response. */

        // Listen for the challenge response message.
        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
        if let Some(reason) = self
            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
            .await
        {
            send_event(&mut framed, peer_addr, reason.into()).await?;
            return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
        }

        // Handshake succeeded: hand the verified challenge request back to the caller.
        Ok(Some(peer_request))
    }

    /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid.
    ///
    /// Checks are applied in order: protocol version, trusted-peers-only mode,
    /// committee membership (bootstrap peers are exempt), and duplicate connection.
    fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
        // Retrieve the components of the challenge request.
        let &ChallengeRequest { version, listener_port, address, nonce: _, ref alphaos_sha } = event;
        // Log how the peer's repo SHA compares to ours (diagnostic only; not a rejection criterion here).
        log_repo_sha_comparison(peer_addr, alphaos_sha, CONTEXT);

        // The peer's advertised listening address: its connecting IP with the advertised port.
        let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);

        // Ensure the event protocol version is not outdated.
        if version < Event::<N>::VERSION {
            warn!("{CONTEXT} Dropping '{peer_addr}' on version {version} (outdated)");
            return Some(DisconnectReason::OutdatedClientVersion);
        }
        // If the node is in trusted peers only mode, ensure the peer is trusted.
        if self.trusted_peers_only && !self.is_trusted(listener_addr) {
            warn!("{CONTEXT} Dropping '{peer_addr}' for being an untrusted validator ({address})");
            return Some(DisconnectReason::ProtocolViolation);
        }
        // Bootstrap peers are exempt from the committee-membership check below.
        if !bootstrap_peers::<N>(self.dev().is_some()).contains(&listener_addr) {
            // Ensure the address is a current committee member.
            if !self.is_authorized_validator_address(address) {
                warn!("{CONTEXT} Dropping '{peer_addr}' for being an unauthorized validator ({address})");
                return Some(DisconnectReason::ProtocolViolation);
            }
        }
        // Ensure the address is not already connected.
        if self.is_connected_address(address) {
            warn!("{CONTEXT} Dropping '{peer_addr}' for being already connected ({address})");
            return Some(DisconnectReason::NoReasonGiven);
        }
        None
    }

    /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid.
    ///
    /// The response must echo the expected restrictions ID and carry a signature by
    /// `peer_address` over our nonce concatenated with the peer's response nonce.
    async fn verify_challenge_response(
        &self,
        peer_addr: SocketAddr,
        peer_address: Address<N>,
        response: ChallengeResponse<N>,
        expected_restrictions_id: Field<N>,
        expected_nonce: u64,
    ) -> Option<DisconnectReason> {
        // Retrieve the components of the challenge response.
        let ChallengeResponse { restrictions_id, signature, nonce } = response;

        // Verify the restrictions ID.
        if restrictions_id != expected_restrictions_id {
            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
            return Some(DisconnectReason::InvalidChallengeResponse);
        }
        // Perform the deferred non-blocking deserialization of the signature.
        let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
            return Some(DisconnectReason::InvalidChallengeResponse);
        };
        // Verify the signature.
        if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (invalid signature)");
            return Some(DisconnectReason::InvalidChallengeResponse);
        }
        None
    }
}

#[cfg(test)]
mod prop_tests {
    use crate::{
        gateway::prop_tests::GatewayAddress::{Dev, Prod},
        helpers::{init_primary_channels, init_worker_channels, Storage},
        Gateway,
        Worker,
        MAX_WORKERS,
        MEMORY_POOL_PORT,
    };
    use alphaos_account::Account;
    use alphaos_node_bft_ledger_service::MockLedgerService;
    use alphaos_node_bft_storage_service::BFTMemoryService;
    use alphaos_node_network::PeerPoolHandling;
    use alphaos_node_tcp::P2P;
    use alphastd::StorageMode;
    use alphavm::{
        ledger::{
            committee::{
                prop_tests::{CommitteeContext, ValidatorSet},
                test_helpers::sample_committee_for_round_and_members,
                Committee,
            },
            narwhal::{batch_certificate::test_helpers::sample_batch_certificate_for_round, BatchHeader},
        },
        prelude::{MainnetV0, PrivateKey},
        utilities::TestRng,
    };

    use indexmap::{IndexMap, IndexSet};
    use proptest::{
        prelude::{any, any_with, Arbitrary, BoxedStrategy, Just, Strategy},
        sample::Selector,
    };
    use std::{
        fmt::{Debug, Formatter},
        net::{IpAddr, Ipv4Addr, SocketAddr},
        sync::Arc,
    };
    use test_strategy::proptest;

    type CurrentNetwork = MainnetV0;

    impl Debug for Gateway<CurrentNetwork> {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            // TODO implement Debug properly and move it over to production code
            f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
        }
    }

    // How a test gateway binds: `Dev(n)` is a localhost port offset applied to
    // MEMORY_POOL_PORT; `Prod` is an optional explicit socket address.
    #[derive(Debug, test_strategy::Arbitrary)]
    enum GatewayAddress {
        Dev(u8),
        Prod(Option<SocketAddr>),
    }

    impl GatewayAddress {
        // The explicit socket address, if this is a `Prod` address; otherwise `None`.
        fn ip(&self) -> Option<SocketAddr> {
            if let GatewayAddress::Prod(ip) = self {
                return *ip;
            }
            None
        }

        // The development port offset, if this is a `Dev` address; otherwise `None`.
        fn port(&self) -> Option<u16> {
            if let GatewayAddress::Dev(port) = self {
                return Some(*port as u16);
            }
            None
        }
    }

    impl Arbitrary for Gateway<CurrentNetwork> {
        type Parameters = ();
        type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;

        // Builds arbitrary dev-mode gateways from the dev-gateway input strategy.
        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            any_valid_dev_gateway()
                .prop_map(|(storage, _, private_key, address)| {
                    Gateway::new(
                        Account::try_from(private_key).unwrap(),
                        storage.clone(),
                        storage.ledger().clone(),
                        address.ip(),
                        &[],
                        false,
                        StorageMode::new_test(None),
                        address.port(),
                    )
                    .unwrap()
                })
                .boxed()
        }
    }

    // (storage, committee context, validator private key, bind address) — everything
    // needed to construct a test gateway.
    type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);

    // Strategy producing a valid dev-mode gateway input: the private key is selected
    // from the sampled committee's validators, and the address is a `Dev` port offset.
    fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
        (any::<CommitteeContext>(), any::<Selector>())
            .prop_flat_map(|(context, account_selector)| {
                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
                (
                    any_with::<Storage<CurrentNetwork>>(context.clone()),
                    Just(context),
                    Just(account_selector.select(validators)),
                    0u8..,
                )
                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Dev(d)))
            })
            .boxed()
    }

    // Strategy producing a valid prod-mode gateway input: as above, but the address
    // is an arbitrary optional socket address wrapped in `Prod`.
    fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
        (any::<CommitteeContext>(), any::<Selector>())
            .prop_flat_map(|(context, account_selector)| {
                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
                (
                    any_with::<Storage<CurrentNetwork>>(context.clone()),
                    Just(context),
                    Just(account_selector.select(validators)),
                    any::<Option<SocketAddr>>(),
                )
                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Prod(d)))
            })
            .boxed()
    }
#[proptest] 1807 fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) { 1808 let (storage, _, private_key, dev) = input; 1809 let account = Account::try_from(private_key).unwrap(); 1810 1811 let gateway = Gateway::new( 1812 account.clone(), 1813 storage.clone(), 1814 storage.ledger().clone(), 1815 dev.ip(), 1816 &[], 1817 false, 1818 StorageMode::new_test(None), 1819 dev.port(), 1820 ) 1821 .unwrap(); 1822 let tcp_config = gateway.tcp().config(); 1823 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST))); 1824 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap())); 1825 1826 let tcp_config = gateway.tcp().config(); 1827 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap()); 1828 assert_eq!(gateway.account().address(), account.address()); 1829 } 1830 1831 #[proptest] 1832 fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) { 1833 let (storage, _, private_key, dev) = input; 1834 let account = Account::try_from(private_key).unwrap(); 1835 1836 let gateway = Gateway::new( 1837 account.clone(), 1838 storage.clone(), 1839 storage.ledger().clone(), 1840 dev.ip(), 1841 &[], 1842 false, 1843 StorageMode::new_test(None), 1844 dev.port(), 1845 ) 1846 .unwrap(); 1847 let tcp_config = gateway.tcp().config(); 1848 if let Some(socket_addr) = dev.ip() { 1849 assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip())); 1850 assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port())); 1851 } else { 1852 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED))); 1853 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT)); 1854 } 1855 1856 let tcp_config = gateway.tcp().config(); 1857 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap()); 1858 assert_eq!(gateway.account().address(), account.address()); 1859 } 1860 1861 
#[proptest(async = "tokio")] 1862 async fn gateway_start( 1863 #[strategy(any_valid_dev_gateway())] input: GatewayInput, 1864 #[strategy(0..MAX_WORKERS)] workers_count: u8, 1865 ) { 1866 let (storage, committee, private_key, dev) = input; 1867 let committee = committee.0; 1868 let worker_storage = storage.clone(); 1869 let account = Account::try_from(private_key).unwrap(); 1870 1871 let gateway = Gateway::new( 1872 account, 1873 storage.clone(), 1874 storage.ledger().clone(), 1875 dev.ip(), 1876 &[], 1877 false, 1878 StorageMode::new_test(None), 1879 dev.port(), 1880 ) 1881 .unwrap(); 1882 1883 let (primary_sender, _) = init_primary_channels(); 1884 1885 let (workers, worker_senders) = { 1886 // Construct a map of the worker senders. 1887 let mut tx_workers = IndexMap::new(); 1888 let mut workers = IndexMap::new(); 1889 1890 // Initialize the workers. 1891 for id in 0..workers_count { 1892 // Construct the worker channels. 1893 let (tx_worker, rx_worker) = init_worker_channels(); 1894 // Construct the worker instance. 1895 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1896 let worker = 1897 Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default()) 1898 .unwrap(); 1899 // Run the worker instance. 1900 worker.run(rx_worker); 1901 1902 // Add the worker and the worker sender to maps 1903 workers.insert(id, worker); 1904 tx_workers.insert(id, tx_worker); 1905 } 1906 (workers, tx_workers) 1907 }; 1908 1909 gateway.run(primary_sender, worker_senders, None).await; 1910 assert_eq!( 1911 gateway.local_ip(), 1912 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap()) 1913 ); 1914 assert_eq!(gateway.num_workers(), workers.len() as u8); 1915 } 1916 1917 #[proptest] 1918 fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) { 1919 let rng = &mut TestRng::default(); 1920 1921 // Initialize the round parameters. 
1922 let current_round = 2; 1923 let committee_size = 4; 1924 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64; 1925 let (_, _, private_key, dev) = input; 1926 let account = Account::try_from(private_key).unwrap(); 1927 1928 // Sample the certificates. 1929 let mut certificates = IndexSet::new(); 1930 for _ in 0..committee_size { 1931 certificates.insert(sample_batch_certificate_for_round(current_round, rng)); 1932 } 1933 let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect(); 1934 // Initialize the committee. 1935 let committee = sample_committee_for_round_and_members(current_round, addresses, rng); 1936 // Sample extra certificates from non-committee members. 1937 for _ in 0..committee_size { 1938 certificates.insert(sample_batch_certificate_for_round(current_round, rng)); 1939 } 1940 // Initialize the ledger. 1941 let ledger = Arc::new(MockLedgerService::new(committee.clone())); 1942 // Initialize the storage. 1943 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); 1944 // Initialize the gateway. 1945 let gateway = Gateway::new( 1946 account.clone(), 1947 storage.clone(), 1948 ledger.clone(), 1949 dev.ip(), 1950 &[], 1951 false, 1952 StorageMode::new_test(None), 1953 dev.port(), 1954 ) 1955 .unwrap(); 1956 // Insert certificate to the storage. 1957 for certificate in certificates.iter() { 1958 storage.testing_only_insert_certificate_testing_only(certificate.clone()); 1959 } 1960 // Check that the current committee members are authorized validators. 1961 for i in 0..certificates.clone().len() { 1962 let is_authorized = gateway.is_authorized_validator_address(certificates[i].author()); 1963 if i < committee_size { 1964 assert!(is_authorized); 1965 } else { 1966 assert!(!is_authorized); 1967 } 1968 } 1969 } 1970 }