reading.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 #[cfg(doc)] 20 use crate::{protocols::Handshake, Config}; 21 use crate::{ 22 protocols::{ProtocolHandler, ReturnableConnection}, 23 ConnectionSide, 24 Tcp, 25 P2P, 26 }; 27 28 use async_trait::async_trait; 29 use bytes::BytesMut; 30 use futures_util::StreamExt; 31 use std::{io, net::SocketAddr}; 32 use tokio::{ 33 io::AsyncRead, 34 sync::{mpsc, oneshot}, 35 }; 36 use tokio_util::codec::{Decoder, FramedRead}; 37 use tracing::*; 38 39 /// Can be used to specify and enable reading, i.e. receiving inbound messages. If the [`Handshake`] 40 /// protocol is enabled too, it goes into force only after the handshake has been concluded. 41 /// 42 /// Each inbound message is isolated by the user-supplied [`Reading::Codec`], creating a [`Reading::Message`], 43 /// which is immediately queued (with a [`Reading::MESSAGE_QUEUE_DEPTH`] limit) to be processed by 44 /// [`Reading::process_message`]. The configured fatal IO errors result in an immediate disconnect 45 /// (in order to e.g. avoid accidentally reading "borked" messages). 46 #[async_trait] 47 pub trait Reading: P2P 48 where 49 Self: Clone + Send + Sync + 'static, 50 { 51 /// The depth of per-connection queues used to process inbound messages; the greater it is, the more inbound 52 /// messages the node can enqueue, but setting it to a large value can make the node more susceptible to DoS 53 /// attacks. 54 /// 55 /// The default value is 1024. 56 fn message_queue_depth(&self) -> usize { 57 1024 58 } 59 60 /// The initial size of a per-connection buffer for reading inbound messages. Can be set to the maximum expected size 61 /// of the inbound message in order to only allocate it once. 62 /// 63 /// The default value is 1024KiB. 64 const INITIAL_BUFFER_SIZE: usize = 1024 * 1024; 65 66 /// The final (deserialized) type of inbound messages. 67 type Message: Send; 68 69 /// The user-supplied [`Decoder`] used to interpret inbound messages. 70 type Codec: Decoder<Item = Self::Message, Error = io::Error> + Send; 71 72 /// Prepares the node to receive messages. 73 async fn enable_reading(&self) { 74 let (conn_sender, mut conn_receiver) = mpsc::unbounded_channel(); 75 76 // use a channel to know when the reading task is ready 77 let (tx_reading, rx_reading) = oneshot::channel(); 78 79 // the main task spawning per-connection tasks reading messages from their streams 80 let self_clone = self.clone(); 81 let reading_task = tokio::spawn(async move { 82 trace!(parent: self_clone.tcp().span(), "spawned the Reading handler task"); 83 tx_reading.send(()).unwrap(); // safe; the channel was just opened 84 85 // these objects are sent from `Tcp::adapt_stream` 86 while let Some(returnable_conn) = conn_receiver.recv().await { 87 self_clone.handle_new_connection(returnable_conn).await; 88 } 89 }); 90 let _ = rx_reading.await; 91 self.tcp().tasks.lock().push(reading_task); 92 93 // register the Reading handler with the Tcp 94 let hdl = Box::new(ProtocolHandler(conn_sender)); 95 assert!(self.tcp().protocols.reading.set(hdl).is_ok(), "the Reading protocol was enabled more than once!"); 96 } 97 98 /// Creates a [`Decoder`] used to interpret messages from the network. 99 /// The `side` param indicates the connection side **from the node's perspective**. 100 fn codec(&self, addr: SocketAddr, side: ConnectionSide) -> Self::Codec; 101 102 /// Processes an inbound message. Can be used to update state, send replies etc. 103 async fn process_message(&self, source: SocketAddr, message: Self::Message) -> io::Result<()>; 104 } 105 106 /// This trait is used to restrict access to methods that would otherwise be public in [`Reading`]. 107 #[async_trait] 108 trait ReadingInternal: Reading { 109 /// Applies the [`Reading`] protocol to a single connection. 110 async fn handle_new_connection(&self, (conn, conn_returner): ReturnableConnection); 111 112 /// Wraps the user-supplied [`Decoder`] ([`Reading::Codec`]) in another one used for message accounting. 113 fn map_codec<T: AsyncRead>( 114 &self, 115 framed: FramedRead<T, Self::Codec>, 116 addr: SocketAddr, 117 ) -> FramedRead<T, CountingCodec<Self::Codec>>; 118 } 119 120 #[async_trait] 121 impl<R: Reading> ReadingInternal for R { 122 async fn handle_new_connection(&self, (mut conn, conn_returner): ReturnableConnection) { 123 let addr = conn.addr(); 124 let codec = self.codec(addr, !conn.side()); 125 let reader = conn.reader.take().expect("missing connection reader!"); 126 let framed = FramedRead::new(reader, codec); 127 let mut framed = self.map_codec(framed, addr); 128 129 // the connection will notify the reading task once it's fully ready 130 let (tx_conn_ready, rx_conn_ready) = oneshot::channel(); 131 conn.readiness_notifier = Some(tx_conn_ready); 132 133 if Self::INITIAL_BUFFER_SIZE != 0 { 134 framed.read_buffer_mut().reserve(Self::INITIAL_BUFFER_SIZE); 135 } 136 137 let (inbound_message_sender, mut inbound_message_receiver) = mpsc::channel(self.message_queue_depth()); 138 139 // use a channel to know when the processing task is ready 140 let (tx_processing, rx_processing) = oneshot::channel::<()>(); 141 142 // the task for processing parsed messages 143 let self_clone = self.clone(); 144 let inbound_processing_task = tokio::spawn(Box::pin(async move { 145 let node = self_clone.tcp(); 146 trace!(parent: node.span(), "spawned a task for processing messages from {addr}"); 147 tx_processing.send(()).unwrap(); // safe; the channel was just opened 148 149 while let Some(msg) = inbound_message_receiver.recv().await { 150 if let Err(e) = self_clone.process_message(addr, msg).await { 151 error!(parent: node.span(), "can't process a message from {addr}: {e}"); 152 node.known_peers().register_failure(addr.ip()); 153 } 154 #[cfg(feature = "metrics")] 155 metrics::decrement_gauge(metrics::tcp::TCP_TASKS, 1f64); 156 } 157 })); 158 let _ = rx_processing.await; 159 conn.tasks.push(inbound_processing_task); 160 161 // use a channel to know when the reader task is ready 162 let (tx_reader, rx_reader) = oneshot::channel::<()>(); 163 164 // the task for reading messages from a stream 165 let node = self.tcp().clone(); 166 let reader_task = tokio::spawn(Box::pin(async move { 167 trace!(parent: node.span(), "spawned a task for reading messages from {addr}"); 168 tx_reader.send(()).unwrap(); // safe; the channel was just opened 169 170 // postpone reads until the connection is fully established; if the process fails, 171 // this task gets aborted, so there is no need for a dedicated timeout 172 let _ = rx_conn_ready.await; 173 174 while let Some(bytes) = framed.next().await { 175 match bytes { 176 Ok(msg) => { 177 // send the message for further processing 178 if let Err(e) = inbound_message_sender.try_send(msg) { 179 error!(parent: node.span(), "can't process a message from {addr}: {e}"); 180 node.stats().register_failure(); 181 if matches!(e, mpsc::error::TrySendError::Closed(_)) { 182 break; 183 } 184 } 185 #[cfg(feature = "metrics")] 186 metrics::increment_gauge(metrics::tcp::TCP_TASKS, 1f64); 187 } 188 Err(e) => { 189 error!(parent: node.span(), "can't read from {addr}: {e}"); 190 node.known_peers().register_failure(addr.ip()); 191 if node.config().fatal_io_errors.contains(&e.kind()) { 192 break; 193 } 194 } 195 } 196 } 197 198 let _ = node.disconnect(addr).await; 199 })); 200 let _ = rx_reader.await; 201 conn.tasks.push(reader_task); 202 203 // return the Connection to the Tcp, resuming Tcp::adapt_stream 204 if conn_returner.send(Ok(conn)).is_err() { 205 unreachable!("couldn't return a Connection to the Tcp"); 206 } 207 } 208 209 fn map_codec<T: AsyncRead>( 210 &self, 211 framed: FramedRead<T, Self::Codec>, 212 addr: SocketAddr, 213 ) -> FramedRead<T, CountingCodec<Self::Codec>> { 214 framed.map_decoder(|codec| CountingCodec { codec, node: self.tcp().clone(), addr, acc: 0 }) 215 } 216 } 217 218 /// A wrapper [`Decoder`] that also counts the inbound messages. 219 struct CountingCodec<D: Decoder> { 220 codec: D, 221 node: Tcp, 222 addr: SocketAddr, 223 acc: usize, 224 } 225 226 impl<D: Decoder> Decoder for CountingCodec<D> { 227 type Error = D::Error; 228 type Item = D::Item; 229 230 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { 231 let initial_buf_len = src.len(); 232 let ret = self.codec.decode(src)?; 233 let final_buf_len = src.len(); 234 let read_len = initial_buf_len - final_buf_len + self.acc; 235 236 if read_len != 0 { 237 trace!(parent: self.node.span(), "read {}B from {}", read_len, self.addr); 238 239 if ret.is_some() { 240 self.acc = 0; 241 self.node.known_peers().register_received_message(self.addr.ip(), read_len); 242 self.node.stats().register_received_message(read_len); 243 } else { 244 self.acc = read_len; 245 } 246 } 247 248 Ok(ret) 249 } 250 }