/ node / tcp / src / protocols / reading.rs
reading.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  #[cfg(doc)]
 20  use crate::{protocols::Handshake, Config};
 21  use crate::{
 22      protocols::{ProtocolHandler, ReturnableConnection},
 23      ConnectionSide,
 24      Tcp,
 25      P2P,
 26  };
 27  
 28  use async_trait::async_trait;
 29  use bytes::BytesMut;
 30  use futures_util::StreamExt;
 31  use std::{io, net::SocketAddr};
 32  use tokio::{
 33      io::AsyncRead,
 34      sync::{mpsc, oneshot},
 35  };
 36  use tokio_util::codec::{Decoder, FramedRead};
 37  use tracing::*;
 38  
 39  /// Can be used to specify and enable reading, i.e. receiving inbound messages. If the [`Handshake`]
 40  /// protocol is enabled too, it goes into force only after the handshake has been concluded.
 41  ///
 42  /// Each inbound message is isolated by the user-supplied [`Reading::Codec`], creating a [`Reading::Message`],
 43  /// which is immediately queued (with a [`Reading::MESSAGE_QUEUE_DEPTH`] limit) to be processed by
 44  /// [`Reading::process_message`]. The configured fatal IO errors result in an immediate disconnect
 45  /// (in order to e.g. avoid accidentally reading "borked" messages).
 46  #[async_trait]
 47  pub trait Reading: P2P
 48  where
 49      Self: Clone + Send + Sync + 'static,
 50  {
 51      /// The depth of per-connection queues used to process inbound messages; the greater it is, the more inbound
 52      /// messages the node can enqueue, but setting it to a large value can make the node more susceptible to DoS
 53      /// attacks.
 54      ///
 55      /// The default value is 1024.
 56      fn message_queue_depth(&self) -> usize {
 57          1024
 58      }
 59  
 60      /// The initial size of a per-connection buffer for reading inbound messages. Can be set to the maximum expected size
 61      /// of the inbound message in order to only allocate it once.
 62      ///
 63      /// The default value is 1024KiB.
 64      const INITIAL_BUFFER_SIZE: usize = 1024 * 1024;
 65  
 66      /// The final (deserialized) type of inbound messages.
 67      type Message: Send;
 68  
 69      /// The user-supplied [`Decoder`] used to interpret inbound messages.
 70      type Codec: Decoder<Item = Self::Message, Error = io::Error> + Send;
 71  
 72      /// Prepares the node to receive messages.
 73      async fn enable_reading(&self) {
 74          let (conn_sender, mut conn_receiver) = mpsc::unbounded_channel();
 75  
 76          // use a channel to know when the reading task is ready
 77          let (tx_reading, rx_reading) = oneshot::channel();
 78  
 79          // the main task spawning per-connection tasks reading messages from their streams
 80          let self_clone = self.clone();
 81          let reading_task = tokio::spawn(async move {
 82              trace!(parent: self_clone.tcp().span(), "spawned the Reading handler task");
 83              tx_reading.send(()).unwrap(); // safe; the channel was just opened
 84  
 85              // these objects are sent from `Tcp::adapt_stream`
 86              while let Some(returnable_conn) = conn_receiver.recv().await {
 87                  self_clone.handle_new_connection(returnable_conn).await;
 88              }
 89          });
 90          let _ = rx_reading.await;
 91          self.tcp().tasks.lock().push(reading_task);
 92  
 93          // register the Reading handler with the Tcp
 94          let hdl = Box::new(ProtocolHandler(conn_sender));
 95          assert!(self.tcp().protocols.reading.set(hdl).is_ok(), "the Reading protocol was enabled more than once!");
 96      }
 97  
 98      /// Creates a [`Decoder`] used to interpret messages from the network.
 99      /// The `side` param indicates the connection side **from the node's perspective**.
100      fn codec(&self, addr: SocketAddr, side: ConnectionSide) -> Self::Codec;
101  
102      /// Processes an inbound message. Can be used to update state, send replies etc.
103      async fn process_message(&self, source: SocketAddr, message: Self::Message) -> io::Result<()>;
104  }
105  
106  /// This trait is used to restrict access to methods that would otherwise be public in [`Reading`].
107  #[async_trait]
108  trait ReadingInternal: Reading {
109      /// Applies the [`Reading`] protocol to a single connection.
110      async fn handle_new_connection(&self, (conn, conn_returner): ReturnableConnection);
111  
112      /// Wraps the user-supplied [`Decoder`] ([`Reading::Codec`]) in another one used for message accounting.
113      fn map_codec<T: AsyncRead>(
114          &self,
115          framed: FramedRead<T, Self::Codec>,
116          addr: SocketAddr,
117      ) -> FramedRead<T, CountingCodec<Self::Codec>>;
118  }
119  
120  #[async_trait]
121  impl<R: Reading> ReadingInternal for R {
122      async fn handle_new_connection(&self, (mut conn, conn_returner): ReturnableConnection) {
123          let addr = conn.addr();
124          let codec = self.codec(addr, !conn.side());
125          let reader = conn.reader.take().expect("missing connection reader!");
126          let framed = FramedRead::new(reader, codec);
127          let mut framed = self.map_codec(framed, addr);
128  
129          // the connection will notify the reading task once it's fully ready
130          let (tx_conn_ready, rx_conn_ready) = oneshot::channel();
131          conn.readiness_notifier = Some(tx_conn_ready);
132  
133          if Self::INITIAL_BUFFER_SIZE != 0 {
134              framed.read_buffer_mut().reserve(Self::INITIAL_BUFFER_SIZE);
135          }
136  
137          let (inbound_message_sender, mut inbound_message_receiver) = mpsc::channel(self.message_queue_depth());
138  
139          // use a channel to know when the processing task is ready
140          let (tx_processing, rx_processing) = oneshot::channel::<()>();
141  
142          // the task for processing parsed messages
143          let self_clone = self.clone();
144          let inbound_processing_task = tokio::spawn(Box::pin(async move {
145              let node = self_clone.tcp();
146              trace!(parent: node.span(), "spawned a task for processing messages from {addr}");
147              tx_processing.send(()).unwrap(); // safe; the channel was just opened
148  
149              while let Some(msg) = inbound_message_receiver.recv().await {
150                  if let Err(e) = self_clone.process_message(addr, msg).await {
151                      error!(parent: node.span(), "can't process a message from {addr}: {e}");
152                      node.known_peers().register_failure(addr.ip());
153                  }
154                  #[cfg(feature = "metrics")]
155                  metrics::decrement_gauge(metrics::tcp::TCP_TASKS, 1f64);
156              }
157          }));
158          let _ = rx_processing.await;
159          conn.tasks.push(inbound_processing_task);
160  
161          // use a channel to know when the reader task is ready
162          let (tx_reader, rx_reader) = oneshot::channel::<()>();
163  
164          // the task for reading messages from a stream
165          let node = self.tcp().clone();
166          let reader_task = tokio::spawn(Box::pin(async move {
167              trace!(parent: node.span(), "spawned a task for reading messages from {addr}");
168              tx_reader.send(()).unwrap(); // safe; the channel was just opened
169  
170              // postpone reads until the connection is fully established; if the process fails,
171              // this task gets aborted, so there is no need for a dedicated timeout
172              let _ = rx_conn_ready.await;
173  
174              while let Some(bytes) = framed.next().await {
175                  match bytes {
176                      Ok(msg) => {
177                          // send the message for further processing
178                          if let Err(e) = inbound_message_sender.try_send(msg) {
179                              error!(parent: node.span(), "can't process a message from {addr}: {e}");
180                              node.stats().register_failure();
181                              if matches!(e, mpsc::error::TrySendError::Closed(_)) {
182                                  break;
183                              }
184                          }
185                          #[cfg(feature = "metrics")]
186                          metrics::increment_gauge(metrics::tcp::TCP_TASKS, 1f64);
187                      }
188                      Err(e) => {
189                          error!(parent: node.span(), "can't read from {addr}: {e}");
190                          node.known_peers().register_failure(addr.ip());
191                          if node.config().fatal_io_errors.contains(&e.kind()) {
192                              break;
193                          }
194                      }
195                  }
196              }
197  
198              let _ = node.disconnect(addr).await;
199          }));
200          let _ = rx_reader.await;
201          conn.tasks.push(reader_task);
202  
203          // return the Connection to the Tcp, resuming Tcp::adapt_stream
204          if conn_returner.send(Ok(conn)).is_err() {
205              unreachable!("couldn't return a Connection to the Tcp");
206          }
207      }
208  
209      fn map_codec<T: AsyncRead>(
210          &self,
211          framed: FramedRead<T, Self::Codec>,
212          addr: SocketAddr,
213      ) -> FramedRead<T, CountingCodec<Self::Codec>> {
214          framed.map_decoder(|codec| CountingCodec { codec, node: self.tcp().clone(), addr, acc: 0 })
215      }
216  }
217  
218  /// A wrapper [`Decoder`] that also counts the inbound messages.
219  struct CountingCodec<D: Decoder> {
220      codec: D,
221      node: Tcp,
222      addr: SocketAddr,
223      acc: usize,
224  }
225  
226  impl<D: Decoder> Decoder for CountingCodec<D> {
227      type Error = D::Error;
228      type Item = D::Item;
229  
230      fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
231          let initial_buf_len = src.len();
232          let ret = self.codec.decode(src)?;
233          let final_buf_len = src.len();
234          let read_len = initial_buf_len - final_buf_len + self.acc;
235  
236          if read_len != 0 {
237              trace!(parent: self.node.span(), "read {}B from {}", read_len, self.addr);
238  
239              if ret.is_some() {
240                  self.acc = 0;
241                  self.node.known_peers().register_received_message(self.addr.ip(), read_len);
242                  self.node.stats().register_received_message(read_len);
243              } else {
244                  self.acc = read_len;
245              }
246          }
247  
248          Ok(ret)
249      }
250  }