test_peer.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use crate::common::CurrentNetwork; 20 use alphaos_node_bft_events::{Event, EventCodec}; 21 22 use std::{ 23 io, 24 net::{IpAddr, Ipv4Addr, SocketAddr}, 25 time::Duration, 26 }; 27 28 use pea2pea::{ 29 protocols::{Handshake, OnDisconnect, Reading, Writing}, 30 Config, 31 Connection, 32 ConnectionSide, 33 Node, 34 Pea2Pea, 35 }; 36 37 use tokio::{ 38 sync::mpsc::{self, Receiver, Sender}, 39 time::timeout, 40 }; 41 42 pub struct TestPeer { 43 inner_node: InnerNode, 44 inbound_rx: Receiver<(SocketAddr, Event<CurrentNetwork>)>, 45 } 46 47 #[derive(Clone)] 48 struct InnerNode { 49 // The pea2pea node instance. 50 node: Node, 51 // The inbound channel sender, used to consolidate inbound messages into a single queue so they 52 // can be read in order in tests. 53 inbound_tx: Sender<(SocketAddr, Event<CurrentNetwork>)>, 54 } 55 56 impl TestPeer { 57 pub async fn new() -> Self { 58 let (tx, rx) = mpsc::channel(100); 59 let inner_node = InnerNode { 60 node: Node::new(Config { 61 max_connections: 200, 62 listener_addr: Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)), 63 ..Default::default() 64 }), 65 inbound_tx: tx, 66 }; 67 68 inner_node.enable_handshake().await; 69 inner_node.enable_reading().await; 70 inner_node.enable_writing().await; 71 inner_node.enable_disconnect().await; 72 inner_node.node().start_listening().await.unwrap(); 73 74 Self { inner_node, inbound_rx: rx } 75 } 76 77 pub fn listening_addr(&self) -> SocketAddr { 78 self.inner_node.node().listening_addr().expect("addr should be present") 79 } 80 81 pub async fn connect(&self, target: SocketAddr) -> io::Result<()> { 82 self.inner_node.node().connect(target).await?; 83 Ok(()) 84 } 85 86 // Note: the codec doesn't actually support sending bytes post-handshake, perhaps this should 87 // be relaxed by making a test-only codec in future. 88 pub fn unicast(&self, target: SocketAddr, message: Event<CurrentNetwork>) -> io::Result<()> { 89 self.inner_node.unicast(target, message)?; 90 Ok(()) 91 } 92 93 pub async fn recv(&mut self) -> (SocketAddr, Event<CurrentNetwork>) { 94 match self.inbound_rx.recv().await { 95 Some(message) => message, 96 None => panic!("all senders dropped!"), 97 } 98 } 99 100 pub async fn recv_timeout(&mut self, duration: Duration) -> (SocketAddr, Event<CurrentNetwork>) { 101 match timeout(duration, self.recv()).await { 102 Ok(message) => message, 103 _ => panic!("timed out waiting for message"), 104 } 105 } 106 } 107 108 impl Pea2Pea for InnerNode { 109 fn node(&self) -> &Node { 110 &self.node 111 } 112 } 113 114 impl Handshake for InnerNode { 115 // Set the timeout on the test peer to be longer than the gateway's timeout. 116 const TIMEOUT_MS: u64 = 10_000; 117 118 async fn perform_handshake(&self, connection: Connection) -> io::Result<Connection> { 119 // Don't perform the Alpha handshake so we can test the edge cases fully. 120 Ok(connection) 121 } 122 } 123 124 impl Writing for InnerNode { 125 type Codec = EventCodec<CurrentNetwork>; 126 type Message = Event<CurrentNetwork>; 127 128 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 129 Default::default() 130 } 131 } 132 133 impl Reading for InnerNode { 134 type Codec = EventCodec<CurrentNetwork>; 135 type Message = Event<CurrentNetwork>; 136 137 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 138 Default::default() 139 } 140 141 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { 142 self.inbound_tx 143 .send((peer_addr, message)) 144 .await 145 .map_err(|_| io::Error::other("failed to send message to test peer, all receivers have been dropped")) 146 } 147 } 148 149 impl OnDisconnect for InnerNode { 150 async fn on_disconnect(&self, _peer_addr: SocketAddr) {} 151 }