/ node / tcp / src / protocols / writing.rs
writing.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use std::{any::Any, collections::HashMap, io, net::SocketAddr, sync::Arc};
 20  
 21  use async_trait::async_trait;
 22  use futures_util::sink::SinkExt;
 23  #[cfg(feature = "locktick")]
 24  use locktick::parking_lot::RwLock;
 25  #[cfg(not(feature = "locktick"))]
 26  use parking_lot::RwLock;
 27  use tokio::{
 28      io::AsyncWrite,
 29      sync::{mpsc, oneshot},
 30  };
 31  use tokio_util::codec::{Encoder, FramedWrite};
 32  use tracing::*;
 33  
 34  #[cfg(doc)]
 35  use crate::{protocols::Handshake, Config, Tcp};
 36  use crate::{
 37      protocols::{Protocol, ProtocolHandler, ReturnableConnection},
 38      Connection,
 39      ConnectionSide,
 40      P2P,
 41  };
 42  
/// The registry of per-connection senders used to enqueue outbound messages, keyed by the address
/// of the connection. It is shared between the [`Writing`] methods (which look the senders up), the
/// connection setup code (which inserts them) and [`SenderCleanup`] (which removes them).
type WritingSenders = Arc<RwLock<HashMap<SocketAddr, mpsc::Sender<WrappedMessage>>>>;
 44  
 45  /// Can be used to specify and enable writing, i.e. sending outbound messages. If the [`Handshake`]
 46  /// protocol is enabled too, it goes into force only after the handshake has been concluded.
 47  #[async_trait]
 48  pub trait Writing: P2P
 49  where
 50      Self: Clone + Send + Sync + 'static,
 51  {
 52      /// The depth of per-connection queues used to send outbound messages; the greater it is, the more outbound
 53      /// messages the node can enqueue. Setting it to a large value is not recommended, as doing it might
 54      /// obscure potential issues with your implementation (like slow serialization) or network.
 55      ///
 56      /// The default value is 1024.
 57      fn message_queue_depth(&self) -> usize {
 58          1024
 59      }
 60  
 61      /// The type of the outbound messages; unless their serialization is expensive and the message
 62      /// is broadcasted (in which case it would get serialized multiple times), serialization should
 63      /// be done in the implementation of [`Self::Codec`].
 64      type Message: Send;
 65  
 66      /// The user-supplied [`Encoder`] used to write outbound messages to the target stream.
 67      type Codec: Encoder<Self::Message, Error = io::Error> + Send;
 68  
 69      /// Prepares the node to send messages.
 70      async fn enable_writing(&self) {
 71          let (conn_sender, mut conn_receiver) = mpsc::unbounded_channel();
 72  
 73          // the conn_senders are used to send messages from the Tcp to individual connections
 74          let conn_senders: WritingSenders = Default::default();
 75          // procure a clone to create the WritingHandler with
 76          let senders = conn_senders.clone();
 77  
 78          // use a channel to know when the writing task is ready
 79          let (tx_writing, rx_writing) = oneshot::channel();
 80  
 81          // the task spawning tasks sending messages to all the streams
 82          let self_clone = self.clone();
 83          let writing_task = tokio::spawn(async move {
 84              trace!(parent: self_clone.tcp().span(), "spawned the Writing handler task");
 85              tx_writing.send(()).unwrap(); // safe; the channel was just opened
 86  
 87              // these objects are sent from `Tcp::adapt_stream`
 88              while let Some(returnable_conn) = conn_receiver.recv().await {
 89                  self_clone.handle_new_connection(returnable_conn, &conn_senders).await;
 90              }
 91          });
 92          let _ = rx_writing.await;
 93          self.tcp().tasks.lock().push(writing_task);
 94  
 95          // register the WritingHandler with the Tcp
 96          let hdl = Box::new(WritingHandler { handler: ProtocolHandler(conn_sender), senders });
 97          assert!(self.tcp().protocols.writing.set(hdl).is_ok(), "the Writing protocol was enabled more than once!");
 98      }
 99  
100      /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
101      /// The `side` param indicates the connection side **from the node's perspective**.
102      fn codec(&self, addr: SocketAddr, side: ConnectionSide) -> Self::Codec;
103  
104      /// Sends the provided message to the specified [`SocketAddr`]. Returns as soon as the message is queued to
105      /// be sent, without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
106      /// which can be used to determine when and whether the message has been delivered.
107      ///
108      /// # Errors
109      ///
110      /// The following errors can be returned:
111      /// - [`io::ErrorKind::NotConnected`] if the node is not connected to the provided address
112      /// - [`io::ErrorKind::Other`] if the outbound message queue for this address is full
113      /// - [`io::ErrorKind::Unsupported`] if [`Writing::enable_writing`] hadn't been called yet
114      fn unicast(&self, addr: SocketAddr, message: Self::Message) -> io::Result<oneshot::Receiver<io::Result<()>>> {
115          // access the protocol handler
116          if let Some(handler) = self.tcp().protocols.writing.get() {
117              // find the message sender for the given address
118              if let Some(sender) = handler.senders.read().get(&addr).cloned() {
119                  let (msg, delivery) = WrappedMessage::new(Box::new(message));
120                  sender
121                      .try_send(msg)
122                      .map_err(|e| {
123                          error!(parent: self.tcp().span(), "can't send a message to {}: {}", addr, e);
124                          self.tcp().stats().register_failure();
125                          io::ErrorKind::Other.into()
126                      })
127                      .map(|_| delivery)
128              } else {
129                  Err(io::ErrorKind::NotConnected.into())
130              }
131          } else {
132              Err(io::ErrorKind::Unsupported.into())
133          }
134      }
135  
136      /// Broadcasts the provided message to all connected peers. Returns as soon as the message is queued to
137      /// be sent to all the peers, without waiting for the actual delivery. This method doesn't provide the
138      /// means to check when and if the messages actually get delivered; you can achieve that by calling
139      /// [`Writing::unicast`] for each address returned by [`Tcp::connected_addrs`].
140      ///
141      /// # Errors
142      ///
143      /// Returns [`io::ErrorKind::Unsupported`] if [`Writing::enable_writing`] hadn't been called yet.
144      fn broadcast(&self, message: Self::Message) -> io::Result<()>
145      where
146          Self::Message: Clone,
147      {
148          // access the protocol handler
149          if let Some(handler) = self.tcp().protocols.writing.get() {
150              let senders = handler.senders.read().clone();
151              for (addr, message_sender) in senders {
152                  let (msg, _delivery) = WrappedMessage::new(Box::new(message.clone()));
153                  let _ = message_sender.try_send(msg).map_err(|e| {
154                      error!(parent: self.tcp().span(), "can't send a message to {}: {}", addr, e);
155                      self.tcp().stats().register_failure();
156                  });
157              }
158  
159              Ok(())
160          } else {
161              Err(io::ErrorKind::Unsupported.into())
162          }
163      }
164  }
165  
166  /// This trait is used to restrict access to methods that would otherwise be public in [`Writing`].
167  #[async_trait]
168  trait WritingInternal: Writing {
169      /// Writes the given message to the network stream and returns the number of written bytes.
170      async fn write_to_stream<W: AsyncWrite + Unpin + Send>(
171          &self,
172          message: Self::Message,
173          writer: &mut FramedWrite<W, Self::Codec>,
174      ) -> Result<usize, <Self::Codec as Encoder<Self::Message>>::Error>;
175  
176      /// Applies the [`Writing`] protocol to a single connection.
177      async fn handle_new_connection(&self, (conn, conn_returner): ReturnableConnection, conn_senders: &WritingSenders);
178  }
179  
180  #[async_trait]
181  impl<W: Writing> WritingInternal for W {
182      async fn write_to_stream<A: AsyncWrite + Unpin + Send>(
183          &self,
184          message: Self::Message,
185          writer: &mut FramedWrite<A, Self::Codec>,
186      ) -> Result<usize, <Self::Codec as Encoder<Self::Message>>::Error> {
187          writer.feed(message).await?;
188          let len = writer.write_buffer().len();
189          writer.flush().await?;
190  
191          Ok(len)
192      }
193  
194      async fn handle_new_connection(
195          &self,
196          (mut conn, conn_returner): ReturnableConnection,
197          conn_senders: &WritingSenders,
198      ) {
199          let addr = conn.addr();
200          let codec = self.codec(addr, !conn.side());
201          let writer = conn.writer.take().expect("missing connection writer!");
202          let mut framed = FramedWrite::new(writer, codec);
203  
204          let (outbound_message_sender, mut outbound_message_receiver) = mpsc::channel(self.message_queue_depth());
205  
206          // register the connection's message sender with the Writing protocol handler
207          conn_senders.write().insert(addr, outbound_message_sender);
208  
209          // this will automatically drop the sender upon a disconnect
210          let auto_cleanup = SenderCleanup { addr, senders: Arc::clone(conn_senders) };
211  
212          // use a channel to know when the writer task is ready
213          let (tx_writer, rx_writer) = oneshot::channel();
214  
215          // the task for writing outbound messages
216          let self_clone = self.clone();
217          let writer_task = tokio::spawn(Box::pin(async move {
218              let node = self_clone.tcp();
219              trace!(parent: node.span(), "spawned a task for writing messages to {}", addr);
220              tx_writer.send(()).unwrap(); // safe; the channel was just opened
221  
222              // move the cleanup into the task that gets aborted on disconnect
223              let _auto_cleanup = auto_cleanup;
224  
225              while let Some(wrapped_msg) = outbound_message_receiver.recv().await {
226                  let msg = wrapped_msg.msg.downcast().unwrap();
227  
228                  match self_clone.write_to_stream(*msg, &mut framed).await {
229                      Ok(len) => {
230                          let _ = wrapped_msg.delivery_notification.send(Ok(()));
231                          node.known_peers().register_sent_message(addr.ip(), len);
232                          node.stats().register_sent_message(len);
233                          trace!(parent: node.span(), "sent {}B to {}", len, addr);
234                      }
235                      Err(e) => {
236                          node.known_peers().register_failure(addr.ip());
237                          error!(parent: node.span(), "couldn't send a message to {}: {}", addr, e);
238                          let is_fatal = node.config().fatal_io_errors.contains(&e.kind());
239                          let _ = wrapped_msg.delivery_notification.send(Err(e));
240                          if is_fatal {
241                              break;
242                          }
243                      }
244                  }
245              }
246  
247              node.disconnect(addr).await;
248          }));
249          let _ = rx_writer.await;
250          conn.tasks.push(writer_task);
251  
252          // return the Connection to the Tcp, resuming Tcp::adapt_stream
253          if conn_returner.send(Ok(conn)).is_err() {
254              unreachable!("couldn't return a Connection to the Tcp");
255          }
256      }
257  }
258  
259  /// Used to queue messages for delivery.
260  struct WrappedMessage {
261      msg: Box<dyn Any + Send>,
262      delivery_notification: oneshot::Sender<io::Result<()>>,
263  }
264  
265  impl WrappedMessage {
266      fn new(msg: Box<dyn Any + Send>) -> (Self, oneshot::Receiver<io::Result<()>>) {
267          let (tx, rx) = oneshot::channel();
268          let wrapped_msg = Self { msg, delivery_notification: tx };
269  
270          (wrapped_msg, rx)
271      }
272  }
273  
274  /// The handler object dedicated to the [`Writing`] protocol.
275  pub(crate) struct WritingHandler {
276      handler: ProtocolHandler<Connection, io::Result<Connection>>,
277      senders: WritingSenders,
278  }
279  
280  impl Protocol<Connection, io::Result<Connection>> for WritingHandler {
281      fn trigger(&self, item: ReturnableConnection) {
282          self.handler.trigger(item);
283      }
284  }
285  
286  struct SenderCleanup {
287      addr: SocketAddr,
288      senders: WritingSenders,
289  }
290  
291  impl Drop for SenderCleanup {
292      fn drop(&mut self) {
293          self.senders.write().remove(&self.addr);
294      }
295  }