// node/bft/src/gateway.rs
   1  // Copyright (c) 2025-2026 ACDC Network
   2  // This file is part of the alphaos library.
   3  //
   4  // Alpha Chain | Delta Chain Protocol
   5  // International Monetary Graphite.
   6  //
   7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
   8  // They built world-class ZK infrastructure. We installed the EASY button.
   9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
  10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
  11  //
  12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
  13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
  14  // No rights reserved. No permission required. No warranty. No refunds.
  15  //
  16  // https://creativecommons.org/publicdomain/zero/1.0/
  17  // SPDX-License-Identifier: CC0-1.0
  18  
  19  #[cfg(feature = "telemetry")]
  20  use crate::helpers::Telemetry;
  21  use crate::{
  22      events::{Disconnect as DisconnectEvent, DisconnectReason, EventCodec, PrimaryPing},
  23      helpers::{assign_to_worker, Cache, PrimarySender, Storage, SyncSender, WorkerSender},
  24      spawn_blocking,
  25      Worker,
  26      CONTEXT,
  27      MAX_BATCH_DELAY_IN_MS,
  28      MEMORY_POOL_PORT,
  29  };
  30  use alphaos_account::Account;
  31  use alphaos_node_bft_events::{
  32      BlockRequest,
  33      BlockResponse,
  34      CertificateRequest,
  35      CertificateResponse,
  36      ChallengeRequest,
  37      ChallengeResponse,
  38      DataBlocks,
  39      Event,
  40      EventTrait,
  41      TransmissionRequest,
  42      TransmissionResponse,
  43      ValidatorsRequest,
  44      ValidatorsResponse,
  45  };
  46  use alphaos_node_bft_ledger_service::LedgerService;
  47  use alphaos_node_network::{
  48      bootstrap_peers,
  49      get_repo_commit_hash,
  50      log_repo_sha_comparison,
  51      ConnectionMode,
  52      NodeType,
  53      Peer,
  54      PeerPoolHandling,
  55      Resolver,
  56  };
  57  use alphaos_node_sync::{communication_service::CommunicationService, MAX_BLOCKS_BEHIND};
  58  use alphaos_node_tcp::{
  59      protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
  60      Config,
  61      Connection,
  62      ConnectionSide,
  63      Tcp,
  64      P2P,
  65  };
  66  use alphastd::StorageMode;
  67  use alphavm::{
  68      console::prelude::*,
  69      ledger::{
  70          committee::Committee,
  71          narwhal::{BatchHeader, Data},
  72      },
  73      prelude::{Address, Field},
  74  };
  75  
  76  use colored::Colorize;
  77  use futures::SinkExt;
  78  use indexmap::IndexMap;
  79  #[cfg(feature = "locktick")]
  80  use locktick::parking_lot::{Mutex, RwLock};
  81  #[cfg(not(feature = "locktick"))]
  82  use parking_lot::{Mutex, RwLock};
  83  use rand::{
  84      rngs::OsRng,
  85      seq::{IteratorRandom, SliceRandom},
  86  };
  87  use std::{
  88      collections::{HashMap, HashSet},
  89      future::Future,
  90      io,
  91      net::{Ipv4Addr, SocketAddr, SocketAddrV4},
  92      sync::Arc,
  93      time::{Duration, Instant},
  94  };
  95  use tokio::{
  96      net::TcpStream,
  97      sync::{oneshot, OnceCell},
  98      task::{self, JoinHandle},
  99  };
 100  use tokio_stream::StreamExt;
 101  use tokio_util::codec::Framed;
 102  
 103  /// The maximum interval of events to cache.
 104  const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
 105  /// The maximum interval of requests to cache.
 106  const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
 107  
 108  /// The maximum number of connection attempts in an interval.
 109  const MAX_CONNECTION_ATTEMPTS: usize = 10;
 110  /// The maximum interval to restrict a peer.
 111  const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
 112  
 113  /// The maximum number of validators to send in a validators response event.
 114  pub const MAX_VALIDATORS_TO_SEND: usize = 200;
 115  
 116  /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
 117  #[cfg(not(any(test)))]
 118  const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
 119  /// The amount of time an IP address is prohibited from connecting.
 120  const IP_BAN_TIME_IN_SECS: u64 = 300;
 121  
 122  /// The name of the file containing cached validators.
 123  const VALIDATOR_CACHE_FILENAME: &str = "cached_gateway_peers";
 124  
 125  /// The name of the file containing the dynamic validator whitelist.
 126  const VALIDATOR_WHITELIST_FILENAME: &str = "dynamic_validator_whitelist";
 127  
/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
pub trait Transport<N: Network>: Send + Sync {
    /// Sends the given event to the specified peer.
    ///
    /// Returns as soon as the event is queued; the returned [`oneshot::Receiver`]
    /// (if any) resolves once delivery has succeeded or failed.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
    /// Broadcasts the given event (see the implementing type for the target peer set).
    fn broadcast(&self, event: Event<N>);
}
 135  
/// The gateway maintains connections to other validators.
/// For connections with clients and provers, the Router logic is used.
///
/// This is a thin, cheaply-cloneable handle: all state lives in the shared
/// [`InnerGateway`] behind an [`Arc`].
#[derive(Clone)]
pub struct Gateway<N: Network>(Arc<InnerGateway<N>>);
 140  
impl<N: Network> Deref for Gateway<N> {
    type Target = Arc<InnerGateway<N>>;

    /// Dereferences to the shared inner state, so `InnerGateway` fields and
    /// methods can be accessed directly on a `Gateway`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
 148  
pub struct InnerGateway<N: Network> {
    /// The account of the node.
    account: Account<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The TCP stack.
    tcp: Tcp,
    /// The cache of recent events/requests/connections, used for rate limiting.
    cache: Cache<N>,
    /// The resolver, mapping listener addresses to (ambiguous) connected peer addresses.
    resolver: RwLock<Resolver<N>>,
    /// The collection of both candidate and connected peers.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The validator telemetry (only compiled with the `telemetry` feature).
    #[cfg(feature = "telemetry")]
    validator_telemetry: Telemetry<N>,
    /// The primary sender (set once, in `Gateway::run`).
    primary_sender: OnceCell<PrimarySender<N>>,
    /// The worker senders (set once, in `Gateway::run`).
    worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
    /// The sync sender (set in `Gateway::run`; absent in some tests).
    sync_sender: OnceCell<SyncSender<N>>,
    /// The spawned handles.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// The storage mode.
    storage_mode: StorageMode,
    /// If the flag is set, the node will only connect to trusted peers.
    trusted_peers_only: bool,
    /// The development mode.
    dev: Option<u16>,
}
 181  
 182  impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
 183      const MAXIMUM_POOL_SIZE: usize = 200;
 184      const OWNER: &str = CONTEXT;
 185      const PEER_SLASHING_COUNT: usize = 20;
 186  
 187      fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
 188          &self.peer_pool
 189      }
 190  
 191      fn resolver(&self) -> &RwLock<Resolver<N>> {
 192          &self.resolver
 193      }
 194  
 195      fn is_dev(&self) -> bool {
 196          self.dev.is_some()
 197      }
 198  
 199      fn trusted_peers_only(&self) -> bool {
 200          self.trusted_peers_only
 201      }
 202  
 203      fn node_type(&self) -> NodeType {
 204          NodeType::Validator
 205      }
 206  }
 207  
impl<N: Network> Gateway<N> {
    /// Initializes a new gateway.
    ///
    /// The listener IP is chosen as follows:
    /// - an explicitly provided `ip` always wins;
    /// - in development mode without an explicit IP, the node binds to localhost
    ///   on `MEMORY_POOL_PORT + dev` (one port per dev node);
    /// - otherwise, it binds to all interfaces on `MEMORY_POOL_PORT`.
    ///
    /// Unless `trusted_peers_only` is set, previously cached validator peers are
    /// loaded as (untrusted) candidates; `trusted_validators` are then inserted on
    /// top, which may promote a cached candidate to a trusted peer.
    ///
    /// # Errors
    /// Returns an error if the maximum committee size cannot be determined, or if
    /// the validator cache file cannot be loaded.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        storage_mode: StorageMode,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Initialize the gateway IP.
        let ip = match (ip, dev) {
            (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
            (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
            (Some(ip), _) => ip,
        };
        // Initialize the TCP stack, capping connections at the maximum committee size.
        let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));

        // Prepare the collection of the initial peers.
        let mut initial_peers = HashMap::new();

