/ node / src / validator / router.rs
router.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use super::*;
 20  use alphaos_node_network::PeerPoolHandling;
 21  use alphaos_node_router::messages::{
 22      BlockRequest,
 23      BlockResponse,
 24      DataBlocks,
 25      DisconnectReason,
 26      Message,
 27      MessageCodec,
 28      Ping,
 29      Pong,
 30      UnconfirmedTransaction,
 31  };
 32  use alphaos_node_sync::{load_or_fetch_genesis, GenesisConfig};
 33  use alphaos_node_tcp::{Connection, ConnectionSide, Tcp};
 34  use alphavm::{
 35      console::network::{ConsensusVersion, Network},
 36      ledger::{block::Transaction, narwhal::Data},
 37      prelude::{
 38          block::{Header, Metadata},
 39          Field,
 40          Zero,
 41      },
 42      utilities::{flatten_error, io_error},
 43  };
 44  
 45  use std::{io, net::SocketAddr};
 46  
 47  impl<N: Network, C: ConsensusStorage<N>> P2P for Validator<N, C> {
 48      /// Returns a reference to the TCP instance.
 49      fn tcp(&self) -> &Tcp {
 50          self.router.tcp()
 51      }
 52  }
 53  
 54  #[async_trait]
 55  impl<N: Network, C: ConsensusStorage<N>> Handshake for Validator<N, C> {
 56      /// Performs the handshake protocol.
 57      async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
 58          // Perform the handshake.
 59          let peer_addr = connection.addr();
 60          let conn_side = connection.side();
 61          let stream = self.borrow_stream(&mut connection);
 62  
 63          // Try to get genesis header from ledger.
 64          // If it exists, mark genesis as Known and use it.
 65          // If it doesn't exist, we're a late-joining validator - genesis will be learned from peers.
 66          let genesis_header = match self.ledger.get_header(0) {
 67              Ok(header) => {
 68                  // We have genesis - mark as Known if not already.
 69                  self.router.set_genesis_known(header);
 70                  header
 71              }
 72              Err(_) => {
 73                  // No genesis yet - keep state as Unknown, use a placeholder header.
 74                  // Create a minimal valid genesis header that will be accepted.
 75                  // The peer's genesis will be learned during handshake and trigger fetch.
 76                  let network_id = N::ID;
 77                  let round = 0u64;
 78                  let height = 0u32;
 79                  let cumulative_weight = 0u128;
 80                  let cumulative_proof_target = 0u128;
 81                  let coinbase_target = 0u64;
 82                  let proof_target = 0u64;
 83                  let last_coinbase_target = 0u64;
 84                  let last_coinbase_timestamp = 0i64;
 85                  let timestamp = 0i64;
 86  
 87                  let metadata = Metadata::<N>::new(
 88                      network_id,
 89                      round,
 90                      height,
 91                      cumulative_weight,
 92                      cumulative_proof_target,
 93                      coinbase_target,
 94                      proof_target,
 95                      last_coinbase_target,
 96                      last_coinbase_timestamp,
 97                      timestamp,
 98                  )
 99                  .map_err(io_error)?;
100  
101                  Header::<N>::from(
102                      Field::<N>::zero().into(), // previous_state_root (N::StateRoot)
103                      Field::<N>::zero(),        // transactions_root
104                      Field::<N>::zero(),        // finalize_root
105                      Field::<N>::zero(),        // ratifications_root
106                      Field::<N>::zero(),        // solutions_root
107                      Field::<N>::zero(),        // subdag_root
108                      metadata,
109                  )
110                  .map_err(io_error)?
111              }
112          };
113  
114          let restrictions_id = self.ledger.vm().restrictions().restrictions_id();
115          self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
116  
117          Ok(connection)
118      }
119  }
120  
121  #[async_trait]
122  impl<N: Network, C: ConsensusStorage<N>> OnConnect for Validator<N, C>
123  where
124      Self: Outbound<N>,
125  {
126      async fn on_connect(&self, peer_addr: SocketAddr) {
127          // Check if we need to fetch genesis (Section 6: Genesis fetch trigger)
128          match self.router().genesis_state() {
129              alphaos_node_router::GenesisState::Pending(_header) => {
130                  info!("🔄 Genesis state is Pending - triggering automatic fetch from peer {}", peer_addr);
131  
132                  // Use the connected peer as bootstrap peer for genesis fetch
133                  let config = GenesisConfig::<N>::default();
134  
135                  match load_or_fetch_genesis::<N>(peer_addr, &config).await {
136                      Ok(genesis) => {
137                          info!("✅ Successfully fetched genesis: height={}, hash={}", genesis.height(), genesis.hash());
138  
139                          // TODO: Insert genesis into ledger
140                          // This requires modifying the ledger to support genesis insertion
141                          // For now, just mark as known
142                          self.router().set_genesis_known(*genesis.header());
143  
144                          info!("🎉 Genesis fetch complete - node is now operational");
145                      }
146                      Err(e) => {
147                          error!("❌ Failed to fetch genesis from {}: {}", peer_addr, e);
148                          // Don't disconnect - keep trying with other peers
149                      }
150                  }
151              }
152              alphaos_node_router::GenesisState::Known(_) => {
153                  // Genesis already known - normal operation
154              }
155              alphaos_node_router::GenesisState::Unknown => {
156                  // Still unknown - handshake hasn't completed yet or no peers connected
157              }
158          }
159  
160          // Resolve the peer address to the listener address.
161          if let Some(listener_addr) = self.router().resolve_to_listener(peer_addr) {
162              if let Some(peer) = self.router().get_connected_peer(listener_addr) {
163                  if peer.node_type != NodeType::BootstrapClient {
164                      // Send the first `Ping` message to the peer.
165                      self.ping.on_peer_connected(listener_addr);
166                  }
167              }
168          }
169      }
170  }
171  
172  #[async_trait]
173  impl<N: Network, C: ConsensusStorage<N>> Disconnect for Validator<N, C> {
174      /// Any extra operations to be performed during a disconnect.
175      async fn handle_disconnect(&self, peer_addr: SocketAddr) {
176          if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) {
177              let was_connected = self.router.downgrade_peer_to_candidate(peer_ip);
178  
179              // Only remove the peer from sync if the handshake was successful.
180              // This handles the cases where a validator unsuccessfully tries to connect to another validator using the router.
181              if was_connected {
182                  self.sync.remove_peer(&peer_ip);
183              }
184  
185              // Clear cached entries applicable to the peer.
186              self.router.cache().clear_peer_entries(peer_ip);
187              #[cfg(feature = "metrics")]
188              self.router.update_metrics();
189          }
190      }
191  }
192  
193  #[async_trait]
194  impl<N: Network, C: ConsensusStorage<N>> Reading for Validator<N, C> {
195      type Codec = MessageCodec<N>;
196      type Message = Message<N>;
197  
198      /// Creates a [`Decoder`] used to interpret messages from the network.
199      /// The `side` param indicates the connection side **from the node's perspective**.
200      fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
201          Default::default()
202      }
203  
204      /// Processes a message received from the network.
205      async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
206          let clone = self.clone();
207          if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) {
208              // Handle BlockRequest and BlockResponse messages in a separate task to not block the
209              // inbound queue.
210              tokio::spawn(async move {
211                  clone.process_message_inner(peer_addr, message).await;
212              });
213          } else {
214              self.process_message_inner(peer_addr, message).await;
215          }
216          Ok(())
217      }
218  }
219  
220  impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
221      async fn process_message_inner(
222          &self,
223          peer_addr: SocketAddr,
224          message: <Validator<N, C> as alphaos_node_tcp::protocols::Reading>::Message,
225      ) {
226          // Process the message. Disconnect if the peer violated the protocol.
227          if let Err(error) = self.inbound(peer_addr, message).await {
228              warn!("Failed to process inbound message from '{peer_addr}' - {error}");
229              if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) {
230                  warn!("Disconnecting from '{peer_ip}' for protocol violation");
231                  self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
232                  // Disconnect from this peer.
233                  self.router().disconnect(peer_ip);
234              }
235          }
236      }
237  }
238  
239  #[async_trait]
240  impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Validator<N, C> {}
241  
242  impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Validator<N, C> {}
243  
244  impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Validator<N, C> {
245      /// Returns a reference to the router.
246      fn router(&self) -> &Router<N> {
247          &self.router
248      }
249  
250      /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
251      fn is_block_synced(&self) -> bool {
252          self.sync.is_block_synced()
253      }
254  
255      /// Returns the number of blocks this node is behind the greatest peer height,
256      /// or `None` if not connected to peers yet.
257      fn num_blocks_behind(&self) -> Option<u32> {
258          self.sync.num_blocks_behind()
259      }
260  
261      /// Returns the current sync speed in blocks per second.
262      fn get_sync_speed(&self) -> f64 {
263          self.sync.get_sync_speed()
264      }
265  }
266  
267  #[async_trait]
268  impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Validator<N, C> {
269      /// Returns `true` if the message version is valid.
270      fn is_valid_message_version(&self, message_version: u32) -> bool {
271          self.router().is_valid_message_version(message_version)
272      }
273  
274      /// Retrieves the blocks within the block request range, and returns the block response to the peer.
275      fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool {
276          let BlockRequest { start_height, end_height } = &message;
277  
278          // Get the latest consensus version, i.e., the one for the last block's height.
279          let latest_consensus_version = match N::CONSENSUS_VERSION(end_height.saturating_sub(1)) {
280              Ok(version) => version,
281              Err(err) => {
282                  error!("{}", flatten_error(err.context("Failed to retrieve consensus version")));
283                  return false;
284              }
285          };
286  
287          // Retrieve the blocks within the requested range.
288          let blocks = match self.ledger.get_blocks(*start_height..*end_height) {
289              Ok(blocks) => DataBlocks(blocks),
290              Err(err) => {
291                  let err =
292                      err.context(format!("Failed to retrieve blocks {start_height} to {end_height} from the ledger"));
293                  error!("{}", flatten_error(err));
294                  return false;
295              }
296          };
297          // Send the `BlockResponse` message to the peer.
298          self.router()
299              .send(peer_ip, Message::BlockResponse(BlockResponse::new(message, blocks, latest_consensus_version)));
300          true
301      }
302  
303      /// Handles a `BlockResponse` message.
304      fn block_response(
305          &self,
306          peer_ip: SocketAddr,
307          _blocks: Vec<Block<N>>,
308          _latest_consensus_version: Option<ConsensusVersion>,
309      ) -> bool {
310          warn!("Received a block response through P2P, not BFT, from {peer_ip}");
311          false
312      }
313  
314      /// Processes a ping message from a client (or prover) and sends back a `Pong` message.
315      fn ping(&self, peer_ip: SocketAddr, _message: Ping<N>) -> bool {
316          // In gateway/validator mode, we do not need to process client block locators.
317          // Instead, locators are fetched from other validators in `Gateway` using `PrimaryPing` messages.
318  
319          // Send a `Pong` message to the peer.
320          self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
321          true
322      }
323  
324      /// Process a Pong message (response to a Ping).
325      fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
326          self.ping.on_pong_received(peer_ip);
327          true
328      }
329  
330      /// Retrieves the latest epoch hash and latest block header, and returns the puzzle response to the peer.
331      fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
332          // Retrieve the latest epoch hash.
333          let epoch_hash = match self.ledger.latest_epoch_hash() {
334              Ok(epoch_hash) => epoch_hash,
335              Err(error) => {
336                  error!("Failed to prepare a puzzle request for '{peer_ip}': {error}");
337                  return false;
338              }
339          };
340          // Retrieve the latest block header.
341          let block_header = Data::Object(self.ledger.latest_header());
342          // Send the `PuzzleResponse` message to the peer.
343          self.router().send(peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header }));
344          true
345      }
346  
347      /// Disconnects on receipt of a `PuzzleResponse` message.
348      fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool {
349          debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation);
350          false
351      }
352  
353      /// Propagates the unconfirmed solution to all connected validators.
354      async fn unconfirmed_solution(
355          &self,
356          peer_ip: SocketAddr,
357          serialized: UnconfirmedSolution<N>,
358          solution: Solution<N>,
359      ) -> bool {
360          // Add the unconfirmed solution to the memory pool.
361          if let Err(error) = self.consensus.add_unconfirmed_solution(solution).await {
362              trace!("[UnconfirmedSolution] {error}");
363              return true; // Maintain the connection.
364          }
365          let message = Message::UnconfirmedSolution(serialized);
366          // Propagate the "UnconfirmedSolution" to the connected validators.
367          self.propagate_to_validators(message, &[peer_ip]);
368          true
369      }
370  
371      /// Handles an `UnconfirmedTransaction` message.
372      async fn unconfirmed_transaction(
373          &self,
374          peer_ip: SocketAddr,
375          serialized: UnconfirmedTransaction<N>,
376          transaction: Transaction<N>,
377      ) -> bool {
378          // Add the unconfirmed transaction to the memory pool.
379          if let Err(error) = self.consensus.add_unconfirmed_transaction(transaction).await {
380              trace!("[UnconfirmedTransaction] {error}");
381              return true; // Maintain the connection.
382          }
383          let message = Message::UnconfirmedTransaction(serialized);
384          // Propagate the "UnconfirmedTransaction" to the connected validators.
385          self.propagate_to_validators(message, &[peer_ip]);
386          true
387      }
388  }