/ node / router / src / handshake.rs
handshake.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use crate::{
 20      messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait},
 21      ConnectionMode,
 22      NodeType,
 23      PeerPoolHandling,
 24      Router,
 25  };
 26  use alphaos_node_network::{get_repo_commit_hash, log_repo_sha_comparison};
 27  use alphaos_node_tcp::{ConnectionSide, Tcp, P2P};
 28  use alphavm::{
 29      ledger::narwhal::Data,
 30      prelude::{block::Header, error, io_error, Address, ConsensusVersion, Field, Network},
 31  };
 32  
 33  use anyhow::{bail, Result};
 34  use futures::SinkExt;
 35  use rand::{rngs::OsRng, Rng};
 36  use std::{io, net::SocketAddr};
 37  use tokio::net::TcpStream;
 38  use tokio_stream::StreamExt;
 39  use tokio_util::codec::Framed;
 40  
 41  impl<N: Network> P2P for Router<N> {
 42      /// Returns a reference to the TCP instance.
 43      fn tcp(&self) -> &Tcp {
 44          &self.tcp
 45      }
 46  }
 47  
 48  /// A macro unwrapping the expected handshake message or returning an error for unexpected messages.
 49  #[macro_export]
 50  macro_rules! expect_message {
 51      ($msg_ty:path, $framed:expr, $peer_addr:expr) => {{
 52          use alphavm::utilities::io_error;
 53  
 54          match $framed.try_next().await? {
 55              // Received the expected message, proceed.
 56              Some($msg_ty(data)) => {
 57                  trace!("Received '{}' from '{}'", data.name(), $peer_addr);
 58                  data
 59              }
 60              // Received a disconnect message, abort.
 61              Some(Message::Disconnect($crate::messages::Disconnect { reason })) => {
 62                  return Err(io_error(format!("'{}' disconnected: {reason}", $peer_addr)));
 63              }
 64              // Received an unexpected message, abort.
 65              Some(ty) => {
 66                  return Err(io_error(format!(
 67                      "'{}' did not follow the handshake protocol: received {:?} instead of {}",
 68                      $peer_addr,
 69                      ty.name(),
 70                      stringify!($msg_ty),
 71                  )));
 72              }
 73              // Received nothing.
 74              None => {
 75                  return Err(io_error(format!(
 76                      "the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
 77                      stringify!($msg_ty),
 78                  )));
 79              }
 80          }
 81      }};
 82  }
 83  
 84  /// Send the given message to the peer.
 85  async fn send<N: Network>(
 86      framed: &mut Framed<&mut TcpStream, MessageCodec<N>>,
 87      peer_addr: SocketAddr,
 88      message: Message<N>,
 89  ) -> io::Result<()> {
 90      trace!("Sending '{}' to '{peer_addr}'", message.name());
 91      framed.send(message).await
 92  }
 93  
 94  impl<N: Network> Router<N> {
 95      /// Executes the handshake protocol.
 96      pub async fn handshake<'a>(
 97          &'a self,
 98          peer_addr: SocketAddr,
 99          stream: &'a mut TcpStream,
100          peer_side: ConnectionSide,
101          genesis_header: Header<N>,
102          restrictions_id: Field<N>,
103      ) -> io::Result<Option<ChallengeRequest<N>>> {
104          // If this is an inbound connection, we log it, but don't know the listening address yet.
105          // Otherwise, we can immediately register the listening address.
106          let mut listener_addr = if peer_side == ConnectionSide::Initiator {
107              debug!("Received a connection request from '{peer_addr}'");
108              None
109          } else {
110              debug!("Shaking hands with '{peer_addr}'...");
111              Some(peer_addr)
112          };
113  
114          // Check (or impose) IP-level bans.
115          #[cfg(not(feature = "test"))]
116          if !self.is_dev() && peer_side == ConnectionSide::Initiator {
117              // If the IP is already banned reject the connection.
118              if self.is_ip_banned(peer_addr.ip()) {
119                  trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
120                  return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
121              }
122  
123              let num_attempts =
124                  self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);
125  
126              debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
127              if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
128                  self.update_ip_ban(peer_addr.ip());
129                  trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
130                  return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
131              }
132          }
133  
134          // Perform the handshake; we pass on a mutable reference to listener_addr in case the process is broken at any point in time.
135          let handshake_result = if peer_side == ConnectionSide::Responder {
136              self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await
137          } else {
138              self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id).await
139          };
140  
141          if let Some(addr) = listener_addr {
142              match handshake_result {
143                  Ok(Some(ref cr)) => {
144                      if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
145                          self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, Some(cr.address));
146                          peer.upgrade_to_connected(
147                              peer_addr,
148                              cr.listener_port,
149                              cr.address,
150                              cr.node_type,
151                              cr.version,
152                              ConnectionMode::Router,
153                          );
154                      }
155                      #[cfg(feature = "metrics")]
156                      self.update_metrics();
157                      debug!("Completed the handshake with '{peer_addr}'");
158                  }
159                  Ok(None) => {
160                      return Err(error(format!("Duplicate handshake attempt with '{addr}'")));
161                  }
162                  Err(_) => {
163                      if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
164                          // The peer may only be downgraded if it's a ConnectingPeer.
165                          if peer.is_connecting() {
166                              peer.downgrade_to_candidate(addr);
167                          }
168                      }
169                  }
170              }
171          }
172  
173          handshake_result
174      }
175  
176      /// The connection initiator side of the handshake.
177      async fn handshake_inner_initiator<'a>(
178          &'a self,
179          peer_addr: SocketAddr,
180          stream: &'a mut TcpStream,
181          genesis_header: Header<N>,
182          restrictions_id: Field<N>,
183      ) -> io::Result<Option<ChallengeRequest<N>>> {
184          // Introduce the peer into the peer pool.
185          if !self.add_connecting_peer(peer_addr) {
186              // Return early if already being connected to.
187              return Ok(None);
188          }
189  
190          // Construct the stream.
191          let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
192  
193          // Initialize an RNG.
194          let rng = &mut OsRng;
195  
196          // Determine the alphaos SHA to send to the peer.
197          let current_block_height = self.ledger.latest_block_height();
198          let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
199          let alphaos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
200              (true, Some(sha)) => Some(sha),
201              _ => None,
202          };
203  
204          /* Step 1: Send the challenge request. */
205  
206          // Sample a random nonce.
207          let our_nonce = rng.r#gen();
208          // Send a challenge request to the peer.
209          let our_request =
210              ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, alphaos_sha);
211          send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
212  
213          /* Step 2: Receive the peer's challenge response followed by the challenge request. */
214  
215          // Listen for the challenge response message.
216          let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
217          // Listen for the challenge request message.
218          let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
219  
220          // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
221          if let Some(reason) = self
222              .verify_challenge_response(
223                  peer_addr,
224                  peer_request.address,
225                  peer_request.node_type,
226                  peer_response,
227                  genesis_header,
228                  restrictions_id,
229                  our_nonce,
230              )
231              .await
232          {
233              send(&mut framed, peer_addr, reason.into()).await?;
234              return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
235          }
236          // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
237          if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
238              send(&mut framed, peer_addr, reason.into()).await?;
239              return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
240          }
241  
242          /* Step 3: Send the challenge response. */
243  
244          let response_nonce: u64 = rng.r#gen();
245          let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
246          // Sign the counterparty nonce.
247          let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
248              return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
249          };
250          // Send the challenge response.
251          let our_response = ChallengeResponse {
252              genesis_header,
253              restrictions_id,
254              signature: Data::Object(our_signature),
255              nonce: response_nonce,
256          };
257          send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
258  
259          Ok(Some(peer_request))
260      }
261  
262      /// The connection responder side of the handshake.
263      async fn handshake_inner_responder<'a>(
264          &'a self,
265          peer_addr: SocketAddr,
266          listener_addr: &mut Option<SocketAddr>,
267          stream: &'a mut TcpStream,
268          genesis_header: Header<N>,
269          restrictions_id: Field<N>,
270      ) -> io::Result<Option<ChallengeRequest<N>>> {
271          // Construct the stream.
272          let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
273  
274          /* Step 1: Receive the challenge request. */
275  
276          // Listen for the challenge request message.
277          let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
278  
279          // Determine the alphaos SHA to send to the peer.
280          let current_block_height = self.ledger.latest_block_height();
281          let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
282          let alphaos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
283              (true, Some(sha)) => Some(sha),
284              _ => None,
285          };
286  
287          // Obtain the peer's listening address.
288          *listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
289          let listener_addr = listener_addr.unwrap();
290  
291          // Knowing the peer's listening address, ensure it is allowed to connect.
292          if let Err(forbidden_message) = self.ensure_peer_is_allowed(listener_addr) {
293              return Err(error(format!("{forbidden_message}")));
294          }
295  
296          // Introduce the peer into the peer pool.
297          if !self.add_connecting_peer(listener_addr) {
298              // Return early if already being connected to.
299              return Ok(None);
300          }
301  
302          // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
303          if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
304              send(&mut framed, peer_addr, reason.into()).await?;
305              return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
306          }
307  
308          /* Step 2: Send the challenge response followed by own challenge request. */
309  
310          // Initialize an RNG.
311          let rng = &mut OsRng;
312  
313          // Sign the counterparty nonce.
314          let response_nonce: u64 = rng.r#gen();
315          let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
316          let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
317              return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
318          };
319          // Send the challenge response.
320          let our_response = ChallengeResponse {
321              genesis_header,
322              restrictions_id,
323              signature: Data::Object(our_signature),
324              nonce: response_nonce,
325          };
326          send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
327  
328          // Sample a random nonce.
329          let our_nonce = rng.r#gen();
330          // Send the challenge request.
331          let our_request =
332              ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, alphaos_sha);
333          send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
334  
335          /* Step 3: Receive the challenge response. */
336  
337          // Listen for the challenge response message.
338          let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
339          // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
340          if let Some(reason) = self
341              .verify_challenge_response(
342                  peer_addr,
343                  peer_request.address,
344                  peer_request.node_type,
345                  peer_response,
346                  genesis_header,
347                  restrictions_id,
348                  our_nonce,
349              )
350              .await
351          {
352              send(&mut framed, peer_addr, reason.into()).await?;
353              return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
354          }
355  
356          Ok(Some(peer_request))
357      }
358  
359      /// Ensure the peer is allowed to connect.
360      fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
361          // Ensure that it's not a self-connect attempt.
362          if self.is_local_ip(listener_addr) {
363              bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)");
364          }
365          // As a validator, only accept connections from trusted peers and bootstrap nodes.
366          if self.node_type() == NodeType::Validator
367              && !self.is_trusted(listener_addr)
368              && !crate::bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr)
369          {
370              bail!("Dropping connection request from '{listener_addr}' (untrusted)");
371          }
372          // If the node is in trusted peers only mode, ensure the peer is explicitly trusted.
373          if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
374              bail!("Dropping connection request from '{listener_addr}' (untrusted)");
375          }
376          Ok(())
377      }
378  
379      /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid.
380      fn verify_challenge_request(
381          &self,
382          peer_addr: SocketAddr,
383          message: &ChallengeRequest<N>,
384      ) -> Option<DisconnectReason> {
385          // Retrieve the components of the challenge request.
386          let &ChallengeRequest { version, listener_port: _, node_type, address, nonce: _, ref alphaos_sha } = message;
387          log_repo_sha_comparison(peer_addr, alphaos_sha, Self::OWNER);
388  
389          // Ensure the message protocol version is not outdated.
390          if !self.is_valid_message_version(version) {
391              warn!("Dropping '{peer_addr}' on version {version} (outdated)");
392              return Some(DisconnectReason::OutdatedClientVersion);
393          }
394  
395          // Ensure there are no validators connected with the given Alpha address.
396          if self.node_type() == NodeType::Validator
397              && node_type == NodeType::Validator
398              && self.is_connected_address(address)
399          {
400              warn!("Dropping '{peer_addr}' for being already connected ({address})");
401              return Some(DisconnectReason::NoReasonGiven);
402          }
403  
404          None
405      }
406  
407      /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid.
408      #[allow(clippy::too_many_arguments)]
409      async fn verify_challenge_response(
410          &self,
411          peer_addr: SocketAddr,
412          peer_address: Address<N>,
413          peer_node_type: NodeType,
414          response: ChallengeResponse<N>,
415          expected_genesis_header: Header<N>,
416          expected_restrictions_id: Field<N>,
417          expected_nonce: u64,
418      ) -> Option<DisconnectReason> {
419          // Retrieve the components of the challenge response.
420          let ChallengeResponse { genesis_header, restrictions_id, signature, nonce } = response;
421  
422          // Check our current genesis state to determine verification behavior.
423          match self.genesis_state() {
424              crate::GenesisState::Known(_) => {
425                  // We have a known genesis - verify exact match (original behavior).
426                  if genesis_header != expected_genesis_header {
427                      warn!("Handshake with '{peer_addr}' failed (incorrect block header)");
428                      return Some(DisconnectReason::InvalidChallengeResponse);
429                  }
430              }
431              crate::GenesisState::Unknown => {
432                  // We don't have genesis yet - accept peer's genesis and mark as pending.
433                  info!("Accepting genesis from peer '{peer_addr}' (height: {})", genesis_header.height());
434                  self.set_genesis_pending(genesis_header);
435                  // Note: Genesis fetch will be triggered by the caller after handshake succeeds.
436              }
437              crate::GenesisState::Pending(pending_header) => {
438                  // We're fetching genesis - verify peer agrees with what we're fetching.
439                  if genesis_header != pending_header {
440                      warn!("Handshake with '{peer_addr}' failed (peer disagrees on pending genesis)");
441                      return Some(DisconnectReason::InvalidChallengeResponse);
442                  }
443                  info!("Peer '{peer_addr}' confirms pending genesis (height: {})", genesis_header.height());
444              }
445          }
446          // Verify the restrictions ID.
447          if !peer_node_type.is_prover() && !self.node_type.is_prover() && restrictions_id != expected_restrictions_id {
448              warn!("Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
449              return Some(DisconnectReason::InvalidChallengeResponse);
450          }
451          // Perform the deferred non-blocking deserialization of the signature.
452          let Ok(signature) = signature.deserialize().await else {
453              warn!("Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
454              return Some(DisconnectReason::InvalidChallengeResponse);
455          };
456          // Verify the signature.
457          if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
458              warn!("Handshake with '{peer_addr}' failed (invalid signature)");
459              return Some(DisconnectReason::InvalidChallengeResponse);
460          }
461          None
462      }
463  }