/ node / tcp / src / protocols / handshake.rs
handshake.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use std::{io, time::Duration};
 20  
 21  use tokio::{
 22      io::{split, AsyncRead, AsyncWrite},
 23      net::TcpStream,
 24      sync::{mpsc, oneshot},
 25      time::timeout,
 26  };
 27  use tracing::*;
 28  
 29  use crate::{
 30      protocols::{ProtocolHandler, ReturnableConnection},
 31      Connection,
 32      P2P,
 33  };
 34  
 35  /// Can be used to specify and enable network handshakes. Upon establishing a connection, both sides will
 36  /// need to adhere to the specified handshake rules in order to finalize the connection and be able to send
 37  /// or receive any messages.
 38  #[async_trait::async_trait]
 39  pub trait Handshake: P2P
 40  where
 41      Self: Clone + Send + Sync + 'static,
 42  {
 43      /// The maximum time allowed for a connection to perform a handshake before it is rejected.
 44      ///
 45      /// The default value is 3000ms.
 46      const TIMEOUT_MS: u64 = 3_000;
 47  
 48      /// Prepares the node to perform specified network handshakes.
 49      async fn enable_handshake(&self) {
 50          let (from_node_sender, mut from_node_receiver) = mpsc::unbounded_channel::<ReturnableConnection>();
 51  
 52          // use a channel to know when the handshake task is ready
 53          let (tx, rx) = oneshot::channel();
 54  
 55          // spawn a background task dedicated to handling the handshakes
 56          let self_clone = self.clone();
 57          let handshake_task = tokio::spawn(async move {
 58              trace!(parent: self_clone.tcp().span(), "spawned the Handshake handler task");
 59              tx.send(()).unwrap(); // safe; the channel was just opened
 60  
 61              while let Some((conn, result_sender)) = from_node_receiver.recv().await {
 62                  let addr = conn.addr();
 63  
 64                  let node = self_clone.clone();
 65                  tokio::spawn(async move {
 66                      debug!(parent: node.tcp().span(), "shaking hands with {} as the {:?}", addr, !conn.side());
 67                      let result = timeout(Duration::from_millis(Self::TIMEOUT_MS), node.perform_handshake(conn)).await;
 68  
 69                      let ret = match result {
 70                          Ok(Ok(conn)) => {
 71                              debug!(parent: node.tcp().span(), "successfully handshaken with {}", addr);
 72                              Ok(conn)
 73                          }
 74                          Ok(Err(e)) => {
 75                              debug!(parent: node.tcp().span(), "handshake with {addr} failed: {e}");
 76                              Err(e)
 77                          }
 78                          Err(_) => {
 79                              debug!(parent: node.tcp().span(), "handshake with {} timed out", addr);
 80                              Err(io::ErrorKind::TimedOut.into())
 81                          }
 82                      };
 83  
 84                      // return the Connection to the Tcp, resuming Tcp::adapt_stream
 85                      if result_sender.send(ret).is_err() {
 86                          unreachable!("couldn't return a Connection to the Tcp");
 87                      }
 88                  });
 89              }
 90          });
 91          let _ = rx.await;
 92          self.tcp().tasks.lock().push(handshake_task);
 93  
 94          // register the Handshake handler with the Tcp
 95          let hdl = Box::new(ProtocolHandler(from_node_sender));
 96          assert!(self.tcp().protocols.handshake.set(hdl).is_ok(), "the Handshake protocol was enabled more than once!");
 97      }
 98  
 99      /// Performs the handshake; temporarily assumes control of the [`Connection`] and returns it if the handshake is
100      /// successful.
101      async fn perform_handshake(&self, conn: Connection) -> io::Result<Connection>;
102  
103      /// Borrows the full connection stream to be used in the implementation of [`Handshake::perform_handshake`].
104      fn borrow_stream<'a>(&self, conn: &'a mut Connection) -> &'a mut TcpStream {
105          conn.stream.as_mut().unwrap()
106      }
107  
108      /// Assumes full control of a connection's stream in the implementation of [`Handshake::perform_handshake`], by
109      /// the end of which it *must* be followed by [`Handshake::return_stream`].
110      fn take_stream(&self, conn: &mut Connection) -> TcpStream {
111          conn.stream.take().unwrap()
112      }
113  
114      /// This method only needs to be called if [`Handshake::take_stream`] had been called before; it is used to
115      /// return a (potentially modified) stream back to the applicable connection.
116      fn return_stream<T: AsyncRead + AsyncWrite + Send + Sync + 'static>(&self, conn: &mut Connection, stream: T) {
117          let (reader, writer) = split(stream);
118          conn.reader = Some(Box::new(reader));
119          conn.writer = Some(Box::new(writer));
120      }
121  }