        // Load entries from the validator cache (if present and if we are not in trusted peers only mode).
        if !trusted_peers_only {
            let cached_peers = Self::load_cached_peers(&storage_mode, VALIDATOR_CACHE_FILENAME)?;
            for addr in cached_peers {
                initial_peers.insert(addr, Peer::new_candidate(addr, false));
            }
        }

        // Add the trusted peers to the list of the initial peers; this may promote
        // some of the cached validators to trusted ones.
        initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));

        // Return the gateway.
        Ok(Self(Arc::new(InnerGateway {
            account,
            storage,
            ledger,
            tcp,
            cache: Default::default(),
            resolver: Default::default(),
            peer_pool: RwLock::new(initial_peers),
            #[cfg(feature = "telemetry")]
            validator_telemetry: Default::default(),
            primary_sender: Default::default(),
            worker_senders: Default::default(),
            sync_sender: Default::default(),
            handles: Default::default(),
            storage_mode,
            trusted_peers_only,
            dev,
        })))
    }

    /// Run the gateway.
    ///
    /// Wires up the channel senders, enables the TCP protocol handlers, starts
    /// the listener, and kicks off the heartbeat.
    ///
    /// # Panics
    /// Panics if called more than once (each sender may only be set once), or if
    /// the TCP listener cannot be enabled.
    pub async fn run(
        &self,
        primary_sender: PrimarySender<N>,
        worker_senders: IndexMap<u8, WorkerSender<N>>,
        sync_sender: Option<SyncSender<N>>,
    ) {
        debug!("Starting the gateway for the memory pool...");

        // Set the primary sender.
        self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");

        // Set the worker senders.
        self.worker_senders.set(worker_senders).expect("The worker senders are already set");

        // If the sync sender was provided, set the sync sender.
        if let Some(sync_sender) = sync_sender {
            self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
        }

        // Enable the TCP protocols.
        self.enable_handshake().await;
        self.enable_reading().await;
        self.enable_writing().await;
        self.enable_disconnect().await;
        self.enable_on_connect().await;

        // Enable the TCP listener. Note: This must be called after the above protocols,
        // otherwise incoming connections could race the protocol handlers.
        let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
        debug!("Listening for validator connections at address {listen_addr:?}");

        // Initialize the heartbeat.
        self.initialize_heartbeat();

        info!("Started the gateway for the memory pool at '{}'", self.local_ip());
    }
}
 303  
 304  // Dynamic rate limiting.
 305  impl<N: Network> Gateway<N> {
 306      /// The current maximum committee size.
 307      fn max_committee_size(&self) -> usize {
 308          self.ledger.current_committee().map_or_else(
 309              |_e| Committee::<N>::max_committee_size().unwrap() as usize,
 310              |committee| committee.num_members(),
 311          )
 312      }
 313  
 314      /// The maximum number of events to cache.
 315      fn max_cache_events(&self) -> usize {
 316          self.max_cache_transmissions()
 317      }
 318  
 319      /// The maximum number of certificate requests to cache.
 320      fn max_cache_certificates(&self) -> usize {
 321          2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
 322      }
 323  
 324      /// The maximum number of transmission requests to cache.
 325      fn max_cache_transmissions(&self) -> usize {
 326          self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
 327      }
 328  
 329      /// The maximum number of duplicates for any particular request.
 330      fn max_cache_duplicates(&self) -> usize {
 331          self.max_committee_size().pow(2)
 332      }
 333  }
 334  
 335  #[async_trait]
 336  impl<N: Network> CommunicationService for Gateway<N> {
 337      /// The message type.
 338      type Message = Event<N>;
 339  
 340      /// Prepares a block request to be sent.
 341      fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
 342          debug_assert!(start_height < end_height, "Invalid block request format");
 343          Event::BlockRequest(BlockRequest { start_height, end_height })
 344      }
 345  
 346      /// Sends the given message to specified peer.
 347      ///
 348      /// This function returns as soon as the message is queued to be sent,
 349      /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
 350      /// which can be used to determine when and whether the message has been delivered.
 351      async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
 352          Transport::send(self, peer_ip, message).await
 353      }
 354  }
 355  
 356  impl<N: Network> Gateway<N> {
    /// Returns the account of the node, as supplied at construction.
    pub fn account(&self) -> &Account<N> {
        &self.account
    }
 361  
    /// Returns the dev identifier of the node, or `None` when not in development mode.
    pub fn dev(&self) -> Option<u16> {
        self.dev
    }
 366  
    /// Returns the resolver, which maps listener addresses to (ambiguous)
    /// connected peer addresses and back.
    pub fn resolver(&self) -> &RwLock<Resolver<N>> {
        &self.resolver
    }
 371  
    /// Returns the listener IP address from the (ambiguous) peer address.
    ///
    /// Returns `None` if the peer is not (or no longer) connected.
    pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> {
        self.resolver.read().get_listener(*connected_addr)
    }
 376  
    /// Returns the validator telemetry.
    /// Only available when the `telemetry` feature is enabled.
    #[cfg(feature = "telemetry")]
    pub fn validator_telemetry(&self) -> &Telemetry<N> {
        &self.validator_telemetry
    }
 382  
    /// Returns the primary sender.
    ///
    /// # Panics
    /// Panics if [`Gateway::run`] has not been called yet (the sender is set there).
    pub fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set in gateway")
    }
 387  
 388      /// Returns the number of workers.
 389      pub fn num_workers(&self) -> u8 {
 390          u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
 391              .expect("Too many workers")
 392      }
 393  
 394      /// Returns the worker sender for the given worker ID.
 395      pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
 396          self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
 397      }
 398  
 399      /// Returns `true` if the given peer IP is an authorized validator.
 400      pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
 401          // If the peer IP is in the trusted validators, return early.
 402          if self.trusted_peers().contains(&ip) {
 403              return true;
 404          }
 405          // Retrieve the Alpha address of the peer IP.
 406          match self.resolve_to_alpha_addr(ip) {
 407              // Determine if the peer IP is an authorized validator.
 408              Some(address) => self.is_authorized_validator_address(address),
 409              None => false,
 410          }
 411      }
 412  
    /// Returns `true` if the given address is an authorized validator.
    pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
        // Determine if the validator address is a member of the committee lookback,
        // the current committee, or the previous committee lookbacks.
        // We allow leniency in this validation check in order to accommodate these two scenarios:
        //  1. New validators should be able to connect immediately once bonded as a committee member.
        //  2. Existing validators must remain connected until they are no longer bonded as a committee member.
        //     (i.e. meaning they must stay online until the next block has been produced)

        // Determine if the validator is in the current committee with lookback.
        if self
            .ledger
            .get_committee_lookback_for_round(self.storage.current_round())
            .is_ok_and(|committee| committee.is_committee_member(validator_address))
        {
            return true;
        }

        // Determine if the validator is in the latest committee on the ledger.
        if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
            return true;
        }

        // Retrieve the previous block height to consider from the sync tolerance.
        let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
        // Determine if the validator is in any of the previous committee lookbacks.
        // NOTE(review): the step of 2 presumably reflects that committee lookbacks only
        // change every other round — confirm against the round/committee conventions.
        match self.ledger.get_block_round(previous_block_height) {
            Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
                self.ledger
                    .get_committee_lookback_for_round(round)
                    .is_ok_and(|committee| committee.is_committee_member(validator_address))
            }),
            // If the block round cannot be retrieved, deny authorization.
            Err(_) => false,
        }
    }
 448  
    /// Returns the set of Alpha addresses of all currently connected peers.
    pub fn connected_addresses(&self) -> HashSet<Address<N>> {
        self.get_connected_peers().into_iter().map(|peer| peer.alpha_addr).collect()
    }
 453  
 454      /// Ensure the peer is allowed to connect.
 455      fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
 456          // Ensure the peer IP is not this node.
 457          if self.is_local_ip(listener_addr) {
 458              bail!("{CONTEXT} Dropping connection request from '{listener_addr}' (attempted to self-connect)");
 459          }
 460          // Ensure the peer is not spamming connection attempts.
 461          if !listener_addr.ip().is_loopback() {
 462              // Add this connection attempt and retrieve the number of attempts.
 463              let num_attempts = self.cache.insert_inbound_connection(listener_addr.ip(), RESTRICTED_INTERVAL);
 464              // Ensure the connecting peer has not surpassed the connection attempt limit.
 465              if num_attempts > MAX_CONNECTION_ATTEMPTS {
 466                  bail!("Dropping connection request from '{listener_addr}' (tried {num_attempts} times)");
 467              }
 468          }
 469          Ok(())
 470      }
 471  
    /// Publishes the current connected/connecting peer counts as BFT gauges.
    #[cfg(feature = "metrics")]
    fn update_metrics(&self) {
        metrics::gauge(metrics::bft::CONNECTED, self.number_of_connected_peers() as f64);
        metrics::gauge(metrics::bft::CONNECTING, self.number_of_connecting_peers() as f64);
    }
 477  
 478      /// Inserts the given peer into the connected peers. This is only used in testing.
 479      #[cfg(test)]
 480      pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
 481          // Adds a bidirectional map between the listener address and (ambiguous) peer address.
 482          self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address));
 483          // Add a transmission for this peer in the connected peers.
 484          self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false));
 485          if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
 486              peer.upgrade_to_connected(
 487                  peer_addr,
 488                  peer_ip.port(),
 489                  address,
 490                  NodeType::Validator,
 491                  0,
 492                  ConnectionMode::Gateway,
 493              );
 494          }
 495      }
 496  
 497      /// Sends the given event to specified peer.
 498      ///
 499      /// This function returns as soon as the event is queued to be sent,
 500      /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
 501      /// which can be used to determine when and whether the event has been delivered.
 502      fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
 503          // Resolve the listener IP to the (ambiguous) peer address.
 504          let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else {
 505              warn!("Unable to resolve the listener IP address '{peer_ip}'");
 506              return None;
 507          };
 508          // Retrieve the event name.
 509          let name = event.name();
 510          // Send the event to the peer.
 511          trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
 512          let result = self.unicast(peer_addr, event);
 513          // If the event was unable to be sent, disconnect.
 514          if let Err(e) = &result {
 515              warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {e}");
 516              debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
 517              self.disconnect(peer_ip);
 518          }
 519          result.ok()
 520      }
 521  
 522      /// Handles the inbound event from the peer. The returned value indicates whether
 523      /// the connection is still active, and errors cause a disconnect once they are
 524      /// propagated to the caller.
 525      async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<bool> {
 526          // Retrieve the listener IP for the peer.
 527          let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else {
 528              // No longer connected to the peer.
 529              trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name());
 530              return Ok(false);
 531          };
 532          // Ensure that the peer is an authorized committee member or a bootstrapper.
 533          if !(self.is_authorized_validator_ip(peer_ip)
 534              || self
 535                  .get_connected_peer(peer_ip)
 536                  .map(|peer| peer.node_type == NodeType::BootstrapClient)
 537                  .unwrap_or(false))
 538          {
 539              bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
 540          }
 541          // Drop the peer, if they have exceeded the rate limit (i.e. they are requesting too much from us).
 542          let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
 543          if num_events >= self.max_cache_events() {
 544              bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
 545          }
 546          // Rate limit for duplicate requests.
 547          match event {
 548              Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
 549                  // Retrieve the certificate ID.
 550                  let certificate_id = match &event {
 551                      Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
 552                      Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
 553                      _ => unreachable!(),
 554                  };
 555                  // Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate).
 556                  let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
 557                  if num_events >= self.max_cache_duplicates() {
 558                      return Ok(true);
 559                  }
 560              }
 561              Event::TransmissionRequest(TransmissionRequest { transmission_id })
 562              | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
 563                  // Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
 564                  let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
 565                  if num_events >= self.max_cache_duplicates() {
 566                      return Ok(true);
 567                  }
 568              }
 569              Event::BlockRequest(_) => {
 570                  let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
 571                  if num_events >= self.max_cache_duplicates() {
 572                      return Ok(true);
 573                  }
 574              }
 575              _ => {}
 576          }
 577          trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());
 578  
 579          // This match statement handles the inbound event by deserializing the event,
 580          // checking the event is valid, and then calling the appropriate (trait) handler.
 581          match event {
 582              Event::BatchPropose(batch_propose) => {
 583                  // Send the batch propose to the primary.
 584                  let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
 585                  Ok(true)
 586              }
 587              Event::BatchSignature(batch_signature) => {
 588                  // Send the batch signature to the primary.
 589                  let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
 590                  Ok(true)
 591              }
 592              Event::BatchCertified(batch_certified) => {
 593                  // Send the batch certificate to the primary.
 594                  let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
 595                  Ok(true)
 596              }
 597              Event::BlockRequest(block_request) => {
 598                  let BlockRequest { start_height, end_height } = block_request;
 599  
 600                  // Ensure the block request is well-formed.
 601                  if start_height >= end_height {
 602                      bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
 603                  }
 604                  // Ensure that the block request is within the allowed bounds.
 605                  if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
 606                      bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
 607                  }
 608  
 609                  // End height is exclusive.
 610                  let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?;
 611  
 612                  let self_ = self.clone();
 613                  let blocks = match task::spawn_blocking(move || {
 614                      // Retrieve the blocks within the requested range.
 615                      match self_.ledger.get_blocks(start_height..end_height) {
 616                          Ok(blocks) => Ok(DataBlocks(blocks)),
 617                          Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
 618                      }
 619                  })
 620                  .await
 621                  {
 622                      Ok(Ok(blocks)) => blocks,
 623                      Ok(Err(error)) => return Err(error),
 624                      Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
 625                  };
 626  
 627                  let self_ = self.clone();
 628                  tokio::spawn(async move {
 629                      // Send the `BlockResponse` message to the peer.
 630                      let event =
 631                          Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version));
 632                      Transport::send(&self_, peer_ip, event).await;
 633                  });
 634                  Ok(true)
 635              }
 636              Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
 637                  // Process the block response. Except for some tests, there is always a sync sender.
 638                  if let Some(sync_sender) = self.sync_sender.get() {
 639                      // Check the response corresponds to a request.
 640                      if !self.cache.remove_outbound_block_request(peer_ip, &request) {
 641                          bail!("Unsolicited block response from '{peer_ip}'")
 642                      }
 643  
 644                      // Perform the deferred non-blocking deserialization of the blocks.
 645                      // The deserialization can take a long time (minutes). We should not be running
 646                      // this on a blocking task, but on a rayon thread pool.
 647                      let (send, recv) = tokio::sync::oneshot::channel();
 648                      rayon::spawn_fifo(move || {
 649                          let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
 650                          let _ = send.send(blocks);
 651                      });
 652                      let blocks = match recv.await {
 653                          Ok(Ok(blocks)) => blocks,
 654                          Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
 655                          Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
 656                      };
 657  
 658                      // Ensure the block response is well-formed.
 659                      blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
 660                      // Send the blocks to the sync module.
 661                      if let Err(err) =
 662                          sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await
 663                      {
 664                          warn!("Unable to process block response from '{peer_ip}' - {err}");
 665                      }
 666                  }
 667                  Ok(true)
 668              }
 669              Event::CertificateRequest(certificate_request) => {
 670                  // Send the certificate request to the sync module.
 671                  // Except for some tests, there is always a sync sender.
 672                  if let Some(sync_sender) = self.sync_sender.get() {
 673                      // Send the certificate request to the sync module.
 674                      let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
 675                  }
 676                  Ok(true)
 677              }
 678              Event::CertificateResponse(certificate_response) => {
 679                  // Send the certificate response to the sync module.
 680                  // Except for some tests, there is always a sync sender.
 681                  if let Some(sync_sender) = self.sync_sender.get() {
 682                      // Send the certificate response to the sync module.
 683                      let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
 684                  }
 685                  Ok(true)
 686              }
 687              Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
 688                  // Disconnect as the peer is not following the protocol.
 689                  bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
 690              }
 691              Event::Disconnect(message) => {
 692                  // The peer informs us that they had disconnected. Disconnect from them too.
 693                  debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
 694                  self.disconnect(peer_ip);
 695                  Ok(false)
 696              }
 697              Event::PrimaryPing(ping) => {
 698                  let PrimaryPing { version, block_locators, primary_certificate } = ping;
 699  
 700                  // Ensure the event version is not outdated.
 701                  if version < Event::<N>::VERSION {
 702                      bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
 703                  }
 704  
 705                  // Log the validator's height.
 706                  debug!("Validator '{peer_ip}' is at height {}", block_locators.latest_locator_height());
 707  
 708                  // Update the peer locators. Except for some tests, there is always a sync sender.
 709                  if let Some(sync_sender) = self.sync_sender.get() {
 710                      // Check the block locators are valid, and update the validators in the sync module.
 711                      if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
 712                          bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
 713                      }
 714                  }
 715  
 716                  // Send the batch certificates to the primary.
 717                  let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
 718                  Ok(true)
 719              }
 720              Event::TransmissionRequest(request) => {
 721                  // TODO (howardwu): Add rate limiting checks on this event, on a per-peer basis.
 722                  // Determine the worker ID.
 723                  let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
 724                      warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
 725                      return Ok(true);
 726                  };
 727                  // Send the transmission request to the worker.
 728                  if let Some(sender) = self.get_worker_sender(worker_id) {
 729                      // Send the transmission request to the worker.
 730                      let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
 731                  }
 732                  Ok(true)
 733              }
 734              Event::TransmissionResponse(response) => {
 735                  // Determine the worker ID.
 736                  let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
 737                      warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
 738                      return Ok(true);
 739                  };
 740                  // Send the transmission response to the worker.
 741                  if let Some(sender) = self.get_worker_sender(worker_id) {
 742                      // Send the transmission response to the worker.
 743                      let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
 744                  }
 745                  Ok(true)
 746              }
 747              Event::ValidatorsRequest(_) => {
 748                  let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
 749                  connected_peers.shuffle(&mut rand::thread_rng());
 750  
 751                  let self_ = self.clone();
 752                  tokio::spawn(async move {
 753                      // Initialize the validators.
 754                      let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
 755                      // Iterate over the validators.
 756                      for validator in connected_peers.into_iter() {
 757                          // Add the validator to the list of validators.
 758                          validators.insert(validator.listener_addr, validator.alpha_addr);
 759                      }
 760                      // Send the validators response to the peer.
 761                      let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
 762                      Transport::send(&self_, peer_ip, event).await;
 763                  });
 764                  Ok(true)
 765              }
 766              Event::ValidatorsResponse(response) => {
 767                  if self.trusted_peers_only {
 768                      bail!("{CONTEXT} Not accepting validators response from '{peer_ip}' (trusted peers only)");
 769                  }
 770                  let ValidatorsResponse { validators } = response;
 771                  // Ensure the number of validators is not too large.
 772                  ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
 773                  // Ensure the cache contains a validators request for this peer.
 774                  if !self.cache.contains_outbound_validators_request(peer_ip) {
 775                      bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
 776                  }
 777                  // Decrement the number of validators requests for this peer.
 778                  self.cache.decrement_outbound_validators_requests(peer_ip);
 779  
 780                  // Add valid validators as candidates to the peer pool; only validator-related
 781                  // filters need to be applied, the rest is handled by `PeerPoolHandling`.
 782                  let valid_addrs = validators
 783                      .into_iter()
 784                      .filter_map(|(listener_addr, alpha_addr)| {
 785                          (self.account.address() != alpha_addr
 786                              && !self.is_connected_address(alpha_addr)
 787                              && self.is_authorized_validator_address(alpha_addr))
 788                          .then_some((listener_addr, None))
 789                      })
 790                      .collect::<Vec<_>>();
 791                  if !valid_addrs.is_empty() {
 792                      self.insert_candidate_peers(valid_addrs);
 793                  }
 794  
 795                  #[cfg(feature = "metrics")]
 796                  self.update_metrics();
 797  
 798                  Ok(true)
 799              }
 800              Event::WorkerPing(ping) => {
 801                  // Ensure the number of transmissions is not too large.
 802                  ensure!(
 803                      ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
 804                      "{CONTEXT} Received too many transmissions"
 805                  );
 806                  // Retrieve the number of workers.
 807                  let num_workers = self.num_workers();
 808                  // Iterate over the transmission IDs.
 809                  for transmission_id in ping.transmission_ids.into_iter() {
 810                      // Determine the worker ID.
 811                      let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
 812                          warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
 813                          continue;
 814                      };
 815                      // Send the transmission ID to the worker.
 816                      if let Some(sender) = self.get_worker_sender(worker_id) {
 817                          // Send the transmission ID to the worker.
 818                          let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
 819                      }
 820                  }
 821                  Ok(true)
 822              }
 823          }
 824      }
 825  
 826      /// Initialize a new instance of the heartbeat.
 827      fn initialize_heartbeat(&self) {
 828          let self_clone = self.clone();
 829          self.spawn(async move {
 830              let start = Instant::now();
 831              // Sleep briefly to ensure the other nodes are ready to connect.
 832              tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
 833              info!("Starting the heartbeat of the gateway...");
 834              loop {
 835                  // Process a heartbeat in the gateway.
 836                  let uptime = start.elapsed();
 837                  self_clone.heartbeat(uptime).await;
 838                  // Sleep for the heartbeat interval.
 839                  tokio::time::sleep(Duration::from_secs(15)).await;
 840              }
 841          });
 842      }
 843  
 844      /// Spawns a task with the given future; it should only be used for long-running tasks.
 845      #[allow(dead_code)]
 846      fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
 847          self.handles.lock().push(tokio::spawn(future));
 848      }
 849  
 850      /// Shuts down the gateway.
 851      pub async fn shut_down(&self) {
 852          info!("Shutting down the gateway...");
 853          // Save the best peers for future use.
 854          if let Err(e) = self.save_best_peers(&self.storage_mode, VALIDATOR_CACHE_FILENAME, None, true) {
 855              warn!("Failed to persist best validators to disk: {e}");
 856          }
 857          // Abort the tasks.
 858          self.handles.lock().iter().for_each(|handle| handle.abort());
 859          // Close the listener.
 860          self.tcp.shut_down().await;
 861      }
 862  }
 863  
 864  impl<N: Network> Gateway<N> {
    /// The uptime after which nodes log a warning about missing validator connections.
    /// Before this grace period elapses, a missing quorum is only logged at debug level
    /// (see `log_connected_validators`).
    const MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD: Duration = Duration::from_secs(60);
 867  
    /// Handles the heartbeat request.
    ///
    /// Invoked periodically by the task spawned in `initialize_heartbeat`;
    /// `uptime` is how long the gateway has been running, and is used to decide
    /// the severity of missing-quorum logs.
    async fn heartbeat(&self, uptime: Duration) {
        // Log the connected validators.
        self.log_connected_validators(uptime);
        // Log the validator participation scores.
        #[cfg(feature = "telemetry")]
        self.log_participation_scores();
        // Keep the trusted validators connected.
        self.handle_trusted_validators();
        // Keep the bootstrap peers within the allowed range.
        self.handle_bootstrap_peers().await;
        // Removes any validators that are not in the current committee.
        self.handle_unauthorized_validators();
        // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
        self.handle_min_connected_validators();
        // Unban any addresses whose ban time has expired.
        self.handle_banned_ips();
        // Update the dynamic validator whitelist.
        self.update_validator_whitelist();
    }
 888  
 889      /// Logs the connected validators.
 890      fn log_connected_validators(&self, uptime: Duration) {
 891          // Retrieve the connected validators and current committee.
 892          let connected_validators = self.connected_peers();
 893          let committee = match self.ledger.current_committee() {
 894              Ok(c) => c,
 895              Err(err) => {
 896                  error!("Failed to get current committee: {err}");
 897                  return;
 898              }
 899          };
 900  
 901          // Resolve the total number of connectable validators.
 902          let validators_total = committee.num_members().saturating_sub(1);
 903          // Format the total validators message.
 904          let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
 905          // Construct the connections message.
 906          let connections_msg = match connected_validators.len() {
 907              0 => "No connected validators".to_string(),
 908              num_connected => format!("Connected to {num_connected} validators {total_validators}"),
 909          };
 910          info!("{connections_msg}");
 911  
 912          // Collect the connected validator addresses and stake.
 913          let mut connected_validator_addresses = HashSet::with_capacity(connected_validators.len());
 914          // Include our own address.
 915          connected_validator_addresses.insert(self.account.address());
 916          // Include and log the connected validators.
 917          for peer_ip in &connected_validators {
 918              let address = self.resolve_to_alpha_addr(*peer_ip).map_or("Unknown".to_string(), |a| {
 919                  connected_validator_addresses.insert(a);
 920                  a.to_string()
 921              });
 922              debug!("{}", format!("  Connected to: {peer_ip} - {address}").dimmed());
 923          }
 924  
 925          // Log the validators that are not connected.
 926          let num_not_connected = validators_total.saturating_sub(connected_validators.len());
 927          if num_not_connected > 0 {
 928              // Cache the total stake for computing percentages.
 929              let total_stake = committee.total_stake();
 930              let total_stake_f64 = total_stake as f64;
 931  
 932              // Collect the committee members.
 933              let committee_members: HashSet<_> =
 934                  self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
 935  
 936              let not_connected_stake: u64 = committee_members
 937                  .difference(&connected_validator_addresses)
 938                  .map(|address| {
 939                      let address_stake = committee.get_stake(*address);
 940                      let address_stake_as_percentage =
 941                          if total_stake == 0 { 0.0 } else { address_stake as f64 / total_stake_f64 * 100.0 };
 942                      debug!(
 943                          "{}",
 944                          format!("  Not connected to {address} ({address_stake_as_percentage:.2}% of total stake)")
 945                              .dimmed()
 946                      );
 947                      address_stake
 948                  })
 949                  .sum();
 950  
 951              let not_connected_stake_as_percentage =
 952                  if total_stake == 0 { 0.0 } else { not_connected_stake as f64 / total_stake_f64 * 100.0 };
 953              warn!(
 954                  "Not connected to {num_not_connected} validators {total_validators} ({not_connected_stake_as_percentage:.2}% of total stake not connected)"
 955              );
 956              #[cfg(feature = "metrics")]
 957              {
 958                  let connected_stake_as_percentage = 100.0 - not_connected_stake_as_percentage;
 959                  metrics::gauge(metrics::bft::CONNECTED_STAKE, connected_stake_as_percentage);
 960              }
 961          } else {
 962              #[cfg(feature = "metrics")]
 963              metrics::gauge(metrics::bft::CONNECTED_STAKE, 100.0);
 964          };
 965  
 966          if !committee.is_quorum_threshold_reached(&connected_validator_addresses) {
 967              // Not being connected to a quorum of validators is begning during startup.
 968              if uptime > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
 969                  error!("Not connected to a quorum of validators");
 970              } else {
 971                  debug!("Not connected to a quorum of validators");
 972              }
 973          }
 974      }
 975  
 976      // Logs the validator participation scores.
 977      #[cfg(feature = "telemetry")]
 978      fn log_participation_scores(&self) {
 979          if let Ok(current_committee) = self.ledger.current_committee() {
 980              // Retrieve the participation scores.
 981              let participation_scores = self.validator_telemetry().get_participation_scores(&current_committee);
 982              // Log the participation scores.
 983              debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
 984              for (address, score) in participation_scores {
 985                  debug!("{}", format!("  {address} - {score:.2}%").dimmed());
 986              }
 987          }
 988      }
 989  
 990      /// This function attempts to connect to any disconnected trusted validators.
 991      fn handle_trusted_validators(&self) {
 992          // Ensure that the trusted nodes are connected.
 993          for validator_ip in &self.trusted_peers() {
 994              // Attempt to connect to the trusted validator.
 995              self.connect(*validator_ip);
 996          }
 997      }
 998  
 999      /// This function keeps the number of bootstrap peers within the allowed range.
