writing.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use super::*; 20 21 use alphaos_node_sync_locators::BlockLocators; 22 use alphaos_node_tcp::protocols::Writing; 23 24 use std::io; 25 use tokio::sync::oneshot; 26 27 impl<N: Network> Router<N> { 28 /// Sends a "Ping" message to the given peer. 29 /// 30 /// Returns false if the peer does not exist or disconnected. 31 #[must_use] 32 pub fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) -> bool { 33 let result = self.send(peer_ip, Message::Ping(messages::Ping::new(self.node_type(), block_locators))); 34 result.is_some() 35 } 36 37 /// Sends the given message to specified peer. 38 /// 39 /// This function returns as soon as the message is queued to be sent, 40 /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`] 41 /// which can be used to determine when and whether the message has been delivered. 42 /// 43 /// This returns None, if the peer does not exist or disconnected. 44 pub fn send(&self, peer_ip: SocketAddr, message: Message<N>) -> Option<oneshot::Receiver<io::Result<()>>> { 45 // Determine whether to send the message. 46 if !self.can_send(peer_ip, &message) { 47 return None; 48 } 49 // Resolve the listener IP to the (ambiguous) peer address. 50 let peer_addr = match self.resolve_to_ambiguous(peer_ip) { 51 Some(peer_addr) => peer_addr, 52 None => { 53 warn!("Unable to resolve the listener IP address '{peer_ip}'"); 54 return None; 55 } 56 }; 57 // If the message type is a block request, add it to the cache. 58 if let Message::BlockRequest(request) = message { 59 self.cache.insert_outbound_block_request(peer_ip, request); 60 } 61 // If the message type is a puzzle request, increment the cache. 62 if matches!(message, Message::PuzzleRequest(_)) { 63 self.cache.increment_outbound_puzzle_requests(peer_ip); 64 } 65 // If the message type is a peer request, increment the cache. 66 if matches!(message, Message::PeerRequest(_)) { 67 self.cache.increment_outbound_peer_requests(peer_ip); 68 } 69 // Retrieve the message name. 70 let name = message.name(); 71 // Send the message to the peer. 72 trace!("Sending '{name}' to '{peer_ip}'"); 73 let result = self.unicast(peer_addr, message); 74 // If the message was unable to be sent, disconnect. 75 if let Err(e) = &result { 76 warn!("Failed to send '{name}' to '{peer_ip}': {e}"); 77 debug!("Disconnecting from '{peer_ip}' (unable to send)"); 78 self.disconnect(peer_ip); 79 } 80 result.ok() 81 } 82 83 /// Returns `true` if the message can be sent. 84 fn can_send(&self, peer_ip: SocketAddr, message: &Message<N>) -> bool { 85 // Ensure the peer is connected before sending. 86 if !self.is_connected(peer_ip) { 87 warn!("Attempted to send to a non-connected peer {peer_ip}"); 88 return false; 89 } 90 // Determine whether to send the message. 91 match message { 92 Message::UnconfirmedSolution(message) => { 93 // Update the timestamp for the unconfirmed solution. 94 let seen_before = self.cache.insert_outbound_solution(peer_ip, message.solution_id).is_some(); 95 // Determine whether to send the solution. 96 !seen_before 97 } 98 Message::UnconfirmedTransaction(message) => { 99 // Update the timestamp for the unconfirmed transaction. 100 let seen_before = self.cache.insert_outbound_transaction(peer_ip, message.transaction_id).is_some(); 101 // Determine whether to send the transaction. 102 !seen_before 103 } 104 // For all other message types, return `true`. 105 _ => true, 106 } 107 } 108 } 109 #[async_trait] 110 impl<N: Network> Writing for Router<N> { 111 type Codec = MessageCodec<N>; 112 type Message = Message<N>; 113 114 /// Creates an [`Encoder`] used to write the outbound messages to the target stream. 115 /// The `side` parameter indicates the connection side **from the node's perspective**. 116 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 117 Default::default() 118 } 119 }