writing.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use super::*; 17 18 use alphaos_node_sync_locators::BlockLocators; 19 use alphaos_node_tcp::protocols::Writing; 20 21 use std::io; 22 use tokio::sync::oneshot; 23 24 impl<N: Network> Router<N> { 25 /// Sends a "Ping" message to the given peer. 26 /// 27 /// Returns false if the peer does not exist or disconnected. 28 #[must_use] 29 pub fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) -> bool { 30 let result = self.send(peer_ip, Message::Ping(messages::Ping::new(self.node_type(), block_locators))); 31 result.is_some() 32 } 33 34 /// Sends the given message to specified peer. 35 /// 36 /// This function returns as soon as the message is queued to be sent, 37 /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`] 38 /// which can be used to determine when and whether the message has been delivered. 39 /// 40 /// This returns None, if the peer does not exist or disconnected. 41 pub fn send(&self, peer_ip: SocketAddr, message: Message<N>) -> Option<oneshot::Receiver<io::Result<()>>> { 42 // Determine whether to send the message. 43 if !self.can_send(peer_ip, &message) { 44 return None; 45 } 46 // Resolve the listener IP to the (ambiguous) peer address. 47 let peer_addr = match self.resolve_to_ambiguous(peer_ip) { 48 Some(peer_addr) => peer_addr, 49 None => { 50 warn!("Unable to resolve the listener IP address '{peer_ip}'"); 51 return None; 52 } 53 }; 54 // If the message type is a block request, add it to the cache. 55 if let Message::BlockRequest(request) = message { 56 self.cache.insert_outbound_block_request(peer_ip, request); 57 } 58 // If the message type is a puzzle request, increment the cache. 59 if matches!(message, Message::PuzzleRequest(_)) { 60 self.cache.increment_outbound_puzzle_requests(peer_ip); 61 } 62 // If the message type is a peer request, increment the cache. 63 if matches!(message, Message::PeerRequest(_)) { 64 self.cache.increment_outbound_peer_requests(peer_ip); 65 } 66 // Retrieve the message name. 67 let name = message.name(); 68 // Send the message to the peer. 69 trace!("Sending '{name}' to '{peer_ip}'"); 70 let result = self.unicast(peer_addr, message); 71 // If the message was unable to be sent, disconnect. 72 if let Err(e) = &result { 73 warn!("Failed to send '{name}' to '{peer_ip}': {e}"); 74 debug!("Disconnecting from '{peer_ip}' (unable to send)"); 75 self.disconnect(peer_ip); 76 } 77 result.ok() 78 } 79 80 /// Returns `true` if the message can be sent. 81 fn can_send(&self, peer_ip: SocketAddr, message: &Message<N>) -> bool { 82 // Ensure the peer is connected before sending. 83 if !self.is_connected(peer_ip) { 84 warn!("Attempted to send to a non-connected peer {peer_ip}"); 85 return false; 86 } 87 // Determine whether to send the message. 88 match message { 89 Message::UnconfirmedSolution(message) => { 90 // Update the timestamp for the unconfirmed solution. 91 let seen_before = self.cache.insert_outbound_solution(peer_ip, message.solution_id).is_some(); 92 // Determine whether to send the solution. 93 !seen_before 94 } 95 Message::UnconfirmedTransaction(message) => { 96 // Update the timestamp for the unconfirmed transaction. 97 let seen_before = self.cache.insert_outbound_transaction(peer_ip, message.transaction_id).is_some(); 98 // Determine whether to send the transaction. 99 !seen_before 100 } 101 // For all other message types, return `true`. 102 _ => true, 103 } 104 } 105 } 106 #[async_trait] 107 impl<N: Network> Writing for Router<N> { 108 type Codec = MessageCodec<N>; 109 type Message = Message<N>; 110 111 /// Creates an [`Encoder`] used to write the outbound messages to the target stream. 112 /// The `side` parameter indicates the connection side **from the node's perspective**. 113 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { 114 Default::default() 115 } 116 }