1000      async fn handle_bootstrap_peers(&self) {
1001          // Return early if we are in trusted peers only mode.
1002          if self.trusted_peers_only {
1003              return;
1004          }
1005          // Split the bootstrap peers into connected and candidate lists.
1006          let mut candidate_bootstrap = Vec::new();
1007          let connected_bootstrap = self.filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
1008          for bootstrap_ip in bootstrap_peers::<N>(self.is_dev()) {
1009              if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
1010                  candidate_bootstrap.push(bootstrap_ip);
1011              }
1012          }
1013          // If there are not enough connected bootstrap peers, connect to more.
1014          if connected_bootstrap.is_empty() {
1015              // Initialize an RNG.
1016              let rng = &mut OsRng;
1017              // Attempt to connect to a bootstrap peer.
1018              if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
1019                  match self.connect(peer_ip) {
1020                      Some(hdl) => {
1021                          let result = hdl.await;
1022                          if let Err(err) = result {
1023                              warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
1024                          }
1025                      }
1026                      None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
1027                  }
1028              }
1029          }
1030          // Determine if the node is connected to more bootstrap peers than allowed.
1031          let num_surplus = connected_bootstrap.len().saturating_sub(1);
1032          if num_surplus > 0 {
1033              // Initialize an RNG.
1034              let rng = &mut OsRng;
1035              // Proceed to send disconnect requests to these bootstrap peers.
1036              for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
1037                  info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
1038                  <Self as Transport<N>>::send(
1039                      self,
1040                      peer.listener_addr,
1041                      Event::Disconnect(DisconnectReason::NoReasonGiven.into()),
1042                  )
1043                  .await;
1044                  // Disconnect from this peer.
1045                  self.disconnect(peer.listener_addr);
1046              }
1047          }
1048      }
1049  
1050      /// This function attempts to disconnect any validators that are not in the current committee.
1051      fn handle_unauthorized_validators(&self) {
1052          let self_ = self.clone();
1053          tokio::spawn(async move {
1054              // Retrieve the connected validators.
1055              let validators = self_.get_connected_peers();
1056              // Iterate over the validator IPs.
1057              for peer in validators {
1058                  // Skip bootstrapper peers.
1059                  if peer.node_type == NodeType::BootstrapClient {
1060                      continue;
1061                  }
1062                  // Disconnect any validator that is not in the current committee.
1063                  if !self_.is_authorized_validator_ip(peer.listener_addr) {
1064                      warn!(
1065                          "{CONTEXT} Disconnecting from '{}' - Validator is not in the current committee",
1066                          peer.listener_addr
1067                      );
1068                      Transport::send(&self_, peer.listener_addr, DisconnectReason::ProtocolViolation.into()).await;
1069                      // Disconnect from this peer.
1070                      self_.disconnect(peer.listener_addr);
1071                  }
1072              }
1073          });
1074      }
1075  
1076      /// This function sends a `ValidatorsRequest` to a random validator,
1077      /// if the number of connected validators is less than the minimum.
1078      /// It also attempts to connect to known unconnected validators.
1079      fn handle_min_connected_validators(&self) {
1080          // Attempt to connect to untrusted validators we're not connected to yet.
1081          // The trusted ones are already handled by `handle_trusted_validators`.
1082          let trusted_validators = self.trusted_peers();
1083          if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
1084              for peer in self.get_candidate_peers() {
1085                  if !trusted_validators.contains(&peer.listener_addr) {
1086                      // Attempt to connect to unconnected validators.
1087                      self.connect(peer.listener_addr);
1088                  }
1089              }
1090  
1091              // Retrieve the connected validators.
1092              let validators = self.connected_peers();
1093              // If there are no validator IPs to connect to, return early.
1094              if validators.is_empty() {
1095                  return;
1096              }
1097              // Select a random validator IP.
1098              if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
1099                  let self_ = self.clone();
1100                  tokio::spawn(async move {
1101                      // Increment the number of outbound validators requests for this validator.
1102                      self_.cache.increment_outbound_validators_requests(validator_ip);
1103                      // Send a `ValidatorsRequest` to the validator.
1104                      let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1105                  });
1106              }
1107          }
1108      }
1109  
1110      /// Processes a message received from the network.
1111      async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1112          // Process the message. Disconnect if the peer violated the protocol.
1113          if let Err(error) = self.inbound(peer_addr, message).await {
1114              if let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) {
1115                  warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1116                  let self_ = self.clone();
1117                  tokio::spawn(async move {
1118                      Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1119                      // Disconnect from this peer.
1120                      self_.disconnect(peer_ip);
1121                  });
1122              }
1123          }
1124      }
1125  
    // Remove addresses whose ban time has expired.
    // Invoked on every heartbeat; `IP_BAN_TIME_IN_SECS` is the ban duration.
    fn handle_banned_ips(&self) {
        self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
    }
