router.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use super::*; 17 use alphaos_node_network::PeerPoolHandling; 18 use alphaos_node_router::messages::{ 19 BlockRequest, 20 BlockResponse, 21 DataBlocks, 22 DisconnectReason, 23 Message, 24 MessageCodec, 25 Ping, 26 Pong, 27 UnconfirmedTransaction, 28 }; 29 use alphaos_node_tcp::{Connection, ConnectionSide, Tcp}; 30 use alphavm::{ 31 console::network::{ConsensusVersion, Network}, 32 ledger::{block::Transaction, narwhal::Data}, 33 utilities::{flatten_error, io_error}, 34 }; 35 36 use std::{io, net::SocketAddr}; 37 38 impl<N: Network, C: ConsensusStorage<N>> P2P for Validator<N, C> { 39 /// Returns a reference to the TCP instance. 40 fn tcp(&self) -> &Tcp { 41 self.router.tcp() 42 } 43 } 44 45 #[async_trait] 46 impl<N: Network, C: ConsensusStorage<N>> Handshake for Validator<N, C> { 47 /// Performs the handshake protocol. 48 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> { 49 // Perform the handshake. 50 let peer_addr = connection.addr(); 51 let conn_side = connection.side(); 52 let stream = self.borrow_stream(&mut connection); 53 let genesis_header = self.ledger.get_header(0).map_err(io_error)?; 54 let restrictions_id = self.ledger.vm().restrictions().restrictions_id(); 55 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?; 56 57 Ok(connection) 58 } 59 } 60 61 #[async_trait] 62 impl<N: Network, C: ConsensusStorage<N>> OnConnect for Validator<N, C> 63 where 64 Self: Outbound<N>, 65 { 66 async fn on_connect(&self, peer_addr: SocketAddr) { 67 // Resolve the peer address to the listener address. 68 if let Some(listener_addr) = self.router().resolve_to_listener(peer_addr) { 69 if let Some(peer) = self.router().get_connected_peer(listener_addr) { 70 if peer.node_type != NodeType::BootstrapClient { 71 // Send the first `Ping` message to the peer. 72 self.ping.on_peer_connected(listener_addr); 73 } 74 } 75 } 76 } 77 } 78 79 #[async_trait] 80 impl<N: Network, C: ConsensusStorage<N>> Disconnect for Validator<N, C> { 81 /// Any extra operations to be performed during a disconnect. 82 async fn handle_disconnect(&self, peer_addr: SocketAddr) { 83 if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) { 84 let was_connected = self.router.downgrade_peer_to_candidate(peer_ip); 85 86 // Only remove the peer from sync if the handshake was successful. 87 // This handles the cases where a validator unsuccessfully tries to connect to another validator using the router. 88 if was_connected { 89 self.sync.remove_peer(&peer_ip); 90 } 91 92 // Clear cached entries applicable to the peer. 93 self.router.cache().clear_peer_entries(peer_ip); 94 #[cfg(feature = "metrics")] 95 self.router.update_metrics(); 96 } 97 } 98 } 99 100 #[async_trait] 101 impl<N: Network, C: ConsensusStorage<N>> Reading for Validator<N, C> { 102 type Codec = MessageCodec<N>; 103 type Message = Message<N>; 104 105 /// Creates a [`Decoder`] used to interpret messages from the network. 106 /// The `side` param indicates the connection side **from the node's perspective**. 107 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 108 Default::default() 109 } 110 111 /// Processes a message received from the network. 112 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { 113 let clone = self.clone(); 114 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) { 115 // Handle BlockRequest and BlockResponse messages in a separate task to not block the 116 // inbound queue. 117 tokio::spawn(async move { 118 clone.process_message_inner(peer_addr, message).await; 119 }); 120 } else { 121 self.process_message_inner(peer_addr, message).await; 122 } 123 Ok(()) 124 } 125 } 126 127 impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> { 128 async fn process_message_inner( 129 &self, 130 peer_addr: SocketAddr, 131 message: <Validator<N, C> as alphaos_node_tcp::protocols::Reading>::Message, 132 ) { 133 // Process the message. Disconnect if the peer violated the protocol. 134 if let Err(error) = self.inbound(peer_addr, message).await { 135 warn!("Failed to process inbound message from '{peer_addr}' - {error}"); 136 if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) { 137 warn!("Disconnecting from '{peer_ip}' for protocol violation"); 138 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into())); 139 // Disconnect from this peer. 140 self.router().disconnect(peer_ip); 141 } 142 } 143 } 144 } 145 146 #[async_trait] 147 impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Validator<N, C> {} 148 149 impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Validator<N, C> {} 150 151 impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Validator<N, C> { 152 /// Returns a reference to the router. 153 fn router(&self) -> &Router<N> { 154 &self.router 155 } 156 157 /// Returns `true` if the node is synced up to the latest block (within the given tolerance). 158 fn is_block_synced(&self) -> bool { 159 self.sync.is_block_synced() 160 } 161 162 /// Returns the number of blocks this node is behind the greatest peer height, 163 /// or `None` if not connected to peers yet. 164 fn num_blocks_behind(&self) -> Option<u32> { 165 self.sync.num_blocks_behind() 166 } 167 168 /// Returns the current sync speed in blocks per second. 169 fn get_sync_speed(&self) -> f64 { 170 self.sync.get_sync_speed() 171 } 172 } 173 174 #[async_trait] 175 impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Validator<N, C> { 176 /// Returns `true` if the message version is valid. 177 fn is_valid_message_version(&self, message_version: u32) -> bool { 178 self.router().is_valid_message_version(message_version) 179 } 180 181 /// Retrieves the blocks within the block request range, and returns the block response to the peer. 182 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool { 183 let BlockRequest { start_height, end_height } = &message; 184 185 // Get the latest consensus version, i.e., the one for the last block's height. 186 let latest_consensus_version = match N::CONSENSUS_VERSION(end_height.saturating_sub(1)) { 187 Ok(version) => version, 188 Err(err) => { 189 error!("{}", flatten_error(err.context("Failed to retrieve consensus version"))); 190 return false; 191 } 192 }; 193 194 // Retrieve the blocks within the requested range. 195 let blocks = match self.ledger.get_blocks(*start_height..*end_height) { 196 Ok(blocks) => DataBlocks(blocks), 197 Err(err) => { 198 let err = 199 err.context(format!("Failed to retrieve blocks {start_height} to {end_height} from the ledger")); 200 error!("{}", flatten_error(err)); 201 return false; 202 } 203 }; 204 // Send the `BlockResponse` message to the peer. 205 self.router() 206 .send(peer_ip, Message::BlockResponse(BlockResponse::new(message, blocks, latest_consensus_version))); 207 true 208 } 209 210 /// Handles a `BlockResponse` message. 211 fn block_response( 212 &self, 213 peer_ip: SocketAddr, 214 _blocks: Vec<Block<N>>, 215 _latest_consensus_version: Option<ConsensusVersion>, 216 ) -> bool { 217 warn!("Received a block response through P2P, not BFT, from {peer_ip}"); 218 false 219 } 220 221 /// Processes a ping message from a client (or prover) and sends back a `Pong` message. 222 fn ping(&self, peer_ip: SocketAddr, _message: Ping<N>) -> bool { 223 // In gateway/validator mode, we do not need to process client block locators. 224 // Instead, locators are fetched from other validators in `Gateway` using `PrimaryPing` messages. 225 226 // Send a `Pong` message to the peer. 227 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) })); 228 true 229 } 230 231 /// Process a Pong message (response to a Ping). 232 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool { 233 self.ping.on_pong_received(peer_ip); 234 true 235 } 236 237 /// Retrieves the latest epoch hash and latest block header, and returns the puzzle response to the peer. 238 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool { 239 // Retrieve the latest epoch hash. 240 let epoch_hash = match self.ledger.latest_epoch_hash() { 241 Ok(epoch_hash) => epoch_hash, 242 Err(error) => { 243 error!("Failed to prepare a puzzle request for '{peer_ip}': {error}"); 244 return false; 245 } 246 }; 247 // Retrieve the latest block header. 248 let block_header = Data::Object(self.ledger.latest_header()); 249 // Send the `PuzzleResponse` message to the peer. 250 self.router().send(peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header })); 251 true 252 } 253 254 /// Disconnects on receipt of a `PuzzleResponse` message. 255 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool { 256 debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation); 257 false 258 } 259 260 /// Propagates the unconfirmed solution to all connected validators. 261 async fn unconfirmed_solution( 262 &self, 263 peer_ip: SocketAddr, 264 serialized: UnconfirmedSolution<N>, 265 solution: Solution<N>, 266 ) -> bool { 267 // Add the unconfirmed solution to the memory pool. 268 if let Err(error) = self.consensus.add_unconfirmed_solution(solution).await { 269 trace!("[UnconfirmedSolution] {error}"); 270 return true; // Maintain the connection. 271 } 272 let message = Message::UnconfirmedSolution(serialized); 273 // Propagate the "UnconfirmedSolution" to the connected validators. 274 self.propagate_to_validators(message, &[peer_ip]); 275 true 276 } 277 278 /// Handles an `UnconfirmedTransaction` message. 279 async fn unconfirmed_transaction( 280 &self, 281 peer_ip: SocketAddr, 282 serialized: UnconfirmedTransaction<N>, 283 transaction: Transaction<N>, 284 ) -> bool { 285 // Add the unconfirmed transaction to the memory pool. 286 if let Err(error) = self.consensus.add_unconfirmed_transaction(transaction).await { 287 trace!("[UnconfirmedTransaction] {error}"); 288 return true; // Maintain the connection. 289 } 290 let message = Message::UnconfirmedTransaction(serialized); 291 // Propagate the "UnconfirmedTransaction" to the connected validators. 292 self.propagate_to_validators(message, &[peer_ip]); 293 true 294 } 295 }