/ node / bft / tests / common / test_peer.rs
test_peer.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use crate::common::CurrentNetwork;
 20  use alphaos_node_bft_events::{Event, EventCodec};
 21  
 22  use std::{
 23      io,
 24      net::{IpAddr, Ipv4Addr, SocketAddr},
 25      time::Duration,
 26  };
 27  
 28  use pea2pea::{
 29      protocols::{Handshake, OnDisconnect, Reading, Writing},
 30      Config,
 31      Connection,
 32      ConnectionSide,
 33      Node,
 34      Pea2Pea,
 35  };
 36  
 37  use tokio::{
 38      sync::mpsc::{self, Receiver, Sender},
 39      time::timeout,
 40  };
 41  
/// A peer for use in tests: bundles an [`InnerNode`] with the receiving half of the channel
/// into which that node forwards every message it reads.
pub struct TestPeer {
    /// The networking node (and inbound sender) backing this peer.
    inner_node: InnerNode,
    /// Receiving half of the inbound queue; each item pairs the sender's address with the
    /// event that was read from it.
    inbound_rx: Receiver<(SocketAddr, Event<CurrentNetwork>)>,
}
 46  
/// The type on which the pea2pea protocol traits used by [`TestPeer`] are implemented.
#[derive(Clone)]
struct InnerNode {
    /// The pea2pea node instance.
    node: Node,
    /// The inbound channel sender, used to consolidate inbound messages into a single queue so
    /// they can be read in order in tests.
    inbound_tx: Sender<(SocketAddr, Event<CurrentNetwork>)>,
}
 55  
 56  impl TestPeer {
 57      pub async fn new() -> Self {
 58          let (tx, rx) = mpsc::channel(100);
 59          let inner_node = InnerNode {
 60              node: Node::new(Config {
 61                  max_connections: 200,
 62                  listener_addr: Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)),
 63                  ..Default::default()
 64              }),
 65              inbound_tx: tx,
 66          };
 67  
 68          inner_node.enable_handshake().await;
 69          inner_node.enable_reading().await;
 70          inner_node.enable_writing().await;
 71          inner_node.enable_disconnect().await;
 72          inner_node.node().start_listening().await.unwrap();
 73  
 74          Self { inner_node, inbound_rx: rx }
 75      }
 76  
 77      pub fn listening_addr(&self) -> SocketAddr {
 78          self.inner_node.node().listening_addr().expect("addr should be present")
 79      }
 80  
 81      pub async fn connect(&self, target: SocketAddr) -> io::Result<()> {
 82          self.inner_node.node().connect(target).await?;
 83          Ok(())
 84      }
 85  
 86      // Note: the codec doesn't actually support sending bytes post-handshake, perhaps this should
 87      // be relaxed by making a test-only codec in future.
 88      pub fn unicast(&self, target: SocketAddr, message: Event<CurrentNetwork>) -> io::Result<()> {
 89          self.inner_node.unicast(target, message)?;
 90          Ok(())
 91      }
 92  
 93      pub async fn recv(&mut self) -> (SocketAddr, Event<CurrentNetwork>) {
 94          match self.inbound_rx.recv().await {
 95              Some(message) => message,
 96              None => panic!("all senders dropped!"),
 97          }
 98      }
 99  
100      pub async fn recv_timeout(&mut self, duration: Duration) -> (SocketAddr, Event<CurrentNetwork>) {
101          match timeout(duration, self.recv()).await {
102              Ok(message) => message,
103              _ => panic!("timed out waiting for message"),
104          }
105      }
106  }
107  
// Exposes the wrapped pea2pea node via the `Pea2Pea` trait.
impl Pea2Pea for InnerNode {
    // Returns a reference to the wrapped node.
    fn node(&self) -> &Node {
        &self.node
    }
}
113  
// Handshake implementation for the test peer: the connection is handed back untouched.
impl Handshake for InnerNode {
    // Set the timeout on the test peer to be longer than the gateway's timeout.
    const TIMEOUT_MS: u64 = 10_000;

    // Returns the connection as-is, without exchanging any handshake messages.
    async fn perform_handshake(&self, connection: Connection) -> io::Result<Connection> {
        // Don't perform the Alpha handshake so we can test the edge cases fully.
        Ok(connection)
    }
}
123  
124  impl Writing for InnerNode {
125      type Codec = EventCodec<CurrentNetwork>;
126      type Message = Event<CurrentNetwork>;
127  
128      fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
129          Default::default()
130      }
131  }
132  
133  impl Reading for InnerNode {
134      type Codec = EventCodec<CurrentNetwork>;
135      type Message = Event<CurrentNetwork>;
136  
137      fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
138          Default::default()
139      }
140  
141      async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
142          self.inbound_tx
143              .send((peer_addr, message))
144              .await
145              .map_err(|_| io::Error::other("failed to send message to test peer, all receivers have been dropped"))
146      }
147  }
148  
// Disconnect hook for the test peer.
impl OnDisconnect for InnerNode {
    // Intentionally empty: nothing is done when a peer disconnects.
    async fn on_disconnect(&self, _peer_addr: SocketAddr) {}
}