1130  
1131      // Update the dynamic validator whitelist.
1132      fn update_validator_whitelist(&self) {
1133          if let Err(e) =
1134              self.save_best_peers(&self.storage_mode, VALIDATOR_WHITELIST_FILENAME, Some(MAX_VALIDATORS_TO_SEND), false)
1135          {
1136              warn!("Couldn't update the validator whitelist: {e}");
1137          }
1138      }
1139  }
1140  
1141  #[async_trait]
1142  impl<N: Network> Transport<N> for Gateway<N> {
    /// Sends the given event to specified peer.
    ///
    /// This method is rate limited to prevent spamming the peer.
    ///
    /// This function returns as soon as the event is queued to be sent,
    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
    /// which can be used to determine when and whether the event has been delivered.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
        // Rate-limits an outbound event: spins (with a short sleep) until the per-peer
        // cache count for this event class drops below the allowed frequency, then
        // queues the event for delivery via `send_inner`.
        macro_rules! send {
            ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
                // Rate limit the number of events of this class sent to the peer.
                while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
                    // Sleep for a short period of time to allow the cache to clear.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                // Send the event to the peer.
                $self.send_inner(peer_ip, event)
            }};
        }

        // Increment the cache for certificate, transmission and block events.
        match event {
            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Send the event to the peer, rate limited by the certificate cache.
                send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
            }
            Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Send the event to the peer, rate limited by the transmission cache.
                send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
            }
            Event::BlockRequest(request) => {
                // Insert the outbound request so we can match it to responses.
                self.cache.insert_outbound_block_request(peer_ip, request);
                // Send the event to the peer and update the outbound event cache, use the general rate limit.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
            _ => {
                // Send the event to the peer, use the general rate limit.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
        }
    }
