/ node / src / bootstrap_client / network.rs
network.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use crate::{
 20      bft::{
 21          events::{self, Event},
 22          MAX_VALIDATORS_TO_SEND,
 23      },
 24      bootstrap_client::codec::BootstrapClientCodec,
 25      network::{ConnectionMode, NodeType, Peer, PeerPoolHandling, Resolver},
 26      router::{
 27          messages::{self, Message},
 28          MAX_PEERS_TO_SEND,
 29      },
 30      tcp::{protocols::*, ConnectionSide, Tcp, P2P},
 31      BootstrapClient,
 32  };
 33  use alphavm::prelude::Network;
 34  
 35  use indexmap::IndexMap;
 36  #[cfg(feature = "locktick")]
 37  use locktick::parking_lot::RwLock;
 38  #[cfg(not(feature = "locktick"))]
 39  use parking_lot::RwLock;
 40  use std::{collections::HashMap, io, net::SocketAddr};
 41  use tokio::time::sleep;
 42  use tokio_util::codec::Decoder;
 43  
 44  impl<N: Network> P2P for BootstrapClient<N> {
 45      fn tcp(&self) -> &Tcp {
 46          &self.tcp
 47      }
 48  }
 49  
 50  impl<N: Network> PeerPoolHandling<N> for BootstrapClient<N> {
 51      const MAXIMUM_POOL_SIZE: usize = 10_000;
 52      const OWNER: &'static str = "[Network]";
 53      const PEER_SLASHING_COUNT: usize = 200;
 54  
 55      fn is_dev(&self) -> bool {
 56          self.dev.is_some()
 57      }
 58  
 59      fn trusted_peers_only(&self) -> bool {
 60          false
 61      }
 62  
 63      fn node_type(&self) -> NodeType {
 64          NodeType::BootstrapClient
 65      }
 66  
 67      fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
 68          &self.peer_pool
 69      }
 70  
 71      fn resolver(&self) -> &RwLock<Resolver<N>> {
 72          &self.resolver
 73      }
 74  }
 75  
 76  /// The bootstrap client can handle both validator and non-validator messages.
 77  #[derive(Debug)]
 78  pub enum MessageOrEvent<N: Network> {
 79      Message(Message<N>),
 80      Event(Event<N>),
 81  }
 82  
 83  #[async_trait]
 84  impl<N: Network> OnConnect for BootstrapClient<N> {
 85      async fn on_connect(&self, peer_addr: SocketAddr) {
 86          // If the peer is connected in validator (Gateway) mode, save it to the collection
 87          // of known validators.
 88          if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
 89              if let Some(peer) = self.get_connected_peer(listener_addr) {
 90                  if peer.node_type == NodeType::Validator {
 91                      self.known_validators.write().insert(listener_addr, (peer.alpha_addr, peer.connection_mode));
 92                  }
 93              }
 94          }
 95          // The peers should only ask us for the peer list; spawn a task that will
 96          // terminate the connection after a while.
 97          let tcp = self.tcp().clone();
 98          tokio::spawn(async move {
 99              sleep(Self::CONNECTION_LIFETIME).await;
100              tcp.disconnect(peer_addr).await;
101          });
102      }
103  }
104  
105  #[async_trait]
106  impl<N: Network> Disconnect for BootstrapClient<N> {
107      /// Any extra operations to be performed during a disconnect.
108      async fn handle_disconnect(&self, peer_addr: SocketAddr) {
109          if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
110              self.downgrade_peer_to_candidate(listener_addr);
111          }
112      }
113  }
114  
115  #[async_trait]
116  impl<N: Network> Reading for BootstrapClient<N> {
117      type Codec = BootstrapClientCodec<N>;
118      type Message = <BootstrapClientCodec<N> as Decoder>::Item;
119  
120      /// Creates a [`Decoder`] used to interpret messages from the network.
121      /// The `side` param indicates the connection side **from the node's perspective**.
122      fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
123          Default::default()
124      }
125  
126      /// Processes a message received from the network.
127      async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
128          // Identify the connected peer.
129          let Some(listener_addr) = self.resolve_to_listener(peer_addr) else {
130              // Already disconnecting, ignore.
131              return Ok(());
132          };
133  
134          // Handle the right peer request.
135          match message {
136              MessageOrEvent::Message(Message::PeerRequest(_)) => {
137                  debug!("Received a PeerRequest from '{listener_addr}'");
138                  let mut peers = self.get_candidate_peers();
139  
140                  // In order to filter out validators properly, we'll need the
141                  // peer's node type and the list of validators.
142                  let Some(peer) = self.get_connected_peer(listener_addr) else {
143                      return Ok(());
144                  };
145                  let validators = self.get_validator_addrs().await;
146  
147                  if peer.node_type == NodeType::Validator {
148                      // Filter out Gateway addresses.
149                      peers.retain(|peer| {
150                          validators
151                              .get(&peer.listener_addr)
152                              .map(|(_, connection_mode)| *connection_mode != ConnectionMode::Gateway)
153                              .unwrap_or(true)
154                      });
155                  } else {
156                      // Filter out all validator addresses.
157                      peers.retain(|peer| !validators.contains_key(&peer.listener_addr));
158                  }
159                  peers.truncate(MAX_PEERS_TO_SEND);
160                  let peers = peers.into_iter().map(|peer| (peer.listener_addr, None)).collect::<Vec<_>>();
161  
162                  debug!("Sending {} peer address(es) to '{listener_addr}'", peers.len());
163                  let msg = MessageOrEvent::Message(Message::PeerResponse(messages::PeerResponse { peers }));
164                  if let Err(err) = self.unicast(peer_addr, msg)?.await {
165                      warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
166                  } else {
167                      debug!("Disconnecting from '{listener_addr}' - peers provided");
168                  }
169  
170                  self.tcp().disconnect(peer_addr).await;
171              }
172              MessageOrEvent::Event(Event::ValidatorsRequest(_)) => {
173                  debug!("Received a ValidatorsRequest from '{listener_addr}'");
174  
175                  // Procure a list of applicable validator addresses.
176                  let validators = self.get_validator_addrs().await;
177                  let validators = validators
178                      .into_iter()
179                      .filter_map(|(listener_addr, (alpha_addr, connection_mode))| {
180                          // Only pick addresses connected in Gateway mode.
181                          (connection_mode == ConnectionMode::Gateway).then_some((listener_addr, alpha_addr))
182                      })
183                      .take(MAX_VALIDATORS_TO_SEND)
184                      .collect::<IndexMap<_, _>>();
185  
186                  debug!("Sending {} validator address(es) to '{listener_addr}'", validators.len());
187                  let msg = MessageOrEvent::Event(Event::ValidatorsResponse(events::ValidatorsResponse { validators }));
188                  if let Err(err) = self.unicast(peer_addr, msg)?.await {
189                      warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
190                  } else {
191                      debug!("Disconnecting from '{listener_addr}' - peers provided");
192                  }
193  
194                  self.tcp().disconnect(peer_addr).await;
195              }
196              msg => {
197                  let name = match msg {
198                      MessageOrEvent::Message(msg) => msg.name(),
199                      MessageOrEvent::Event(msg) => msg.name(),
200                  };
201                  trace!("Ignoring an unhandled message ({name}) from {listener_addr}");
202              }
203          }
204  
205          Ok(())
206      }
207  }
208  
209  #[async_trait]
210  impl<N: Network> Writing for BootstrapClient<N> {
211      type Codec = BootstrapClientCodec<N>;
212      type Message = MessageOrEvent<N>;
213  
214      /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
215      /// The `side` parameter indicates the connection side **from the node's perspective**.
216      fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
217          Default::default()
218      }
219  }