network.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use crate::{ 20 bft::{ 21 events::{self, Event}, 22 MAX_VALIDATORS_TO_SEND, 23 }, 24 bootstrap_client::codec::BootstrapClientCodec, 25 network::{ConnectionMode, NodeType, Peer, PeerPoolHandling, Resolver}, 26 router::{ 27 messages::{self, Message}, 28 MAX_PEERS_TO_SEND, 29 }, 30 tcp::{protocols::*, ConnectionSide, Tcp, P2P}, 31 BootstrapClient, 32 }; 33 use alphavm::prelude::Network; 34 35 use indexmap::IndexMap; 36 #[cfg(feature = "locktick")] 37 use locktick::parking_lot::RwLock; 38 #[cfg(not(feature = "locktick"))] 39 use parking_lot::RwLock; 40 use std::{collections::HashMap, io, net::SocketAddr}; 41 use tokio::time::sleep; 42 use tokio_util::codec::Decoder; 43 44 impl<N: Network> P2P for BootstrapClient<N> { 45 fn tcp(&self) -> &Tcp { 46 &self.tcp 47 } 48 } 49 50 impl<N: Network> PeerPoolHandling<N> for BootstrapClient<N> { 51 const MAXIMUM_POOL_SIZE: usize = 10_000; 52 const OWNER: &'static str = "[Network]"; 53 const PEER_SLASHING_COUNT: usize = 200; 54 55 fn is_dev(&self) -> bool { 56 self.dev.is_some() 57 } 58 59 fn trusted_peers_only(&self) -> bool { 60 false 61 } 62 63 fn node_type(&self) -> NodeType { 64 NodeType::BootstrapClient 65 } 66 67 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> { 68 &self.peer_pool 69 } 70 71 fn resolver(&self) -> &RwLock<Resolver<N>> { 72 &self.resolver 73 } 74 } 75 76 /// The bootstrap client can handle both validator and non-validator messages. 77 #[derive(Debug)] 78 pub enum MessageOrEvent<N: Network> { 79 Message(Message<N>), 80 Event(Event<N>), 81 } 82 83 #[async_trait] 84 impl<N: Network> OnConnect for BootstrapClient<N> { 85 async fn on_connect(&self, peer_addr: SocketAddr) { 86 // If the peer is connected in validator (Gateway) mode, save it to the collection 87 // of known validators. 88 if let Some(listener_addr) = self.resolve_to_listener(peer_addr) { 89 if let Some(peer) = self.get_connected_peer(listener_addr) { 90 if peer.node_type == NodeType::Validator { 91 self.known_validators.write().insert(listener_addr, (peer.alpha_addr, peer.connection_mode)); 92 } 93 } 94 } 95 // The peers should only ask us for the peer list; spawn a task that will 96 // terminate the connection after a while. 97 let tcp = self.tcp().clone(); 98 tokio::spawn(async move { 99 sleep(Self::CONNECTION_LIFETIME).await; 100 tcp.disconnect(peer_addr).await; 101 }); 102 } 103 } 104 105 #[async_trait] 106 impl<N: Network> Disconnect for BootstrapClient<N> { 107 /// Any extra operations to be performed during a disconnect. 108 async fn handle_disconnect(&self, peer_addr: SocketAddr) { 109 if let Some(listener_addr) = self.resolve_to_listener(peer_addr) { 110 self.downgrade_peer_to_candidate(listener_addr); 111 } 112 } 113 } 114 115 #[async_trait] 116 impl<N: Network> Reading for BootstrapClient<N> { 117 type Codec = BootstrapClientCodec<N>; 118 type Message = <BootstrapClientCodec<N> as Decoder>::Item; 119 120 /// Creates a [`Decoder`] used to interpret messages from the network. 121 /// The `side` param indicates the connection side **from the node's perspective**. 122 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 123 Default::default() 124 } 125 126 /// Processes a message received from the network. 127 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { 128 // Identify the connected peer. 129 let Some(listener_addr) = self.resolve_to_listener(peer_addr) else { 130 // Already disconnecting, ignore. 131 return Ok(()); 132 }; 133 134 // Handle the right peer request. 135 match message { 136 MessageOrEvent::Message(Message::PeerRequest(_)) => { 137 debug!("Received a PeerRequest from '{listener_addr}'"); 138 let mut peers = self.get_candidate_peers(); 139 140 // In order to filter out validators properly, we'll need the 141 // peer's node type and the list of validators. 142 let Some(peer) = self.get_connected_peer(listener_addr) else { 143 return Ok(()); 144 }; 145 let validators = self.get_validator_addrs().await; 146 147 if peer.node_type == NodeType::Validator { 148 // Filter out Gateway addresses. 149 peers.retain(|peer| { 150 validators 151 .get(&peer.listener_addr) 152 .map(|(_, connection_mode)| *connection_mode != ConnectionMode::Gateway) 153 .unwrap_or(true) 154 }); 155 } else { 156 // Filter out all validator addresses. 157 peers.retain(|peer| !validators.contains_key(&peer.listener_addr)); 158 } 159 peers.truncate(MAX_PEERS_TO_SEND); 160 let peers = peers.into_iter().map(|peer| (peer.listener_addr, None)).collect::<Vec<_>>(); 161 162 debug!("Sending {} peer address(es) to '{listener_addr}'", peers.len()); 163 let msg = MessageOrEvent::Message(Message::PeerResponse(messages::PeerResponse { peers })); 164 if let Err(err) = self.unicast(peer_addr, msg)?.await { 165 warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting"); 166 } else { 167 debug!("Disconnecting from '{listener_addr}' - peers provided"); 168 } 169 170 self.tcp().disconnect(peer_addr).await; 171 } 172 MessageOrEvent::Event(Event::ValidatorsRequest(_)) => { 173 debug!("Received a ValidatorsRequest from '{listener_addr}'"); 174 175 // Procure a list of applicable validator addresses. 176 let validators = self.get_validator_addrs().await; 177 let validators = validators 178 .into_iter() 179 .filter_map(|(listener_addr, (alpha_addr, connection_mode))| { 180 // Only pick addresses connected in Gateway mode. 181 (connection_mode == ConnectionMode::Gateway).then_some((listener_addr, alpha_addr)) 182 }) 183 .take(MAX_VALIDATORS_TO_SEND) 184 .collect::<IndexMap<_, _>>(); 185 186 debug!("Sending {} validator address(es) to '{listener_addr}'", validators.len()); 187 let msg = MessageOrEvent::Event(Event::ValidatorsResponse(events::ValidatorsResponse { validators })); 188 if let Err(err) = self.unicast(peer_addr, msg)?.await { 189 warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting"); 190 } else { 191 debug!("Disconnecting from '{listener_addr}' - peers provided"); 192 } 193 194 self.tcp().disconnect(peer_addr).await; 195 } 196 msg => { 197 let name = match msg { 198 MessageOrEvent::Message(msg) => msg.name(), 199 MessageOrEvent::Event(msg) => msg.name(), 200 }; 201 trace!("Ignoring an unhandled message ({name}) from {listener_addr}"); 202 } 203 } 204 205 Ok(()) 206 } 207 } 208 209 #[async_trait] 210 impl<N: Network> Writing for BootstrapClient<N> { 211 type Codec = BootstrapClientCodec<N>; 212 type Message = MessageOrEvent<N>; 213 214 /// Creates an [`Encoder`] used to write the outbound messages to the target stream. 215 /// The `side` parameter indicates the connection side **from the node's perspective**. 216 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 217 Default::default() 218 } 219 }