1189  
1190      /// Broadcasts the given event to all connected peers.
1191      // TODO(ljedrz): the event should be checked for the presence of Data::Object, and
1192      // serialized in advance if it's there.
1193      fn broadcast(&self, event: Event<N>) {
1194          // Ensure there are connected peers.
1195          if self.number_of_connected_peers() > 0 {
1196              let self_ = self.clone();
1197              let connected_peers = self.connected_peers();
1198              tokio::spawn(async move {
1199                  // Iterate through all connected peers.
1200                  for peer_ip in connected_peers {
1201                      // Send the event to the peer.
1202                      let _ = Transport::send(&self_, peer_ip, event.clone()).await;
1203                  }
1204              });
1205          }
1206      }
1207  }
1208  
impl<N: Network> P2P for Gateway<N> {
    /// Returns a reference to the TCP instance underlying this gateway's networking.
    fn tcp(&self) -> &Tcp {
        &self.tcp
    }
}
1215  
1216  #[async_trait]
1217  impl<N: Network> Reading for Gateway<N> {
1218      type Codec = EventCodec<N>;
1219      type Message = Event<N>;
1220  
1221      /// Creates a [`Decoder`] used to interpret messages from the network.
1222      /// The `side` param indicates the connection side **from the node's perspective**.
1223      fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1224          Default::default()
1225      }
1226  
1227      /// Processes a message received from the network.
1228      async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1229          if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1230              let self_ = self.clone();
1231              // Handle BlockRequest and BlockResponse messages in a separate task to not block the
1232              // inbound queue.
1233              tokio::spawn(async move {
1234                  self_.process_message_inner(peer_addr, message).await;
1235              });
1236          } else {
1237              self.process_message_inner(peer_addr, message).await;
1238          }
1239          Ok(())
1240      }
1241  
1242      /// Computes the depth of per-connection queues used to process inbound messages, sufficient to process the maximum expected load at any givent moment.
1243      /// The greater it is, the more inbound messages the node can enqueue, but a too large value can make the node more susceptible to DoS attacks.
1244      fn message_queue_depth(&self) -> usize {
1245          2 * BatchHeader::<N>::MAX_GC_ROUNDS
1246              * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1247              * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1248      }
1249  }
1250  
#[async_trait]
impl<N: Network> Writing for Gateway<N> {
    type Codec = EventCodec<N>;
    type Message = Event<N>;

    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
    /// The `side` parameter indicates the connection side **from the node's perspective**.
    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
        Default::default()
    }

    /// Computes the depth of per-connection queues used to send outbound messages, sufficient to process the maximum expected load at any given moment.
    /// The greater it is, the more outbound messages the node can enqueue. A too large value might obscure potential issues with your implementation
    /// (like slow serialization) or network.
    fn message_queue_depth(&self) -> usize {
        2 * BatchHeader::<N>::MAX_GC_ROUNDS
            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
    }
}
1271  
1272  #[async_trait]
1273  impl<N: Network> Disconnect for Gateway<N> {
1274      /// Any extra operations to be performed during a disconnect.
1275      async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1276          if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
1277              self.downgrade_peer_to_candidate(peer_ip);
1278              // Remove the peer from the sync module. Except for some tests, there is always a sync sender.
1279              if let Some(sync_sender) = self.sync_sender.get() {
1280                  let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
1281                  tokio::spawn(async move {
1282                      if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
1283                          warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
1284                      }
1285                  });
1286              }
1287              // We don't clear this map based on time but only on peer disconnect.
1288              // This is sufficient to avoid infinite growth as the committee has a fixed number
1289              // of members.
1290              self.cache.clear_outbound_validators_requests(peer_ip);
1291              self.cache.clear_outbound_block_requests(peer_ip);
1292              #[cfg(feature = "metrics")]
1293              self.update_metrics();
1294          }
1295      }
1296  }
1297  
1298  #[async_trait]
1299  impl<N: Network> OnConnect for Gateway<N> {
1300      async fn on_connect(&self, peer_addr: SocketAddr) {
1301          if let Some(listener_addr) = self.resolve_to_listener(&peer_addr) {
1302              if let Some(peer) = self.get_connected_peer(listener_addr) {
1303                  if peer.node_type == NodeType::BootstrapClient {
1304                      let _ =
1305                          <Self as Transport<N>>::send(self, listener_addr, Event::ValidatorsRequest(ValidatorsRequest))
1306                              .await;
1307                  }
1308              }
1309          }
1310      }
1311  }
1312  
#[async_trait]
impl<N: Network> Handshake for Gateway<N> {
    /// Performs the handshake protocol.
    ///
    /// Applies IP-level ban/rate-limit checks, then runs the challenge-request/response
    /// exchange via `handshake_inner_initiator` (outbound) or `handshake_inner_responder`
    /// (inbound), and finally registers the peer as connected on success or downgrades
    /// it back to a candidate on failure.
    async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
        // Perform the handshake.
        let peer_addr = connection.addr();
        let peer_side = connection.side();

