router.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use super::*; 20 use alphaos_node_network::PeerPoolHandling; 21 use alphaos_node_router::messages::{ 22 BlockRequest, 23 BlockResponse, 24 DataBlocks, 25 DisconnectReason, 26 Message, 27 MessageCodec, 28 Ping, 29 Pong, 30 UnconfirmedTransaction, 31 }; 32 use alphaos_node_sync::{load_or_fetch_genesis, GenesisConfig}; 33 use alphaos_node_tcp::{Connection, ConnectionSide, Tcp}; 34 use alphavm::{ 35 console::network::{ConsensusVersion, Network}, 36 ledger::{block::Transaction, narwhal::Data}, 37 prelude::{ 38 block::{Header, Metadata}, 39 Field, 40 Zero, 41 }, 42 utilities::{flatten_error, io_error}, 43 }; 44 45 use std::{io, net::SocketAddr}; 46 47 impl<N: Network, C: ConsensusStorage<N>> P2P for Validator<N, C> { 48 /// Returns a reference to the TCP instance. 49 fn tcp(&self) -> &Tcp { 50 self.router.tcp() 51 } 52 } 53 54 #[async_trait] 55 impl<N: Network, C: ConsensusStorage<N>> Handshake for Validator<N, C> { 56 /// Performs the handshake protocol. 57 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> { 58 // Perform the handshake. 59 let peer_addr = connection.addr(); 60 let conn_side = connection.side(); 61 let stream = self.borrow_stream(&mut connection); 62 63 // Try to get genesis header from ledger. 64 // If it exists, mark genesis as Known and use it. 65 // If it doesn't exist, we're a late-joining validator - genesis will be learned from peers. 66 let genesis_header = match self.ledger.get_header(0) { 67 Ok(header) => { 68 // We have genesis - mark as Known if not already. 69 self.router.set_genesis_known(header); 70 header 71 } 72 Err(_) => { 73 // No genesis yet - keep state as Unknown, use a placeholder header. 74 // Create a minimal valid genesis header that will be accepted. 75 // The peer's genesis will be learned during handshake and trigger fetch. 76 let network_id = N::ID; 77 let round = 0u64; 78 let height = 0u32; 79 let cumulative_weight = 0u128; 80 let cumulative_proof_target = 0u128; 81 let coinbase_target = 0u64; 82 let proof_target = 0u64; 83 let last_coinbase_target = 0u64; 84 let last_coinbase_timestamp = 0i64; 85 let timestamp = 0i64; 86 87 let metadata = Metadata::<N>::new( 88 network_id, 89 round, 90 height, 91 cumulative_weight, 92 cumulative_proof_target, 93 coinbase_target, 94 proof_target, 95 last_coinbase_target, 96 last_coinbase_timestamp, 97 timestamp, 98 ) 99 .map_err(io_error)?; 100 101 Header::<N>::from( 102 Field::<N>::zero().into(), // previous_state_root (N::StateRoot) 103 Field::<N>::zero(), // transactions_root 104 Field::<N>::zero(), // finalize_root 105 Field::<N>::zero(), // ratifications_root 106 Field::<N>::zero(), // solutions_root 107 Field::<N>::zero(), // subdag_root 108 metadata, 109 ) 110 .map_err(io_error)? 111 } 112 }; 113 114 let restrictions_id = self.ledger.vm().restrictions().restrictions_id(); 115 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?; 116 117 Ok(connection) 118 } 119 } 120 121 #[async_trait] 122 impl<N: Network, C: ConsensusStorage<N>> OnConnect for Validator<N, C> 123 where 124 Self: Outbound<N>, 125 { 126 async fn on_connect(&self, peer_addr: SocketAddr) { 127 // Check if we need to fetch genesis (Section 6: Genesis fetch trigger) 128 match self.router().genesis_state() { 129 alphaos_node_router::GenesisState::Pending(_header) => { 130 info!("🔄 Genesis state is Pending - triggering automatic fetch from peer {}", peer_addr); 131 132 // Use the connected peer as bootstrap peer for genesis fetch 133 let config = GenesisConfig::<N>::default(); 134 135 match load_or_fetch_genesis::<N>(peer_addr, &config).await { 136 Ok(genesis) => { 137 info!("✅ Successfully fetched genesis: height={}, hash={}", genesis.height(), genesis.hash()); 138 139 // TODO: Insert genesis into ledger 140 // This requires modifying the ledger to support genesis insertion 141 // For now, just mark as known 142 self.router().set_genesis_known(*genesis.header()); 143 144 info!("🎉 Genesis fetch complete - node is now operational"); 145 } 146 Err(e) => { 147 error!("❌ Failed to fetch genesis from {}: {}", peer_addr, e); 148 // Don't disconnect - keep trying with other peers 149 } 150 } 151 } 152 alphaos_node_router::GenesisState::Known(_) => { 153 // Genesis already known - normal operation 154 } 155 alphaos_node_router::GenesisState::Unknown => { 156 // Still unknown - handshake hasn't completed yet or no peers connected 157 } 158 } 159 160 // Resolve the peer address to the listener address. 161 if let Some(listener_addr) = self.router().resolve_to_listener(peer_addr) { 162 if let Some(peer) = self.router().get_connected_peer(listener_addr) { 163 if peer.node_type != NodeType::BootstrapClient { 164 // Send the first `Ping` message to the peer. 165 self.ping.on_peer_connected(listener_addr); 166 } 167 } 168 } 169 } 170 } 171 172 #[async_trait] 173 impl<N: Network, C: ConsensusStorage<N>> Disconnect for Validator<N, C> { 174 /// Any extra operations to be performed during a disconnect. 175 async fn handle_disconnect(&self, peer_addr: SocketAddr) { 176 if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) { 177 let was_connected = self.router.downgrade_peer_to_candidate(peer_ip); 178 179 // Only remove the peer from sync if the handshake was successful. 180 // This handles the cases where a validator unsuccessfully tries to connect to another validator using the router. 181 if was_connected { 182 self.sync.remove_peer(&peer_ip); 183 } 184 185 // Clear cached entries applicable to the peer. 186 self.router.cache().clear_peer_entries(peer_ip); 187 #[cfg(feature = "metrics")] 188 self.router.update_metrics(); 189 } 190 } 191 } 192 193 #[async_trait] 194 impl<N: Network, C: ConsensusStorage<N>> Reading for Validator<N, C> { 195 type Codec = MessageCodec<N>; 196 type Message = Message<N>; 197 198 /// Creates a [`Decoder`] used to interpret messages from the network. 199 /// The `side` param indicates the connection side **from the node's perspective**. 200 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 201 Default::default() 202 } 203 204 /// Processes a message received from the network. 205 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { 206 let clone = self.clone(); 207 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) { 208 // Handle BlockRequest and BlockResponse messages in a separate task to not block the 209 // inbound queue. 210 tokio::spawn(async move { 211 clone.process_message_inner(peer_addr, message).await; 212 }); 213 } else { 214 self.process_message_inner(peer_addr, message).await; 215 } 216 Ok(()) 217 } 218 } 219 220 impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> { 221 async fn process_message_inner( 222 &self, 223 peer_addr: SocketAddr, 224 message: <Validator<N, C> as alphaos_node_tcp::protocols::Reading>::Message, 225 ) { 226 // Process the message. Disconnect if the peer violated the protocol. 227 if let Err(error) = self.inbound(peer_addr, message).await { 228 warn!("Failed to process inbound message from '{peer_addr}' - {error}"); 229 if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) { 230 warn!("Disconnecting from '{peer_ip}' for protocol violation"); 231 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into())); 232 // Disconnect from this peer. 233 self.router().disconnect(peer_ip); 234 } 235 } 236 } 237 } 238 239 #[async_trait] 240 impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Validator<N, C> {} 241 242 impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Validator<N, C> {} 243 244 impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Validator<N, C> { 245 /// Returns a reference to the router. 246 fn router(&self) -> &Router<N> { 247 &self.router 248 } 249 250 /// Returns `true` if the node is synced up to the latest block (within the given tolerance). 251 fn is_block_synced(&self) -> bool { 252 self.sync.is_block_synced() 253 } 254 255 /// Returns the number of blocks this node is behind the greatest peer height, 256 /// or `None` if not connected to peers yet. 257 fn num_blocks_behind(&self) -> Option<u32> { 258 self.sync.num_blocks_behind() 259 } 260 261 /// Returns the current sync speed in blocks per second. 262 fn get_sync_speed(&self) -> f64 { 263 self.sync.get_sync_speed() 264 } 265 } 266 267 #[async_trait] 268 impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Validator<N, C> { 269 /// Returns `true` if the message version is valid. 270 fn is_valid_message_version(&self, message_version: u32) -> bool { 271 self.router().is_valid_message_version(message_version) 272 } 273 274 /// Retrieves the blocks within the block request range, and returns the block response to the peer. 275 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool { 276 let BlockRequest { start_height, end_height } = &message; 277 278 // Get the latest consensus version, i.e., the one for the last block's height. 279 let latest_consensus_version = match N::CONSENSUS_VERSION(end_height.saturating_sub(1)) { 280 Ok(version) => version, 281 Err(err) => { 282 error!("{}", flatten_error(err.context("Failed to retrieve consensus version"))); 283 return false; 284 } 285 }; 286 287 // Retrieve the blocks within the requested range. 288 let blocks = match self.ledger.get_blocks(*start_height..*end_height) { 289 Ok(blocks) => DataBlocks(blocks), 290 Err(err) => { 291 let err = 292 err.context(format!("Failed to retrieve blocks {start_height} to {end_height} from the ledger")); 293 error!("{}", flatten_error(err)); 294 return false; 295 } 296 }; 297 // Send the `BlockResponse` message to the peer. 298 self.router() 299 .send(peer_ip, Message::BlockResponse(BlockResponse::new(message, blocks, latest_consensus_version))); 300 true 301 } 302 303 /// Handles a `BlockResponse` message. 304 fn block_response( 305 &self, 306 peer_ip: SocketAddr, 307 _blocks: Vec<Block<N>>, 308 _latest_consensus_version: Option<ConsensusVersion>, 309 ) -> bool { 310 warn!("Received a block response through P2P, not BFT, from {peer_ip}"); 311 false 312 } 313 314 /// Processes a ping message from a client (or prover) and sends back a `Pong` message. 315 fn ping(&self, peer_ip: SocketAddr, _message: Ping<N>) -> bool { 316 // In gateway/validator mode, we do not need to process client block locators. 317 // Instead, locators are fetched from other validators in `Gateway` using `PrimaryPing` messages. 318 319 // Send a `Pong` message to the peer. 320 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) })); 321 true 322 } 323 324 /// Process a Pong message (response to a Ping). 325 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool { 326 self.ping.on_pong_received(peer_ip); 327 true 328 } 329 330 /// Retrieves the latest epoch hash and latest block header, and returns the puzzle response to the peer. 331 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool { 332 // Retrieve the latest epoch hash. 333 let epoch_hash = match self.ledger.latest_epoch_hash() { 334 Ok(epoch_hash) => epoch_hash, 335 Err(error) => { 336 error!("Failed to prepare a puzzle request for '{peer_ip}': {error}"); 337 return false; 338 } 339 }; 340 // Retrieve the latest block header. 341 let block_header = Data::Object(self.ledger.latest_header()); 342 // Send the `PuzzleResponse` message to the peer. 343 self.router().send(peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header })); 344 true 345 } 346 347 /// Disconnects on receipt of a `PuzzleResponse` message. 348 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool { 349 debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation); 350 false 351 } 352 353 /// Propagates the unconfirmed solution to all connected validators. 354 async fn unconfirmed_solution( 355 &self, 356 peer_ip: SocketAddr, 357 serialized: UnconfirmedSolution<N>, 358 solution: Solution<N>, 359 ) -> bool { 360 // Add the unconfirmed solution to the memory pool. 361 if let Err(error) = self.consensus.add_unconfirmed_solution(solution).await { 362 trace!("[UnconfirmedSolution] {error}"); 363 return true; // Maintain the connection. 364 } 365 let message = Message::UnconfirmedSolution(serialized); 366 // Propagate the "UnconfirmedSolution" to the connected validators. 367 self.propagate_to_validators(message, &[peer_ip]); 368 true 369 } 370 371 /// Handles an `UnconfirmedTransaction` message. 372 async fn unconfirmed_transaction( 373 &self, 374 peer_ip: SocketAddr, 375 serialized: UnconfirmedTransaction<N>, 376 transaction: Transaction<N>, 377 ) -> bool { 378 // Add the unconfirmed transaction to the memory pool. 379 if let Err(error) = self.consensus.add_unconfirmed_transaction(transaction).await { 380 trace!("[UnconfirmedTransaction] {error}"); 381 return true; // Maintain the connection. 382 } 383 let message = Message::UnconfirmedTransaction(serialized); 384 // Propagate the "UnconfirmedTransaction" to the connected validators. 385 self.propagate_to_validators(message, &[peer_ip]); 386 true 387 } 388 }