handshake.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use crate::{ 20 messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait}, 21 ConnectionMode, 22 NodeType, 23 PeerPoolHandling, 24 Router, 25 }; 26 use alphaos_node_network::{get_repo_commit_hash, log_repo_sha_comparison}; 27 use alphaos_node_tcp::{ConnectionSide, Tcp, P2P}; 28 use alphavm::{ 29 ledger::narwhal::Data, 30 prelude::{block::Header, error, io_error, Address, ConsensusVersion, Field, Network}, 31 }; 32 33 use anyhow::{bail, Result}; 34 use futures::SinkExt; 35 use rand::{rngs::OsRng, Rng}; 36 use std::{io, net::SocketAddr}; 37 use tokio::net::TcpStream; 38 use tokio_stream::StreamExt; 39 use tokio_util::codec::Framed; 40 41 impl<N: Network> P2P for Router<N> { 42 /// Returns a reference to the TCP instance. 43 fn tcp(&self) -> &Tcp { 44 &self.tcp 45 } 46 } 47 48 /// A macro unwrapping the expected handshake message or returning an error for unexpected messages. 49 #[macro_export] 50 macro_rules! expect_message { 51 ($msg_ty:path, $framed:expr, $peer_addr:expr) => {{ 52 use alphavm::utilities::io_error; 53 54 match $framed.try_next().await? { 55 // Received the expected message, proceed. 56 Some($msg_ty(data)) => { 57 trace!("Received '{}' from '{}'", data.name(), $peer_addr); 58 data 59 } 60 // Received a disconnect message, abort. 61 Some(Message::Disconnect($crate::messages::Disconnect { reason })) => { 62 return Err(io_error(format!("'{}' disconnected: {reason}", $peer_addr))); 63 } 64 // Received an unexpected message, abort. 65 Some(ty) => { 66 return Err(io_error(format!( 67 "'{}' did not follow the handshake protocol: received {:?} instead of {}", 68 $peer_addr, 69 ty.name(), 70 stringify!($msg_ty), 71 ))); 72 } 73 // Received nothing. 74 None => { 75 return Err(io_error(format!( 76 "the peer disconnected before sending {:?}, likely due to peer saturation or shutdown", 77 stringify!($msg_ty), 78 ))); 79 } 80 } 81 }}; 82 } 83 84 /// Send the given message to the peer. 85 async fn send<N: Network>( 86 framed: &mut Framed<&mut TcpStream, MessageCodec<N>>, 87 peer_addr: SocketAddr, 88 message: Message<N>, 89 ) -> io::Result<()> { 90 trace!("Sending '{}' to '{peer_addr}'", message.name()); 91 framed.send(message).await 92 } 93 94 impl<N: Network> Router<N> { 95 /// Executes the handshake protocol. 96 pub async fn handshake<'a>( 97 &'a self, 98 peer_addr: SocketAddr, 99 stream: &'a mut TcpStream, 100 peer_side: ConnectionSide, 101 genesis_header: Header<N>, 102 restrictions_id: Field<N>, 103 ) -> io::Result<Option<ChallengeRequest<N>>> { 104 // If this is an inbound connection, we log it, but don't know the listening address yet. 105 // Otherwise, we can immediately register the listening address. 106 let mut listener_addr = if peer_side == ConnectionSide::Initiator { 107 debug!("Received a connection request from '{peer_addr}'"); 108 None 109 } else { 110 debug!("Shaking hands with '{peer_addr}'..."); 111 Some(peer_addr) 112 }; 113 114 // Check (or impose) IP-level bans. 115 #[cfg(not(feature = "test"))] 116 if !self.is_dev() && peer_side == ConnectionSide::Initiator { 117 // If the IP is already banned reject the connection. 118 if self.is_ip_banned(peer_addr.ip()) { 119 trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip()); 120 return Err(error(format!("'{}' is a banned IP address", peer_addr.ip()))); 121 } 122 123 let num_attempts = 124 self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS); 125 126 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts); 127 if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS { 128 self.update_ip_ban(peer_addr.ip()); 129 trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip()); 130 return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); 131 } 132 } 133 134 // Perform the handshake; we pass on a mutable reference to listener_addr in case the process is broken at any point in time. 135 let handshake_result = if peer_side == ConnectionSide::Responder { 136 self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await 137 } else { 138 self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id).await 139 }; 140 141 if let Some(addr) = listener_addr { 142 match handshake_result { 143 Ok(Some(ref cr)) => { 144 if let Some(peer) = self.peer_pool.write().get_mut(&addr) { 145 self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, Some(cr.address)); 146 peer.upgrade_to_connected( 147 peer_addr, 148 cr.listener_port, 149 cr.address, 150 cr.node_type, 151 cr.version, 152 ConnectionMode::Router, 153 ); 154 } 155 #[cfg(feature = "metrics")] 156 self.update_metrics(); 157 debug!("Completed the handshake with '{peer_addr}'"); 158 } 159 Ok(None) => { 160 return Err(error(format!("Duplicate handshake attempt with '{addr}'"))); 161 } 162 Err(_) => { 163 if let Some(peer) = self.peer_pool.write().get_mut(&addr) { 164 // The peer may only be downgraded if it's a ConnectingPeer. 165 if peer.is_connecting() { 166 peer.downgrade_to_candidate(addr); 167 } 168 } 169 } 170 } 171 } 172 173 handshake_result 174 } 175 176 /// The connection initiator side of the handshake. 177 async fn handshake_inner_initiator<'a>( 178 &'a self, 179 peer_addr: SocketAddr, 180 stream: &'a mut TcpStream, 181 genesis_header: Header<N>, 182 restrictions_id: Field<N>, 183 ) -> io::Result<Option<ChallengeRequest<N>>> { 184 // Introduce the peer into the peer pool. 185 if !self.add_connecting_peer(peer_addr) { 186 // Return early if already being connected to. 187 return Ok(None); 188 } 189 190 // Construct the stream. 191 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake()); 192 193 // Initialize an RNG. 194 let rng = &mut OsRng; 195 196 // Determine the alphaos SHA to send to the peer. 197 let current_block_height = self.ledger.latest_block_height(); 198 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap(); 199 let alphaos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) { 200 (true, Some(sha)) => Some(sha), 201 _ => None, 202 }; 203 204 /* Step 1: Send the challenge request. */ 205 206 // Sample a random nonce. 207 let our_nonce = rng.r#gen(); 208 // Send a challenge request to the peer. 209 let our_request = 210 ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, alphaos_sha); 211 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?; 212 213 /* Step 2: Receive the peer's challenge response followed by the challenge request. */ 214 215 // Listen for the challenge response message. 216 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr); 217 // Listen for the challenge request message. 218 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr); 219 220 // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort. 221 if let Some(reason) = self 222 .verify_challenge_response( 223 peer_addr, 224 peer_request.address, 225 peer_request.node_type, 226 peer_response, 227 genesis_header, 228 restrictions_id, 229 our_nonce, 230 ) 231 .await 232 { 233 send(&mut framed, peer_addr, reason.into()).await?; 234 return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}"))); 235 } 236 // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort. 237 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) { 238 send(&mut framed, peer_addr, reason.into()).await?; 239 return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}"))); 240 } 241 242 /* Step 3: Send the challenge response. */ 243 244 let response_nonce: u64 = rng.r#gen(); 245 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat(); 246 // Sign the counterparty nonce. 247 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else { 248 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'"))); 249 }; 250 // Send the challenge response. 251 let our_response = ChallengeResponse { 252 genesis_header, 253 restrictions_id, 254 signature: Data::Object(our_signature), 255 nonce: response_nonce, 256 }; 257 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?; 258 259 Ok(Some(peer_request)) 260 } 261 262 /// The connection responder side of the handshake. 263 async fn handshake_inner_responder<'a>( 264 &'a self, 265 peer_addr: SocketAddr, 266 listener_addr: &mut Option<SocketAddr>, 267 stream: &'a mut TcpStream, 268 genesis_header: Header<N>, 269 restrictions_id: Field<N>, 270 ) -> io::Result<Option<ChallengeRequest<N>>> { 271 // Construct the stream. 272 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake()); 273 274 /* Step 1: Receive the challenge request. */ 275 276 // Listen for the challenge request message. 277 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr); 278 279 // Determine the alphaos SHA to send to the peer. 280 let current_block_height = self.ledger.latest_block_height(); 281 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap(); 282 let alphaos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) { 283 (true, Some(sha)) => Some(sha), 284 _ => None, 285 }; 286 287 // Obtain the peer's listening address. 288 *listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port)); 289 let listener_addr = listener_addr.unwrap(); 290 291 // Knowing the peer's listening address, ensure it is allowed to connect. 292 if let Err(forbidden_message) = self.ensure_peer_is_allowed(listener_addr) { 293 return Err(error(format!("{forbidden_message}"))); 294 } 295 296 // Introduce the peer into the peer pool. 297 if !self.add_connecting_peer(listener_addr) { 298 // Return early if already being connected to. 299 return Ok(None); 300 } 301 302 // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort. 303 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) { 304 send(&mut framed, peer_addr, reason.into()).await?; 305 return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}"))); 306 } 307 308 /* Step 2: Send the challenge response followed by own challenge request. */ 309 310 // Initialize an RNG. 311 let rng = &mut OsRng; 312 313 // Sign the counterparty nonce. 314 let response_nonce: u64 = rng.r#gen(); 315 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat(); 316 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else { 317 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'"))); 318 }; 319 // Send the challenge response. 320 let our_response = ChallengeResponse { 321 genesis_header, 322 restrictions_id, 323 signature: Data::Object(our_signature), 324 nonce: response_nonce, 325 }; 326 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?; 327 328 // Sample a random nonce. 329 let our_nonce = rng.r#gen(); 330 // Send the challenge request. 331 let our_request = 332 ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, alphaos_sha); 333 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?; 334 335 /* Step 3: Receive the challenge response. */ 336 337 // Listen for the challenge response message. 338 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr); 339 // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort. 340 if let Some(reason) = self 341 .verify_challenge_response( 342 peer_addr, 343 peer_request.address, 344 peer_request.node_type, 345 peer_response, 346 genesis_header, 347 restrictions_id, 348 our_nonce, 349 ) 350 .await 351 { 352 send(&mut framed, peer_addr, reason.into()).await?; 353 return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}"))); 354 } 355 356 Ok(Some(peer_request)) 357 } 358 359 /// Ensure the peer is allowed to connect. 360 fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> { 361 // Ensure that it's not a self-connect attempt. 362 if self.is_local_ip(listener_addr) { 363 bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)"); 364 } 365 // As a validator, only accept connections from trusted peers and bootstrap nodes. 366 if self.node_type() == NodeType::Validator 367 && !self.is_trusted(listener_addr) 368 && !crate::bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr) 369 { 370 bail!("Dropping connection request from '{listener_addr}' (untrusted)"); 371 } 372 // If the node is in trusted peers only mode, ensure the peer is explicitly trusted. 373 if self.trusted_peers_only() && !self.is_trusted(listener_addr) { 374 bail!("Dropping connection request from '{listener_addr}' (untrusted)"); 375 } 376 Ok(()) 377 } 378 379 /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid. 380 fn verify_challenge_request( 381 &self, 382 peer_addr: SocketAddr, 383 message: &ChallengeRequest<N>, 384 ) -> Option<DisconnectReason> { 385 // Retrieve the components of the challenge request. 386 let &ChallengeRequest { version, listener_port: _, node_type, address, nonce: _, ref alphaos_sha } = message; 387 log_repo_sha_comparison(peer_addr, alphaos_sha, Self::OWNER); 388 389 // Ensure the message protocol version is not outdated. 390 if !self.is_valid_message_version(version) { 391 warn!("Dropping '{peer_addr}' on version {version} (outdated)"); 392 return Some(DisconnectReason::OutdatedClientVersion); 393 } 394 395 // Ensure there are no validators connected with the given Alpha address. 396 if self.node_type() == NodeType::Validator 397 && node_type == NodeType::Validator 398 && self.is_connected_address(address) 399 { 400 warn!("Dropping '{peer_addr}' for being already connected ({address})"); 401 return Some(DisconnectReason::NoReasonGiven); 402 } 403 404 None 405 } 406 407 /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid. 408 #[allow(clippy::too_many_arguments)] 409 async fn verify_challenge_response( 410 &self, 411 peer_addr: SocketAddr, 412 peer_address: Address<N>, 413 peer_node_type: NodeType, 414 response: ChallengeResponse<N>, 415 expected_genesis_header: Header<N>, 416 expected_restrictions_id: Field<N>, 417 expected_nonce: u64, 418 ) -> Option<DisconnectReason> { 419 // Retrieve the components of the challenge response. 420 let ChallengeResponse { genesis_header, restrictions_id, signature, nonce } = response; 421 422 // Check our current genesis state to determine verification behavior. 423 match self.genesis_state() { 424 crate::GenesisState::Known(_) => { 425 // We have a known genesis - verify exact match (original behavior). 426 if genesis_header != expected_genesis_header { 427 warn!("Handshake with '{peer_addr}' failed (incorrect block header)"); 428 return Some(DisconnectReason::InvalidChallengeResponse); 429 } 430 } 431 crate::GenesisState::Unknown => { 432 // We don't have genesis yet - accept peer's genesis and mark as pending. 433 info!("Accepting genesis from peer '{peer_addr}' (height: {})", genesis_header.height()); 434 self.set_genesis_pending(genesis_header); 435 // Note: Genesis fetch will be triggered by the caller after handshake succeeds. 436 } 437 crate::GenesisState::Pending(pending_header) => { 438 // We're fetching genesis - verify peer agrees with what we're fetching. 439 if genesis_header != pending_header { 440 warn!("Handshake with '{peer_addr}' failed (peer disagrees on pending genesis)"); 441 return Some(DisconnectReason::InvalidChallengeResponse); 442 } 443 info!("Peer '{peer_addr}' confirms pending genesis (height: {})", genesis_header.height()); 444 } 445 } 446 // Verify the restrictions ID. 447 if !peer_node_type.is_prover() && !self.node_type.is_prover() && restrictions_id != expected_restrictions_id { 448 warn!("Handshake with '{peer_addr}' failed (incorrect restrictions ID)"); 449 return Some(DisconnectReason::InvalidChallengeResponse); 450 } 451 // Perform the deferred non-blocking deserialization of the signature. 452 let Ok(signature) = signature.deserialize().await else { 453 warn!("Handshake with '{peer_addr}' failed (cannot deserialize the signature)"); 454 return Some(DisconnectReason::InvalidChallengeResponse); 455 }; 456 // Verify the signature. 457 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) { 458 warn!("Handshake with '{peer_addr}' failed (invalid signature)"); 459 return Some(DisconnectReason::InvalidChallengeResponse); 460 } 461 None 462 } 463 }