        // Check (or impose) IP-level bans.
        // Compiled out for tests; also skipped in dev mode and for connections this
        // node initiated itself (only inbound peers are the Initiator side here).
        #[cfg(not(any(test)))]
        if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
            // If the IP is already banned reject the connection.
            if self.is_ip_banned(peer_addr.ip()) {
                trace!("{CONTEXT} Rejected a connection request from banned IP '{}'", peer_addr.ip());
                return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
            }

            // Record this attempt and count the recent attempts from this IP.
            let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);

            debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
            // Too many attempts within the window results in an IP ban.
            if num_attempts > MAX_CONNECTION_ATTEMPTS {
                self.update_ip_ban(peer_addr.ip());
                trace!("{CONTEXT} Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
                return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
            }
        }

        let stream = self.borrow_stream(&mut connection);

        // If this is an inbound connection, we log it, but don't know the listening address yet.
        // Otherwise, we can immediately register the listening address.
        let mut listener_addr = if peer_side == ConnectionSide::Initiator {
            debug!("{CONTEXT} Received a connection request from '{peer_addr}'");
            None
        } else {
            debug!("{CONTEXT} Shaking hands with {peer_addr}...");
            Some(peer_addr)
        };

        // Retrieve the restrictions ID.
        let restrictions_id = self.ledger.latest_restrictions_id();

        // Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
        // (The responder path fills `listener_addr` in once the peer announces its listening port.)
        let handshake_result = if peer_side == ConnectionSide::Responder {
            self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await
        } else {
            self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await
        };

        // `listener_addr` is `None` only if the responder-side handshake broke before
        // the peer announced its listening port; in that case there is nothing to register.
        if let Some(addr) = listener_addr {
            match handshake_result {
                // Completed handshake: classify, register, and upgrade the peer.
                Ok(Some(ref cr)) => {
                    let node_type = if bootstrap_peers::<N>(self.is_dev()).contains(&addr) {
                        NodeType::BootstrapClient
                    } else {
                        NodeType::Validator
                    };
                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
                        self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address));
                        peer.upgrade_to_connected(
                            peer_addr,
                            cr.listener_port,
                            cr.address,
                            node_type,
                            cr.version,
                            ConnectionMode::Gateway,
                        );
                    }
                    #[cfg(feature = "metrics")]
                    self.update_metrics();
                    info!("{CONTEXT} Connected to '{addr}'");
                }
                // `Ok(None)` signals the inner handshake detected a duplicate attempt.
                Ok(None) => {
                    return Err(error(format!("Duplicate handshake attempt with '{addr}'")));
                }
                Err(error) => {
                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
                        // The peer may only be downgraded if it's a ConnectingPeer.
                        if peer.is_connecting() {
                            peer.downgrade_to_candidate(addr);
                        }
                    }
                    // This error needs to be "repackaged" in order to conform to the return type.
                    return Err(error);
                }
            }
        }

        Ok(connection)
    }
}
1404  
/// A macro unwrapping the expected handshake event or returning an error for unexpected events.
///
/// Expands to a `match` on the next frame read from `$framed`:
/// - the expected `$event_ty` yields its payload,
/// - a `Disconnect` event, an unexpected event, or a closed stream (`None`)
///   returns early with a descriptive `io::Error`.
macro_rules! expect_event {
    ($event_ty:path, $framed:expr, $peer_addr:expr) => {
        match $framed.try_next().await? {
            // Received the expected event, proceed.
            Some($event_ty(data)) => {
                trace!("{CONTEXT} Received '{}' from '{}'", data.name(), $peer_addr);
                data
            }
            // Received a disconnect event, abort.
            Some(Event::Disconnect(DisconnectEvent { reason })) => {
                return Err(error(format!("{CONTEXT} '{}' disconnected: {reason}", $peer_addr)));
            }
            // Received an unexpected event, abort.
            Some(ty) => {
                return Err(error(format!(
                    "{CONTEXT} '{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer_addr,
                    ty.name(),
                    stringify!($event_ty),
                )))
            }
            // Received nothing.
            None => {
                return Err(error(format!(
                    "{CONTEXT} the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
                    stringify!($event_ty)
                )))
            }
        }
    };
}
1437  
/// Send the given message to the peer.
///
/// Logs the event name at trace level, then flushes the event through the framed
/// sink (which consumes it); any I/O error from the underlying stream is propagated.
async fn send_event<N: Network>(
    framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
    peer_addr: SocketAddr,
    event: Event<N>,
) -> io::Result<()> {
    trace!("{CONTEXT} Sending '{}' to '{peer_addr}'", event.name());
    framed.send(event).await
}
1447  
impl<N: Network> Gateway<N> {
    /// The connection initiator side of the handshake.
    ///
    /// Protocol (initiator's view):
    ///   1. send our `ChallengeRequest`,
    ///   2. receive the peer's `ChallengeResponse` followed by its `ChallengeRequest`,
    ///   3. verify both, then send our own `ChallengeResponse`.
    /// Returns `Ok(Some(request))` on success, `Ok(None)` for a duplicate connection
    /// attempt, and `Err` for any protocol violation or I/O failure.
    async fn handshake_inner_initiator<'a>(
        &'a self,
        peer_addr: SocketAddr,
        restrictions_id: Field<N>,
        stream: &'a mut TcpStream,
    ) -> io::Result<Option<ChallengeRequest<N>>> {
        // Introduce the peer into the peer pool.
        // A `false` return means the peer is already connecting/connected; bail out quietly.
        if !self.add_connecting_peer(peer_addr) {
            return Ok(None);
        }

        // Construct the stream.
        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());

        // Initialize an RNG.
        let rng = &mut rand::rngs::OsRng;

        /* Step 1: Send the challenge request. */

        // Sample a random nonce.
        let our_nonce = rng.r#gen();
        // Determine the alphaos SHA to send to the peer.
        // The commit SHA is only shared from consensus V12 onwards, and only when known.
        let current_block_height = self.ledger.latest_block_height();
        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
        let alphaos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
            (true, Some(sha)) => Some(sha),
            _ => None,
        };
        // Send a challenge request to the peer.
        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, alphaos_sha);
        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;

        /* Step 2: Receive the peer's challenge response followed by the challenge request. */

        // Listen for the challenge response message.
        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
        // Listen for the challenge request message.
        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);

        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
        if let Some(reason) = self
            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
            .await
        {
            send_event(&mut framed, peer_addr, reason.into()).await?;
            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason}")));
        }
        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
            send_event(&mut framed, peer_addr, reason.into()).await?;
            if reason == DisconnectReason::NoReasonGiven {
                // The Alpha address is already connected; no reason to return an error.
                return Ok(None);
            } else {
                return Err(error(format!("Dropped '{peer_addr}' for reason: {reason}")));
            }
        }

        /* Step 3: Send the challenge response. */

        // Sign the counterparty nonce.
        // The signed payload binds the peer's nonce to a fresh response nonce of our own.
        let response_nonce: u64 = rng.r#gen();
        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
        };
        // Send the challenge response.
        let our_response =
            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;

        Ok(Some(peer_request))
    }

    /// The connection responder side of the handshake.
    ///
    /// Mirror of `handshake_inner_initiator`:
    ///   1. receive the peer's `ChallengeRequest` (learning its listening port, which is
    ///      written back through `peer_ip` for the caller),
    ///   2. send our `ChallengeResponse` followed by our own `ChallengeRequest`,
    ///   3. receive and verify the peer's `ChallengeResponse`.
    /// Returns `Ok(None)` for a duplicate connection attempt.
    async fn handshake_inner_responder<'a>(
        &'a self,
        peer_addr: SocketAddr,
        peer_ip: &mut Option<SocketAddr>,
        restrictions_id: Field<N>,
        stream: &'a mut TcpStream,
    ) -> io::Result<Option<ChallengeRequest<N>>> {
        // Construct the stream.
        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());

        /* Step 1: Receive the challenge request. */

        // Listen for the challenge request message.
        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);

        // Ensure the address is not the same as this node.
        if self.account.address() == peer_request.address {
            return Err(error("Skipping request to connect to self".to_string()));
        }

        // Obtain the peer's listening address.
        // This also propagates the address to the caller via the `peer_ip` out-parameter.
        *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
        let peer_ip = peer_ip.unwrap();

        // Knowing the peer's listening address, ensure it is allowed to connect.
        if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) {
            return Err(error(format!("{forbidden_message}")));
        }

        // Introduce the peer into the peer pool.
        // A `false` return means the peer is already connecting/connected; bail out quietly.
        if !self.add_connecting_peer(peer_ip) {
            return Ok(None);
        }

        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
            send_event(&mut framed, peer_addr, reason.into()).await?;
            if reason == DisconnectReason::NoReasonGiven {
                // The Alpha address is already connected; no reason to return an error.
                return Ok(None);
            } else {
                // NOTE(review): this path uses `io_error` while the initiator path uses
                // `error` for the same condition — presumably both construct an
                // `io::Error`; confirm the inconsistency is intentional or harmless.
                return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
            }
        }

        /* Step 2: Send the challenge response followed by own challenge request. */

        // Initialize an RNG.
        let rng = &mut rand::rngs::OsRng;

        // Sign the counterparty nonce.
        // The signed payload binds the peer's nonce to a fresh response nonce of our own.
        let response_nonce: u64 = rng.r#gen();
        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
        };
        // Send the challenge response.
        let our_response =
            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;

        // Sample a random nonce.
        let our_nonce = rng.r#gen();
        // Determine the alphaos SHA to send to the peer.
        // The commit SHA is only shared from consensus V12 onwards, and only when known.
        let current_block_height = self.ledger.latest_block_height();
        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
        let alphaos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
            (true, Some(sha)) => Some(sha),
            _ => None,
        };
        // Send the challenge request.
        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, alphaos_sha);
        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;

        /* Step 3: Receive the challenge response. */

        // Listen for the challenge response message.
        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
        if let Some(reason) = self
            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
            .await
        {
            send_event(&mut framed, peer_addr, reason.into()).await?;
            return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
        }

        Ok(Some(peer_request))
    }

    /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid.
    ///
    /// Checks, in order: protocol version, trusted-peers-only mode, committee membership
    /// (bootstrap peers exempt), and whether the Alpha address is already connected.
    fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
        // Retrieve the components of the challenge request.
        let &ChallengeRequest { version, listener_port, address, nonce: _, ref alphaos_sha } = event;
        log_repo_sha_comparison(peer_addr, alphaos_sha, CONTEXT);

        let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);

        // Ensure the event protocol version is not outdated.
        if version < Event::<N>::VERSION {
            warn!("{CONTEXT} Dropping '{peer_addr}' on version {version} (outdated)");
            return Some(DisconnectReason::OutdatedClientVersion);
        }
        // If the node is in trusted peers only mode, ensure the peer is trusted.
        if self.trusted_peers_only && !self.is_trusted(listener_addr) {
            warn!("{CONTEXT} Dropping '{peer_addr}' for being an untrusted validator ({address})");
            return Some(DisconnectReason::ProtocolViolation);
        }
        // Bootstrap peers are exempt from the committee-membership check.
        // NOTE(review): this uses `self.dev().is_some()` where `perform_handshake` uses
        // `self.is_dev()` — presumably equivalent; confirm.
        if !bootstrap_peers::<N>(self.dev().is_some()).contains(&listener_addr) {
            // Ensure the address is a current committee member.
            if !self.is_authorized_validator_address(address) {
                warn!("{CONTEXT} Dropping '{peer_addr}' for being an unauthorized validator ({address})");
                return Some(DisconnectReason::ProtocolViolation);
            }
        }
        // Ensure the address is not already connected.
        if self.is_connected_address(address) {
            warn!("{CONTEXT} Dropping '{peer_addr}' for being already connected ({address})");
            return Some(DisconnectReason::NoReasonGiven);
        }
        None
    }

    /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid.
    ///
    /// Checks that the peer echoed the expected restrictions ID, and that its signature
    /// over (our nonce || its response nonce) verifies against the claimed address.
    async fn verify_challenge_response(
        &self,
        peer_addr: SocketAddr,
        peer_address: Address<N>,
        response: ChallengeResponse<N>,
        expected_restrictions_id: Field<N>,
        expected_nonce: u64,
    ) -> Option<DisconnectReason> {
        // Retrieve the components of the challenge response.
        let ChallengeResponse { restrictions_id, signature, nonce } = response;

        // Verify the restrictions ID.
        if restrictions_id != expected_restrictions_id {
            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
            return Some(DisconnectReason::InvalidChallengeResponse);
        }
        // Perform the deferred non-blocking deserialization of the signature.
        let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
            return Some(DisconnectReason::InvalidChallengeResponse);
        };
        // Verify the signature.
        // The signed payload must match the layout produced by the counterparty:
        // our nonce first, then the peer's freshly-sampled response nonce.
        if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (invalid signature)");
            return Some(DisconnectReason::InvalidChallengeResponse);
        }
        None
    }
}
1678  
#[cfg(test)]
/// Property-based tests for [`Gateway`] construction and startup, driven by
/// `proptest` strategies over randomly sampled committees and accounts.
mod prop_tests {
    use crate::{
        gateway::prop_tests::GatewayAddress::{Dev, Prod},
        helpers::{init_primary_channels, init_worker_channels, Storage},
        Gateway,
        Worker,
        MAX_WORKERS,
        MEMORY_POOL_PORT,
    };
    use alphaos_account::Account;
    use alphaos_node_bft_ledger_service::MockLedgerService;
    use alphaos_node_bft_storage_service::BFTMemoryService;
    use alphaos_node_network::PeerPoolHandling;
    use alphaos_node_tcp::P2P;
    use alphastd::StorageMode;
    use alphavm::{
        ledger::{
            committee::{
                prop_tests::{CommitteeContext, ValidatorSet},
                test_helpers::sample_committee_for_round_and_members,
                Committee,
            },
            narwhal::{batch_certificate::test_helpers::sample_batch_certificate_for_round, BatchHeader},
        },
        prelude::{MainnetV0, PrivateKey},
        utilities::TestRng,
    };

