/ node / router / src / writing.rs
writing.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  use super::*;
 17  
 18  use alphaos_node_sync_locators::BlockLocators;
 19  use alphaos_node_tcp::protocols::Writing;
 20  
 21  use std::io;
 22  use tokio::sync::oneshot;
 23  
 24  impl<N: Network> Router<N> {
 25      /// Sends a "Ping" message to the given peer.
 26      ///
 27      /// Returns false if the peer does not exist or disconnected.
 28      #[must_use]
 29      pub fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) -> bool {
 30          let result = self.send(peer_ip, Message::Ping(messages::Ping::new(self.node_type(), block_locators)));
 31          result.is_some()
 32      }
 33  
 34      /// Sends the given message to specified peer.
 35      ///
 36      /// This function returns as soon as the message is queued to be sent,
 37      /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
 38      /// which can be used to determine when and whether the message has been delivered.
 39      ///
 40      /// This returns None, if the peer does not exist or disconnected.
 41      pub fn send(&self, peer_ip: SocketAddr, message: Message<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
 42          // Determine whether to send the message.
 43          if !self.can_send(peer_ip, &message) {
 44              return None;
 45          }
 46          // Resolve the listener IP to the (ambiguous) peer address.
 47          let peer_addr = match self.resolve_to_ambiguous(peer_ip) {
 48              Some(peer_addr) => peer_addr,
 49              None => {
 50                  warn!("Unable to resolve the listener IP address '{peer_ip}'");
 51                  return None;
 52              }
 53          };
 54          // If the message type is a block request, add it to the cache.
 55          if let Message::BlockRequest(request) = message {
 56              self.cache.insert_outbound_block_request(peer_ip, request);
 57          }
 58          // If the message type is a puzzle request, increment the cache.
 59          if matches!(message, Message::PuzzleRequest(_)) {
 60              self.cache.increment_outbound_puzzle_requests(peer_ip);
 61          }
 62          // If the message type is a peer request, increment the cache.
 63          if matches!(message, Message::PeerRequest(_)) {
 64              self.cache.increment_outbound_peer_requests(peer_ip);
 65          }
 66          // Retrieve the message name.
 67          let name = message.name();
 68          // Send the message to the peer.
 69          trace!("Sending '{name}' to '{peer_ip}'");
 70          let result = self.unicast(peer_addr, message);
 71          // If the message was unable to be sent, disconnect.
 72          if let Err(e) = &result {
 73              warn!("Failed to send '{name}' to '{peer_ip}': {e}");
 74              debug!("Disconnecting from '{peer_ip}' (unable to send)");
 75              self.disconnect(peer_ip);
 76          }
 77          result.ok()
 78      }
 79  
 80      /// Returns `true` if the message can be sent.
 81      fn can_send(&self, peer_ip: SocketAddr, message: &Message<N>) -> bool {
 82          // Ensure the peer is connected before sending.
 83          if !self.is_connected(peer_ip) {
 84              warn!("Attempted to send to a non-connected peer {peer_ip}");
 85              return false;
 86          }
 87          // Determine whether to send the message.
 88          match message {
 89              Message::UnconfirmedSolution(message) => {
 90                  // Update the timestamp for the unconfirmed solution.
 91                  let seen_before = self.cache.insert_outbound_solution(peer_ip, message.solution_id).is_some();
 92                  // Determine whether to send the solution.
 93                  !seen_before
 94              }
 95              Message::UnconfirmedTransaction(message) => {
 96                  // Update the timestamp for the unconfirmed transaction.
 97                  let seen_before = self.cache.insert_outbound_transaction(peer_ip, message.transaction_id).is_some();
 98                  // Determine whether to send the transaction.
 99                  !seen_before
100              }
101              // For all other message types, return `true`.
102              _ => true,
103          }
104      }
105  }
106  #[async_trait]
107  impl<N: Network> Writing for Router<N> {
108      type Codec = MessageCodec<N>;
109      type Message = Message<N>;
110  
111      /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
112      /// The `side` parameter indicates the connection side **from the node's perspective**.
113      fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
114          Default::default()
115      }
116  }