handshake.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use std::{io, time::Duration}; 20 21 use tokio::{ 22 io::{split, AsyncRead, AsyncWrite}, 23 net::TcpStream, 24 sync::{mpsc, oneshot}, 25 time::timeout, 26 }; 27 use tracing::*; 28 29 use crate::{ 30 protocols::{ProtocolHandler, ReturnableConnection}, 31 Connection, 32 P2P, 33 }; 34 35 /// Can be used to specify and enable network handshakes. Upon establishing a connection, both sides will 36 /// need to adhere to the specified handshake rules in order to finalize the connection and be able to send 37 /// or receive any messages. 38 #[async_trait::async_trait] 39 pub trait Handshake: P2P 40 where 41 Self: Clone + Send + Sync + 'static, 42 { 43 /// The maximum time allowed for a connection to perform a handshake before it is rejected. 44 /// 45 /// The default value is 3000ms. 46 const TIMEOUT_MS: u64 = 3_000; 47 48 /// Prepares the node to perform specified network handshakes. 49 async fn enable_handshake(&self) { 50 let (from_node_sender, mut from_node_receiver) = mpsc::unbounded_channel::<ReturnableConnection>(); 51 52 // use a channel to know when the handshake task is ready 53 let (tx, rx) = oneshot::channel(); 54 55 // spawn a background task dedicated to handling the handshakes 56 let self_clone = self.clone(); 57 let handshake_task = tokio::spawn(async move { 58 trace!(parent: self_clone.tcp().span(), "spawned the Handshake handler task"); 59 tx.send(()).unwrap(); // safe; the channel was just opened 60 61 while let Some((conn, result_sender)) = from_node_receiver.recv().await { 62 let addr = conn.addr(); 63 64 let node = self_clone.clone(); 65 tokio::spawn(async move { 66 debug!(parent: node.tcp().span(), "shaking hands with {} as the {:?}", addr, !conn.side()); 67 let result = timeout(Duration::from_millis(Self::TIMEOUT_MS), node.perform_handshake(conn)).await; 68 69 let ret = match result { 70 Ok(Ok(conn)) => { 71 debug!(parent: node.tcp().span(), "successfully handshaken with {}", addr); 72 Ok(conn) 73 } 74 Ok(Err(e)) => { 75 debug!(parent: node.tcp().span(), "handshake with {addr} failed: {e}"); 76 Err(e) 77 } 78 Err(_) => { 79 debug!(parent: node.tcp().span(), "handshake with {} timed out", addr); 80 Err(io::ErrorKind::TimedOut.into()) 81 } 82 }; 83 84 // return the Connection to the Tcp, resuming Tcp::adapt_stream 85 if result_sender.send(ret).is_err() { 86 unreachable!("couldn't return a Connection to the Tcp"); 87 } 88 }); 89 } 90 }); 91 let _ = rx.await; 92 self.tcp().tasks.lock().push(handshake_task); 93 94 // register the Handshake handler with the Tcp 95 let hdl = Box::new(ProtocolHandler(from_node_sender)); 96 assert!(self.tcp().protocols.handshake.set(hdl).is_ok(), "the Handshake protocol was enabled more than once!"); 97 } 98 99 /// Performs the handshake; temporarily assumes control of the [`Connection`] and returns it if the handshake is 100 /// successful. 101 async fn perform_handshake(&self, conn: Connection) -> io::Result<Connection>; 102 103 /// Borrows the full connection stream to be used in the implementation of [`Handshake::perform_handshake`]. 104 fn borrow_stream<'a>(&self, conn: &'a mut Connection) -> &'a mut TcpStream { 105 conn.stream.as_mut().unwrap() 106 } 107 108 /// Assumes full control of a connection's stream in the implementation of [`Handshake::perform_handshake`], by 109 /// the end of which it *must* be followed by [`Handshake::return_stream`]. 110 fn take_stream(&self, conn: &mut Connection) -> TcpStream { 111 conn.stream.take().unwrap() 112 } 113 114 /// This method only needs to be called if [`Handshake::take_stream`] had been called before; it is used to 115 /// return a (potentially modified) stream back to the applicable connection. 116 fn return_stream<T: AsyncRead + AsyncWrite + Send + Sync + 'static>(&self, conn: &mut Connection, stream: T) { 117 let (reader, writer) = split(stream); 118 conn.reader = Some(Box::new(reader)); 119 conn.writer = Some(Box::new(writer)); 120 } 121 }