    use indexmap::{IndexMap, IndexSet};
    use proptest::{
        prelude::{any, any_with, Arbitrary, BoxedStrategy, Just, Strategy},
        sample::Selector,
    };
    use std::{
        fmt::{Debug, Formatter},
        net::{IpAddr, Ipv4Addr, SocketAddr},
        sync::Arc,
    };
    use test_strategy::proptest;

    type CurrentNetwork = MainnetV0;

    impl Debug for Gateway<CurrentNetwork> {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            // TODO implement Debug properly and move it over to production code
            f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
        }
    }

    /// The two addressing modes a test gateway can be built with:
    /// a dev port offset, or an optional explicit production socket address.
    #[derive(Debug, test_strategy::Arbitrary)]
    enum GatewayAddress {
        Dev(u8),
        Prod(Option<SocketAddr>),
    }

    impl GatewayAddress {
        // Returns the explicit socket address (production mode only).
        fn ip(&self) -> Option<SocketAddr> {
            if let GatewayAddress::Prod(ip) = self {
                return *ip;
            }
            None
        }

        // Returns the dev port offset (dev mode only).
        fn port(&self) -> Option<u16> {
            if let GatewayAddress::Dev(port) = self {
                return Some(*port as u16);
            }
            None
        }
    }

    impl Arbitrary for Gateway<CurrentNetwork> {
        type Parameters = ();
        type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;

        // Builds an arbitrary dev-mode gateway from a sampled committee/account/address.
        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            any_valid_dev_gateway()
                .prop_map(|(storage, _, private_key, address)| {
                    Gateway::new(
                        Account::try_from(private_key).unwrap(),
                        storage.clone(),
                        storage.ledger().clone(),
                        address.ip(),
                        &[],
                        false,
                        StorageMode::new_test(None),
                        address.port(),
                    )
                    .unwrap()
                })
                .boxed()
        }
    }

    // Everything needed to construct a test gateway: storage, the committee it was
    // sampled from, the selected validator's private key, and an addressing mode.
    type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);

    /// Strategy producing valid dev-mode gateway inputs (address is a `Dev` port offset).
    fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
        (any::<CommitteeContext>(), any::<Selector>())
            .prop_flat_map(|(context, account_selector)| {
                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
                (
                    any_with::<Storage<CurrentNetwork>>(context.clone()),
                    Just(context),
                    Just(account_selector.select(validators)),
                    0u8..,
                )
                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Dev(d)))
            })
            .boxed()
    }

    /// Strategy producing valid production-mode gateway inputs (address is `Prod`).
    fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
        (any::<CommitteeContext>(), any::<Selector>())
            .prop_flat_map(|(context, account_selector)| {
                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
                (
                    any_with::<Storage<CurrentNetwork>>(context.clone()),
                    Just(context),
                    Just(account_selector.select(validators)),
                    any::<Option<SocketAddr>>(),
                )
                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Prod(d)))
            })
            .boxed()
    }

    /// Dev-mode construction: listener binds localhost at MEMORY_POOL_PORT + offset,
    /// and the configured account/committee limits are respected.
    #[proptest]
    fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
        let (storage, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            StorageMode::new_test(None),
            dev.port(),
        )
        .unwrap();
        let tcp_config = gateway.tcp().config();
        assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
        assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));

        let tcp_config = gateway.tcp().config();
        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
        assert_eq!(gateway.account().address(), account.address());
    }

    /// Production-mode construction: an explicit address is used verbatim; otherwise
    /// the gateway falls back to 0.0.0.0:MEMORY_POOL_PORT.
    #[proptest]
    fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
        let (storage, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            StorageMode::new_test(None),
            dev.port(),
        )
        .unwrap();
        let tcp_config = gateway.tcp().config();
        if let Some(socket_addr) = dev.ip() {
            assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
            assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
        } else {
            assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
            assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
        }

        let tcp_config = gateway.tcp().config();
        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
        assert_eq!(gateway.account().address(), account.address());
    }

    /// End-to-end startup: a gateway with a variable number of workers runs and
    /// reports the expected local IP and worker count.
    #[proptest(async = "tokio")]
    async fn gateway_start(
        #[strategy(any_valid_dev_gateway())] input: GatewayInput,
        #[strategy(0..MAX_WORKERS)] workers_count: u8,
    ) {
        let (storage, committee, private_key, dev) = input;
        let committee = committee.0;
        let worker_storage = storage.clone();
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account,
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            StorageMode::new_test(None),
            dev.port(),
        )
        .unwrap();

        let (primary_sender, _) = init_primary_channels();

        let (workers, worker_senders) = {
            // Construct a map of the worker senders.
            let mut tx_workers = IndexMap::new();
            let mut workers = IndexMap::new();

            // Initialize the workers.
            for id in 0..workers_count {
                // Construct the worker channels.
                let (tx_worker, rx_worker) = init_worker_channels();
                // Construct the worker instance.
                let ledger = Arc::new(MockLedgerService::new(committee.clone()));
                let worker =
                    Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
                        .unwrap();
                // Run the worker instance.
                worker.run(rx_worker);

                // Add the worker and the worker sender to maps
                workers.insert(id, worker);
                tx_workers.insert(id, tx_worker);
            }
            (workers, tx_workers)
        };

        gateway.run(primary_sender, worker_senders, None).await;
        assert_eq!(
            gateway.local_ip(),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
        );
        assert_eq!(gateway.num_workers(), workers.len() as u8);
    }

    /// Committee members (first `committee_size` certificates) are authorized
    /// validators; authors of the extra certificates are not.
    #[proptest]
    fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
        let rng = &mut TestRng::default();

        // Initialize the round parameters.
        let current_round = 2;
        let committee_size = 4;
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let (_, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        // Sample the certificates.
        let mut certificates = IndexSet::new();
        for _ in 0..committee_size {
            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        }
        let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
        // Initialize the committee.
        let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
        // Sample extra certificates from non-committee members.
        for _ in 0..committee_size {
            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        }
        // Initialize the ledger.
        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
        // Initialize the storage.
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Initialize the gateway.
        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            ledger.clone(),
            dev.ip(),
            &[],
            false,
            StorageMode::new_test(None),
            dev.port(),
        )
        .unwrap();
        // Insert certificate to the storage.
        for certificate in certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Check that the current committee members are authorized validators.
        for i in 0..certificates.clone().len() {
            let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
            if i < committee_size {
                assert!(is_authorized);
            } else {
                assert!(!is_authorized);
            }
        }
    }
}