writing.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use std::{any::Any, collections::HashMap, io, net::SocketAddr, sync::Arc}; 20 21 use async_trait::async_trait; 22 use futures_util::sink::SinkExt; 23 #[cfg(feature = "locktick")] 24 use locktick::parking_lot::RwLock; 25 #[cfg(not(feature = "locktick"))] 26 use parking_lot::RwLock; 27 use tokio::{ 28 io::AsyncWrite, 29 sync::{mpsc, oneshot}, 30 }; 31 use tokio_util::codec::{Encoder, FramedWrite}; 32 use tracing::*; 33 34 #[cfg(doc)] 35 use crate::{protocols::Handshake, Config, Tcp}; 36 use crate::{ 37 protocols::{Protocol, ProtocolHandler, ReturnableConnection}, 38 Connection, 39 ConnectionSide, 40 P2P, 41 }; 42 43 type WritingSenders = Arc<RwLock<HashMap<SocketAddr, mpsc::Sender<WrappedMessage>>>>; 44 45 /// Can be used to specify and enable writing, i.e. sending outbound messages. If the [`Handshake`] 46 /// protocol is enabled too, it goes into force only after the handshake has been concluded. 47 #[async_trait] 48 pub trait Writing: P2P 49 where 50 Self: Clone + Send + Sync + 'static, 51 { 52 /// The depth of per-connection queues used to send outbound messages; the greater it is, the more outbound 53 /// messages the node can enqueue. Setting it to a large value is not recommended, as doing it might 54 /// obscure potential issues with your implementation (like slow serialization) or network. 55 /// 56 /// The default value is 1024. 57 fn message_queue_depth(&self) -> usize { 58 1024 59 } 60 61 /// The type of the outbound messages; unless their serialization is expensive and the message 62 /// is broadcasted (in which case it would get serialized multiple times), serialization should 63 /// be done in the implementation of [`Self::Codec`]. 64 type Message: Send; 65 66 /// The user-supplied [`Encoder`] used to write outbound messages to the target stream. 67 type Codec: Encoder<Self::Message, Error = io::Error> + Send; 68 69 /// Prepares the node to send messages. 70 async fn enable_writing(&self) { 71 let (conn_sender, mut conn_receiver) = mpsc::unbounded_channel(); 72 73 // the conn_senders are used to send messages from the Tcp to individual connections 74 let conn_senders: WritingSenders = Default::default(); 75 // procure a clone to create the WritingHandler with 76 let senders = conn_senders.clone(); 77 78 // use a channel to know when the writing task is ready 79 let (tx_writing, rx_writing) = oneshot::channel(); 80 81 // the task spawning tasks sending messages to all the streams 82 let self_clone = self.clone(); 83 let writing_task = tokio::spawn(async move { 84 trace!(parent: self_clone.tcp().span(), "spawned the Writing handler task"); 85 tx_writing.send(()).unwrap(); // safe; the channel was just opened 86 87 // these objects are sent from `Tcp::adapt_stream` 88 while let Some(returnable_conn) = conn_receiver.recv().await { 89 self_clone.handle_new_connection(returnable_conn, &conn_senders).await; 90 } 91 }); 92 let _ = rx_writing.await; 93 self.tcp().tasks.lock().push(writing_task); 94 95 // register the WritingHandler with the Tcp 96 let hdl = Box::new(WritingHandler { handler: ProtocolHandler(conn_sender), senders }); 97 assert!(self.tcp().protocols.writing.set(hdl).is_ok(), "the Writing protocol was enabled more than once!"); 98 } 99 100 /// Creates an [`Encoder`] used to write the outbound messages to the target stream. 101 /// The `side` param indicates the connection side **from the node's perspective**. 102 fn codec(&self, addr: SocketAddr, side: ConnectionSide) -> Self::Codec; 103 104 /// Sends the provided message to the specified [`SocketAddr`]. Returns as soon as the message is queued to 105 /// be sent, without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`] 106 /// which can be used to determine when and whether the message has been delivered. 107 /// 108 /// # Errors 109 /// 110 /// The following errors can be returned: 111 /// - [`io::ErrorKind::NotConnected`] if the node is not connected to the provided address 112 /// - [`io::ErrorKind::Other`] if the outbound message queue for this address is full 113 /// - [`io::ErrorKind::Unsupported`] if [`Writing::enable_writing`] hadn't been called yet 114 fn unicast(&self, addr: SocketAddr, message: Self::Message) -> io::Result<oneshot::Receiver<io::Result<()>>> { 115 // access the protocol handler 116 if let Some(handler) = self.tcp().protocols.writing.get() { 117 // find the message sender for the given address 118 if let Some(sender) = handler.senders.read().get(&addr).cloned() { 119 let (msg, delivery) = WrappedMessage::new(Box::new(message)); 120 sender 121 .try_send(msg) 122 .map_err(|e| { 123 error!(parent: self.tcp().span(), "can't send a message to {}: {}", addr, e); 124 self.tcp().stats().register_failure(); 125 io::ErrorKind::Other.into() 126 }) 127 .map(|_| delivery) 128 } else { 129 Err(io::ErrorKind::NotConnected.into()) 130 } 131 } else { 132 Err(io::ErrorKind::Unsupported.into()) 133 } 134 } 135 136 /// Broadcasts the provided message to all connected peers. Returns as soon as the message is queued to 137 /// be sent to all the peers, without waiting for the actual delivery. This method doesn't provide the 138 /// means to check when and if the messages actually get delivered; you can achieve that by calling 139 /// [`Writing::unicast`] for each address returned by [`Tcp::connected_addrs`]. 140 /// 141 /// # Errors 142 /// 143 /// Returns [`io::ErrorKind::Unsupported`] if [`Writing::enable_writing`] hadn't been called yet. 144 fn broadcast(&self, message: Self::Message) -> io::Result<()> 145 where 146 Self::Message: Clone, 147 { 148 // access the protocol handler 149 if let Some(handler) = self.tcp().protocols.writing.get() { 150 let senders = handler.senders.read().clone(); 151 for (addr, message_sender) in senders { 152 let (msg, _delivery) = WrappedMessage::new(Box::new(message.clone())); 153 let _ = message_sender.try_send(msg).map_err(|e| { 154 error!(parent: self.tcp().span(), "can't send a message to {}: {}", addr, e); 155 self.tcp().stats().register_failure(); 156 }); 157 } 158 159 Ok(()) 160 } else { 161 Err(io::ErrorKind::Unsupported.into()) 162 } 163 } 164 } 165 166 /// This trait is used to restrict access to methods that would otherwise be public in [`Writing`]. 167 #[async_trait] 168 trait WritingInternal: Writing { 169 /// Writes the given message to the network stream and returns the number of written bytes. 170 async fn write_to_stream<W: AsyncWrite + Unpin + Send>( 171 &self, 172 message: Self::Message, 173 writer: &mut FramedWrite<W, Self::Codec>, 174 ) -> Result<usize, <Self::Codec as Encoder<Self::Message>>::Error>; 175 176 /// Applies the [`Writing`] protocol to a single connection. 177 async fn handle_new_connection(&self, (conn, conn_returner): ReturnableConnection, conn_senders: &WritingSenders); 178 } 179 180 #[async_trait] 181 impl<W: Writing> WritingInternal for W { 182 async fn write_to_stream<A: AsyncWrite + Unpin + Send>( 183 &self, 184 message: Self::Message, 185 writer: &mut FramedWrite<A, Self::Codec>, 186 ) -> Result<usize, <Self::Codec as Encoder<Self::Message>>::Error> { 187 writer.feed(message).await?; 188 let len = writer.write_buffer().len(); 189 writer.flush().await?; 190 191 Ok(len) 192 } 193 194 async fn handle_new_connection( 195 &self, 196 (mut conn, conn_returner): ReturnableConnection, 197 conn_senders: &WritingSenders, 198 ) { 199 let addr = conn.addr(); 200 let codec = self.codec(addr, !conn.side()); 201 let writer = conn.writer.take().expect("missing connection writer!"); 202 let mut framed = FramedWrite::new(writer, codec); 203 204 let (outbound_message_sender, mut outbound_message_receiver) = mpsc::channel(self.message_queue_depth()); 205 206 // register the connection's message sender with the Writing protocol handler 207 conn_senders.write().insert(addr, outbound_message_sender); 208 209 // this will automatically drop the sender upon a disconnect 210 let auto_cleanup = SenderCleanup { addr, senders: Arc::clone(conn_senders) }; 211 212 // use a channel to know when the writer task is ready 213 let (tx_writer, rx_writer) = oneshot::channel(); 214 215 // the task for writing outbound messages 216 let self_clone = self.clone(); 217 let writer_task = tokio::spawn(Box::pin(async move { 218 let node = self_clone.tcp(); 219 trace!(parent: node.span(), "spawned a task for writing messages to {}", addr); 220 tx_writer.send(()).unwrap(); // safe; the channel was just opened 221 222 // move the cleanup into the task that gets aborted on disconnect 223 let _auto_cleanup = auto_cleanup; 224 225 while let Some(wrapped_msg) = outbound_message_receiver.recv().await { 226 let msg = wrapped_msg.msg.downcast().unwrap(); 227 228 match self_clone.write_to_stream(*msg, &mut framed).await { 229 Ok(len) => { 230 let _ = wrapped_msg.delivery_notification.send(Ok(())); 231 node.known_peers().register_sent_message(addr.ip(), len); 232 node.stats().register_sent_message(len); 233 trace!(parent: node.span(), "sent {}B to {}", len, addr); 234 } 235 Err(e) => { 236 node.known_peers().register_failure(addr.ip()); 237 error!(parent: node.span(), "couldn't send a message to {}: {}", addr, e); 238 let is_fatal = node.config().fatal_io_errors.contains(&e.kind()); 239 let _ = wrapped_msg.delivery_notification.send(Err(e)); 240 if is_fatal { 241 break; 242 } 243 } 244 } 245 } 246 247 node.disconnect(addr).await; 248 })); 249 let _ = rx_writer.await; 250 conn.tasks.push(writer_task); 251 252 // return the Connection to the Tcp, resuming Tcp::adapt_stream 253 if conn_returner.send(Ok(conn)).is_err() { 254 unreachable!("couldn't return a Connection to the Tcp"); 255 } 256 } 257 } 258 259 /// Used to queue messages for delivery. 260 struct WrappedMessage { 261 msg: Box<dyn Any + Send>, 262 delivery_notification: oneshot::Sender<io::Result<()>>, 263 } 264 265 impl WrappedMessage { 266 fn new(msg: Box<dyn Any + Send>) -> (Self, oneshot::Receiver<io::Result<()>>) { 267 let (tx, rx) = oneshot::channel(); 268 let wrapped_msg = Self { msg, delivery_notification: tx }; 269 270 (wrapped_msg, rx) 271 } 272 } 273 274 /// The handler object dedicated to the [`Writing`] protocol. 275 pub(crate) struct WritingHandler { 276 handler: ProtocolHandler<Connection, io::Result<Connection>>, 277 senders: WritingSenders, 278 } 279 280 impl Protocol<Connection, io::Result<Connection>> for WritingHandler { 281 fn trigger(&self, item: ReturnableConnection) { 282 self.handler.trigger(item); 283 } 284 } 285 286 struct SenderCleanup { 287 addr: SocketAddr, 288 senders: WritingSenders, 289 } 290 291 impl Drop for SenderCleanup { 292 fn drop(&mut self) { 293 self.senders.write().remove(&self.addr); 294 } 295 }