router.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use super::*; 17 use alphaos_node_network::PeerPoolHandling; 18 use alphaos_node_router::{ 19 Routing, 20 messages::{ 21 BlockRequest, 22 BlockResponse, 23 DataBlocks, 24 DisconnectReason, 25 MessageCodec, 26 PeerRequest, 27 Ping, 28 Pong, 29 PuzzleResponse, 30 UnconfirmedTransaction, 31 }, 32 }; 33 use alphaos_node_tcp::{Connection, ConnectionSide, Tcp}; 34 use alphavm::{ 35 console::network::{ConsensusVersion, Network}, 36 ledger::{block::Transaction, narwhal::Data}, 37 utilities::flatten_error, 38 }; 39 40 use std::{io, net::SocketAddr}; 41 42 impl<N: Network, C: ConsensusStorage<N>> P2P for Client<N, C> { 43 /// Returns a reference to the TCP instance. 44 fn tcp(&self) -> &Tcp { 45 self.router.tcp() 46 } 47 } 48 49 #[async_trait] 50 impl<N: Network, C: ConsensusStorage<N>> Handshake for Client<N, C> { 51 /// Performs the handshake protocol. 52 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> { 53 // Perform the handshake. 54 let peer_addr = connection.addr(); 55 let conn_side = connection.side(); 56 let stream = self.borrow_stream(&mut connection); 57 let genesis_header = *self.genesis.header(); 58 let restrictions_id = self.ledger.vm().restrictions().restrictions_id(); 59 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?; 60 61 Ok(connection) 62 } 63 } 64 65 #[async_trait] 66 impl<N: Network, C: ConsensusStorage<N>> OnConnect for Client<N, C> { 67 async fn on_connect(&self, peer_addr: SocketAddr) { 68 // Resolve the peer address to the listener address. 69 if let Some(listener_addr) = self.router().resolve_to_listener(peer_addr) { 70 if let Some(peer) = self.router().get_connected_peer(listener_addr) { 71 // If it's a bootstrap client, only request its peers. 72 if peer.node_type == NodeType::BootstrapClient { 73 self.router().send(listener_addr, Message::PeerRequest(PeerRequest)); 74 } else { 75 // Send the first `Ping` message to the peer. 76 self.ping.on_peer_connected(listener_addr); 77 } 78 } 79 } 80 } 81 } 82 83 #[async_trait] 84 impl<N: Network, C: ConsensusStorage<N>> Disconnect for Client<N, C> { 85 /// Any extra operations to be performed during a disconnect. 86 async fn handle_disconnect(&self, peer_addr: SocketAddr) { 87 if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) { 88 self.sync.remove_peer(&peer_ip); 89 90 self.router.downgrade_peer_to_candidate(peer_ip); 91 92 // Clear cached entries applicable to the peer. 93 self.router.cache().clear_peer_entries(peer_ip); 94 #[cfg(feature = "metrics")] 95 self.router.update_metrics(); 96 } 97 } 98 } 99 100 #[async_trait] 101 impl<N: Network, C: ConsensusStorage<N>> Reading for Client<N, C> { 102 type Codec = MessageCodec<N>; 103 type Message = Message<N>; 104 105 /// Creates a [`Decoder`] used to interpret messages from the network. 106 /// The `side` param indicates the connection side **from the node's perspective**. 107 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 108 Default::default() 109 } 110 111 /// Processes a message received from the network. 112 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { 113 let clone = self.clone(); 114 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) { 115 // Handle BlockRequest and BlockResponse messages in a separate task to not block the 116 // inbound queue. 117 tokio::spawn(async move { 118 clone.process_message_inner(peer_addr, message).await; 119 }); 120 } else { 121 self.process_message_inner(peer_addr, message).await; 122 } 123 Ok(()) 124 } 125 } 126 127 impl<N: Network, C: ConsensusStorage<N>> Client<N, C> { 128 async fn process_message_inner( 129 &self, 130 peer_addr: SocketAddr, 131 message: <Client<N, C> as alphaos_node_tcp::protocols::Reading>::Message, 132 ) { 133 // Process the message. Disconnect if the peer violated the protocol. 134 if let Err(error) = self.inbound(peer_addr, message).await { 135 warn!("Failed to process inbound message from '{peer_addr}' - {error}"); 136 if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) { 137 warn!("Disconnecting from '{peer_ip}' for protocol violation"); 138 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into())); 139 // Disconnect from this peer. 140 self.router().disconnect(peer_ip); 141 } 142 } 143 } 144 } 145 146 #[async_trait] 147 impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Client<N, C> {} 148 149 impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Client<N, C> {} 150 151 impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> { 152 /// Returns a reference to the router. 153 fn router(&self) -> &Router<N> { 154 &self.router 155 } 156 157 /// Returns `true` if the node is synced up to the latest block (within the given tolerance). 158 fn is_block_synced(&self) -> bool { 159 self.sync.is_block_synced() 160 } 161 162 /// Returns the number of blocks this node is behind the greatest peer height, 163 /// or `None` if not connected to peers yet. 164 fn num_blocks_behind(&self) -> Option<u32> { 165 self.sync.num_blocks_behind() 166 } 167 168 /// Returns the current sync speed in blocks per second. 169 fn get_sync_speed(&self) -> f64 { 170 self.sync.get_sync_speed() 171 } 172 } 173 174 #[async_trait] 175 impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> { 176 /// Returns `true` if the message version is valid. 177 fn is_valid_message_version(&self, message_version: u32) -> bool { 178 self.router().is_valid_message_version(message_version) 179 } 180 181 /// Handles a `BlockRequest` message. 182 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool { 183 let BlockRequest { start_height, end_height } = &message; 184 185 // Get the latest consensus version, i.e., the one for the last block's height. 186 let latest_consensus_version = match N::CONSENSUS_VERSION(end_height.saturating_sub(1)) { 187 Ok(version) => version, 188 Err(err) => { 189 let err = err.context("Failed to retrieve consensus version"); 190 error!("{}", flatten_error(&err)); 191 return false; 192 } 193 }; 194 195 // Retrieve the blocks within the requested range. 196 let blocks = match self.ledger.get_blocks(*start_height..*end_height) { 197 Ok(blocks) => DataBlocks(blocks), 198 Err(error) => { 199 let err = 200 error.context(format!("Failed to retrieve blocks {start_height} to {end_height} from the ledger")); 201 error!("{}", flatten_error(&err)); 202 return false; 203 } 204 }; 205 206 // Send the `BlockResponse` message to the peer. 207 self.router() 208 .send(peer_ip, Message::BlockResponse(BlockResponse::new(message, blocks, latest_consensus_version))); 209 true 210 } 211 212 /// Handles a `BlockResponse` message. 213 fn block_response( 214 &self, 215 peer_ip: SocketAddr, 216 blocks: Vec<Block<N>>, 217 latest_consensus_version: Option<ConsensusVersion>, 218 ) -> bool { 219 // We do not need to explicitly sync here because insert_block_response, will wake up the sync task. 220 if let Err(err) = self.sync.insert_block_responses(peer_ip, blocks, latest_consensus_version) { 221 warn!("{}", flatten_error(err.context("Failed to insert block response"))); 222 false 223 } else { 224 true 225 } 226 } 227 228 /// Processes the block locators and sends back a `Pong` message. 229 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool { 230 // If block locators were provided, then update the peer in the sync pool. 231 if let Some(block_locators) = message.block_locators { 232 // Check the block locators are valid, and update the peer in the sync pool. 233 if let Err(err) = self.sync.update_peer_locators(peer_ip, &block_locators) { 234 warn!("{}", flatten_error(err.context(format!("Peer '{peer_ip}' sent invalid block locators")))); 235 return false; 236 } 237 238 let last_peer_height = Some(block_locators.latest_locator_height()); 239 self.router().update_connected_peer(&peer_ip, |peer| peer.last_height_seen = last_peer_height); 240 } 241 242 // Send a `Pong` message to the peer. 243 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) })); 244 true 245 } 246 247 /// Sleeps for a period and then sends a `Ping` message to the peer. 248 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool { 249 self.ping.on_pong_received(peer_ip); 250 true 251 } 252 253 /// Retrieves the latest epoch hash and latest block header, and returns the puzzle response to the peer. 254 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool { 255 // Retrieve the latest epoch hash. 256 let epoch_hash = match self.ledger.latest_epoch_hash() { 257 Ok(epoch_hash) => epoch_hash, 258 Err(err) => { 259 let err = err.context(format!("Failed to prepare a puzzle request for '{peer_ip}'")); 260 error!("{}", flatten_error(err)); 261 return false; 262 } 263 }; 264 // Retrieve the latest block header. 265 let block_header = Data::Object(self.ledger.latest_header()); 266 // Send the `PuzzleResponse` message to the peer. 267 self.router().send(peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header })); 268 true 269 } 270 271 /// Saves the latest epoch hash and latest block header in the node. 272 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool { 273 debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation); 274 false 275 } 276 277 /// Propagates the unconfirmed solution to all connected validators. 278 async fn unconfirmed_solution( 279 &self, 280 peer_ip: SocketAddr, 281 serialized: UnconfirmedSolution<N>, 282 solution: Solution<N>, 283 ) -> bool { 284 // Try to add the solution to the verification queue, without changing LRU status of known solutions. 285 let mut solution_queue = self.solution_queue.lock(); 286 if !solution_queue.contains(&solution.id()) { 287 solution_queue.put(solution.id(), (peer_ip, serialized, solution)); 288 } 289 290 true // Maintain the connection 291 } 292 293 /// Handles an `UnconfirmedTransaction` message. 294 async fn unconfirmed_transaction( 295 &self, 296 peer_ip: SocketAddr, 297 serialized: UnconfirmedTransaction<N>, 298 transaction: Transaction<N>, 299 ) -> bool { 300 // Try to add the transaction to a verification queue, without changing LRU status of known transactions. 301 match &transaction { 302 Transaction::<N>::Fee(..) => (), // Fee Transactions are not valid. 303 Transaction::<N>::Deploy(..) => { 304 let mut deploy_queue = self.deploy_queue.lock(); 305 if !deploy_queue.contains(&transaction.id()) { 306 deploy_queue.put(transaction.id(), (peer_ip, serialized, transaction)); 307 } 308 } 309 Transaction::<N>::Execute(..) => { 310 let mut execute_queue = self.execute_queue.lock(); 311 if !execute_queue.contains(&transaction.id()) { 312 execute_queue.put(transaction.id(), (peer_ip, serialized, transaction)); 313 } 314 } 315 } 316 317 true // Maintain the connection 318 } 319 }