router.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use super::*; 20 use alphaos_node_network::PeerPoolHandling; 21 use alphaos_node_router::{ 22 messages::{ 23 BlockRequest, 24 BlockResponse, 25 DataBlocks, 26 DisconnectReason, 27 MessageCodec, 28 PeerRequest, 29 Ping, 30 Pong, 31 PuzzleResponse, 32 UnconfirmedTransaction, 33 }, 34 Routing, 35 }; 36 use alphaos_node_tcp::{Connection, ConnectionSide, Tcp}; 37 use alphavm::{ 38 console::network::{ConsensusVersion, Network}, 39 ledger::{block::Transaction, narwhal::Data}, 40 utilities::flatten_error, 41 }; 42 43 use std::{io, net::SocketAddr}; 44 45 impl<N: Network, C: ConsensusStorage<N>> P2P for Client<N, C> { 46 /// Returns a reference to the TCP instance. 47 fn tcp(&self) -> &Tcp { 48 self.router.tcp() 49 } 50 } 51 52 #[async_trait] 53 impl<N: Network, C: ConsensusStorage<N>> Handshake for Client<N, C> { 54 /// Performs the handshake protocol. 55 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> { 56 // Perform the handshake. 57 let peer_addr = connection.addr(); 58 let conn_side = connection.side(); 59 let stream = self.borrow_stream(&mut connection); 60 let genesis_header = *self.genesis.header(); 61 let restrictions_id = self.ledger.vm().restrictions().restrictions_id(); 62 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?; 63 64 Ok(connection) 65 } 66 } 67 68 #[async_trait] 69 impl<N: Network, C: ConsensusStorage<N>> OnConnect for Client<N, C> { 70 async fn on_connect(&self, peer_addr: SocketAddr) { 71 // Resolve the peer address to the listener address. 72 if let Some(listener_addr) = self.router().resolve_to_listener(peer_addr) { 73 if let Some(peer) = self.router().get_connected_peer(listener_addr) { 74 // If it's a bootstrap client, only request its peers. 75 if peer.node_type == NodeType::BootstrapClient { 76 self.router().send(listener_addr, Message::PeerRequest(PeerRequest)); 77 } else { 78 // Send the first `Ping` message to the peer. 79 self.ping.on_peer_connected(listener_addr); 80 } 81 } 82 } 83 } 84 } 85 86 #[async_trait] 87 impl<N: Network, C: ConsensusStorage<N>> Disconnect for Client<N, C> { 88 /// Any extra operations to be performed during a disconnect. 89 async fn handle_disconnect(&self, peer_addr: SocketAddr) { 90 if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) { 91 self.sync.remove_peer(&peer_ip); 92 93 self.router.downgrade_peer_to_candidate(peer_ip); 94 95 // Clear cached entries applicable to the peer. 96 self.router.cache().clear_peer_entries(peer_ip); 97 #[cfg(feature = "metrics")] 98 self.router.update_metrics(); 99 } 100 } 101 } 102 103 #[async_trait] 104 impl<N: Network, C: ConsensusStorage<N>> Reading for Client<N, C> { 105 type Codec = MessageCodec<N>; 106 type Message = Message<N>; 107 108 /// Creates a [`Decoder`] used to interpret messages from the network. 109 /// The `side` param indicates the connection side **from the node's perspective**. 110 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 111 Default::default() 112 } 113 114 /// Processes a message received from the network. 115 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { 116 let clone = self.clone(); 117 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) { 118 // Handle BlockRequest and BlockResponse messages in a separate task to not block the 119 // inbound queue. 120 tokio::spawn(async move { 121 clone.process_message_inner(peer_addr, message).await; 122 }); 123 } else { 124 self.process_message_inner(peer_addr, message).await; 125 } 126 Ok(()) 127 } 128 } 129 130 impl<N: Network, C: ConsensusStorage<N>> Client<N, C> { 131 async fn process_message_inner( 132 &self, 133 peer_addr: SocketAddr, 134 message: <Client<N, C> as alphaos_node_tcp::protocols::Reading>::Message, 135 ) { 136 // Process the message. Disconnect if the peer violated the protocol. 137 if let Err(error) = self.inbound(peer_addr, message).await { 138 warn!("Failed to process inbound message from '{peer_addr}' - {error}"); 139 if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) { 140 warn!("Disconnecting from '{peer_ip}' for protocol violation"); 141 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into())); 142 // Disconnect from this peer. 143 self.router().disconnect(peer_ip); 144 } 145 } 146 } 147 } 148 149 #[async_trait] 150 impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Client<N, C> {} 151 152 impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Client<N, C> {} 153 154 impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> { 155 /// Returns a reference to the router. 156 fn router(&self) -> &Router<N> { 157 &self.router 158 } 159 160 /// Returns `true` if the node is synced up to the latest block (within the given tolerance). 161 fn is_block_synced(&self) -> bool { 162 self.sync.is_block_synced() 163 } 164 165 /// Returns the number of blocks this node is behind the greatest peer height, 166 /// or `None` if not connected to peers yet. 167 fn num_blocks_behind(&self) -> Option<u32> { 168 self.sync.num_blocks_behind() 169 } 170 171 /// Returns the current sync speed in blocks per second. 172 fn get_sync_speed(&self) -> f64 { 173 self.sync.get_sync_speed() 174 } 175 } 176 177 #[async_trait] 178 impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> { 179 /// Returns `true` if the message version is valid. 180 fn is_valid_message_version(&self, message_version: u32) -> bool { 181 self.router().is_valid_message_version(message_version) 182 } 183 184 /// Handles a `BlockRequest` message. 185 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool { 186 let BlockRequest { start_height, end_height } = &message; 187 188 // Get the latest consensus version, i.e., the one for the last block's height. 189 let latest_consensus_version = match N::CONSENSUS_VERSION(end_height.saturating_sub(1)) { 190 Ok(version) => version, 191 Err(err) => { 192 let err = err.context("Failed to retrieve consensus version"); 193 error!("{}", flatten_error(&err)); 194 return false; 195 } 196 }; 197 198 // Retrieve the blocks within the requested range. 199 let blocks = match self.ledger.get_blocks(*start_height..*end_height) { 200 Ok(blocks) => DataBlocks(blocks), 201 Err(error) => { 202 let err = 203 error.context(format!("Failed to retrieve blocks {start_height} to {end_height} from the ledger")); 204 error!("{}", flatten_error(&err)); 205 return false; 206 } 207 }; 208 209 // Send the `BlockResponse` message to the peer. 210 self.router() 211 .send(peer_ip, Message::BlockResponse(BlockResponse::new(message, blocks, latest_consensus_version))); 212 true 213 } 214 215 /// Handles a `BlockResponse` message. 216 fn block_response( 217 &self, 218 peer_ip: SocketAddr, 219 blocks: Vec<Block<N>>, 220 latest_consensus_version: Option<ConsensusVersion>, 221 ) -> bool { 222 // We do not need to explicitly sync here because insert_block_response, will wake up the sync task. 223 if let Err(err) = self.sync.insert_block_responses(peer_ip, blocks, latest_consensus_version) { 224 warn!("{}", flatten_error(err.context("Failed to insert block response"))); 225 false 226 } else { 227 true 228 } 229 } 230 231 /// Processes the block locators and sends back a `Pong` message. 232 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool { 233 // If block locators were provided, then update the peer in the sync pool. 234 if let Some(block_locators) = message.block_locators { 235 // Check the block locators are valid, and update the peer in the sync pool. 236 if let Err(err) = self.sync.update_peer_locators(peer_ip, &block_locators) { 237 warn!("{}", flatten_error(err.context(format!("Peer '{peer_ip}' sent invalid block locators")))); 238 return false; 239 } 240 241 let last_peer_height = Some(block_locators.latest_locator_height()); 242 self.router().update_connected_peer(&peer_ip, |peer| peer.last_height_seen = last_peer_height); 243 } 244 245 // Send a `Pong` message to the peer. 246 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) })); 247 true 248 } 249 250 /// Sleeps for a period and then sends a `Ping` message to the peer. 251 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool { 252 self.ping.on_pong_received(peer_ip); 253 true 254 } 255 256 /// Retrieves the latest epoch hash and latest block header, and returns the puzzle response to the peer. 257 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool { 258 // Retrieve the latest epoch hash. 259 let epoch_hash = match self.ledger.latest_epoch_hash() { 260 Ok(epoch_hash) => epoch_hash, 261 Err(err) => { 262 let err = err.context(format!("Failed to prepare a puzzle request for '{peer_ip}'")); 263 error!("{}", flatten_error(err)); 264 return false; 265 } 266 }; 267 // Retrieve the latest block header. 268 let block_header = Data::Object(self.ledger.latest_header()); 269 // Send the `PuzzleResponse` message to the peer. 270 self.router().send(peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header })); 271 true 272 } 273 274 /// Saves the latest epoch hash and latest block header in the node. 275 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool { 276 debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation); 277 false 278 } 279 280 /// Propagates the unconfirmed solution to all connected validators. 281 async fn unconfirmed_solution( 282 &self, 283 peer_ip: SocketAddr, 284 serialized: UnconfirmedSolution<N>, 285 solution: Solution<N>, 286 ) -> bool { 287 // Try to add the solution to the verification queue, without changing LRU status of known solutions. 288 let mut solution_queue = self.solution_queue.lock(); 289 if !solution_queue.contains(&solution.id()) { 290 solution_queue.put(solution.id(), (peer_ip, serialized, solution)); 291 } 292 293 true // Maintain the connection 294 } 295 296 /// Handles an `UnconfirmedTransaction` message. 297 async fn unconfirmed_transaction( 298 &self, 299 peer_ip: SocketAddr, 300 serialized: UnconfirmedTransaction<N>, 301 transaction: Transaction<N>, 302 ) -> bool { 303 // Try to add the transaction to a verification queue, without changing LRU status of known transactions. 304 match &transaction { 305 Transaction::<N>::Fee(..) => (), // Fee Transactions are not valid. 306 Transaction::<N>::Deploy(..) => { 307 let mut deploy_queue = self.deploy_queue.lock(); 308 if !deploy_queue.contains(&transaction.id()) { 309 deploy_queue.put(transaction.id(), (peer_ip, serialized, transaction)); 310 } 311 } 312 Transaction::<N>::Execute(..) => { 313 let mut execute_queue = self.execute_queue.lock(); 314 if !execute_queue.contains(&transaction.id()) { 315 execute_queue.put(transaction.id(), (peer_ip, serialized, transaction)); 316 } 317 } 318 } 319 320 true // Maintain the connection 321 } 322 }