/ node / bft / src / helpers / channels.rs
channels.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use crate::events::{
 20      BatchPropose,
 21      BatchSignature,
 22      CertificateRequest,
 23      CertificateResponse,
 24      TransmissionRequest,
 25      TransmissionResponse,
 26  };
 27  use alphaos_node_sync::locators::BlockLocators;
 28  use alphavm::{
 29      console::network::*,
 30      ledger::{
 31          block::{Block, Transaction},
 32          narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
 33          puzzle::{Solution, SolutionID},
 34      },
 35      prelude::Result,
 36  };
 37  
 38  use indexmap::IndexMap;
 39  use std::net::SocketAddr;
 40  use tokio::sync::{mpsc, oneshot};
 41  
/// Buffer capacity passed to every `mpsc::channel` call made by the `init_*_channels`
/// functions in this file.
const MAX_CHANNEL_SIZE: usize = 8192;
 43  
/// Sending half of the consensus channel created by [`init_consensus_channels`].
#[derive(Debug)]
pub struct ConsensusSender<N: Network> {
    /// Sends a subdag, an `IndexMap` of transmissions keyed by transmission ID, and a
    /// oneshot sender for a `Result<()>` reply.
    ///
    /// NOTE(review): the reply presumably reports whether the consumer accepted the
    /// subdag — confirm against the receiving loop.
    pub tx_consensus_subdag:
        mpsc::Sender<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>,
}
 49  
/// Receiving half of the consensus channel created by [`init_consensus_channels`].
#[derive(Debug)]
pub struct ConsensusReceiver<N: Network> {
    /// Receives the `(subdag, transmissions, reply sender)` tuples sent on
    /// `ConsensusSender::tx_consensus_subdag`.
    pub rx_consensus_subdag:
        mpsc::Receiver<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>,
}
 55  
 56  /// Initializes the consensus channels.
 57  pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusReceiver<N>) {
 58      let (tx_consensus_subdag, rx_consensus_subdag) = mpsc::channel(MAX_CHANNEL_SIZE);
 59  
 60      let sender = ConsensusSender { tx_consensus_subdag };
 61      let receiver = ConsensusReceiver { rx_consensus_subdag };
 62  
 63      (sender, receiver)
 64  }
 65  
/// "Interface" that enables, for example, sending data from storage to the BFT logic.
///
/// Created together with its [`BFTReceiver`] counterpart by [`init_bft_channels`].
#[derive(Clone, Debug)]
pub struct BFTSender<N: Network> {
    /// Carries a round number plus a oneshot sender for a `bool` reply
    /// (see [`BFTSender::send_primary_round_to_bft`]).
    pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender<bool>)>,
    /// Carries a batch certificate plus a oneshot sender for a `Result<()>` reply
    /// (see [`BFTSender::send_primary_certificate_to_bft`]).
    pub tx_primary_certificate: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
    /// Carries a list of batch certificates with no reply channel.
    ///
    /// NOTE(review): judging by the name, this seeds the BFT DAG during startup sync —
    /// confirm against the consumer.
    pub tx_sync_bft_dag_at_bootup: mpsc::Sender<Vec<BatchCertificate<N>>>,
    /// Carries a batch certificate plus a oneshot sender for a `Result<()>` reply
    /// (see [`BFTSender::send_sync_bft`]).
    pub tx_sync_bft: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
}
 74  
 75  impl<N: Network> BFTSender<N> {
 76      /// Sends the current round to the BFT.
 77      pub async fn send_primary_round_to_bft(&self, current_round: u64) -> Result<bool> {
 78          // Initialize a callback sender and receiver.
 79          let (callback_sender, callback_receiver) = oneshot::channel();
 80          // Send the current round to the BFT.
 81          self.tx_primary_round.send((current_round, callback_sender)).await?;
 82          // Await the callback to continue.
 83          Ok(callback_receiver.await?)
 84      }
 85  
 86      /// Sends the batch certificate to the BFT.
 87      pub async fn send_primary_certificate_to_bft(&self, certificate: BatchCertificate<N>) -> Result<()> {
 88          // Initialize a callback sender and receiver.
 89          let (callback_sender, callback_receiver) = oneshot::channel();
 90          // Send the certificate to the BFT.
 91          self.tx_primary_certificate.send((certificate, callback_sender)).await?;
 92          // Await the callback to continue.
 93          callback_receiver.await?
 94      }
 95  
 96      /// Sends the batch certificates to the BFT for syncing.
 97      pub async fn send_sync_bft(&self, certificate: BatchCertificate<N>) -> Result<()> {
 98          // Initialize a callback sender and receiver.
 99          let (callback_sender, callback_receiver) = oneshot::channel();
100          // Send the certificate to the BFT for syncing.
101          self.tx_sync_bft.send((certificate, callback_sender)).await?;
102          // Await the callback to continue.
103          callback_receiver.await?
104      }
105  }
106  
/// Receiving counterpart to `BFTSender`.
///
/// Each field is the receiving half of the `BFTSender` field with the same suffix;
/// both halves are created by [`init_bft_channels`].
#[derive(Debug)]
pub struct BFTReceiver<N: Network> {
    /// Receives `(round, reply sender)` pairs; the reply is a `bool`.
    pub rx_primary_round: mpsc::Receiver<(u64, oneshot::Sender<bool>)>,
    /// Receives `(certificate, reply sender)` pairs; the reply is a `Result<()>`.
    pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
    /// Receives lists of batch certificates; there is no reply channel.
    pub rx_sync_bft_dag_at_bootup: mpsc::Receiver<Vec<BatchCertificate<N>>>,
    /// Receives `(certificate, reply sender)` pairs; the reply is a `Result<()>`.
    pub rx_sync_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
}
115  
116  /// Initializes the BFT channels, and returns the sending and receiving ends.
117  pub fn init_bft_channels<N: Network>() -> (BFTSender<N>, BFTReceiver<N>) {
118      let (tx_primary_round, rx_primary_round) = mpsc::channel(MAX_CHANNEL_SIZE);
119      let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE);
120      let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE);
121      let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE);
122  
123      let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft };
124      let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft };
125  
126      (sender, receiver)
127  }
128  
/// Sending halves of the primary channels created by [`init_primary_channels`].
///
/// NOTE(review): the `SocketAddr` in each tuple is presumably the peer the event came
/// from — confirm against the callers.
#[derive(Clone, Debug)]
pub struct PrimarySender<N: Network> {
    /// Carries a socket address together with a `BatchPropose` event.
    pub tx_batch_propose: mpsc::Sender<(SocketAddr, BatchPropose<N>)>,
    /// Carries a socket address together with a `BatchSignature` event.
    pub tx_batch_signature: mpsc::Sender<(SocketAddr, BatchSignature<N>)>,
    /// Carries a socket address together with a `Data`-wrapped batch certificate.
    pub tx_batch_certified: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
    /// Carries a socket address together with a `Data`-wrapped batch certificate.
    pub tx_primary_ping: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
    /// Carries a solution ID, the `Data`-wrapped solution, and a oneshot sender for a
    /// `Result<bool>` reply (see [`PrimarySender::send_unconfirmed_solution`]).
    pub tx_unconfirmed_solution: mpsc::Sender<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<bool>>)>,
    /// Carries a transaction ID, the `Data`-wrapped transaction, and a oneshot sender for
    /// a `Result<bool>` reply (see [`PrimarySender::send_unconfirmed_transaction`]).
    pub tx_unconfirmed_transaction:
        mpsc::Sender<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<bool>>)>,
}
139  
140  impl<N: Network> PrimarySender<N> {
141      /// Sends the unconfirmed solution to the primary.
142      ///
143      /// # Returns
144      /// - `Ok(true)` if the solution was added to the ready queue.
145      /// - `Ok(false)` if the solution was valid but already exists in the ready queue.
146      /// - `Err(anyhow::Error)` if the solution was invalid.
147      pub async fn send_unconfirmed_solution(
148          &self,
149          solution_id: SolutionID<N>,
150          solution: Data<Solution<N>>,
151      ) -> Result<bool> {
152          // Initialize a callback sender and receiver.
153          let (callback_sender, callback_receiver) = oneshot::channel();
154          // Send the unconfirmed solution to the primary.
155          self.tx_unconfirmed_solution.send((solution_id, solution, callback_sender)).await?;
156          // Await the callback to continue.
157          callback_receiver.await?
158      }
159  
160      /// Sends the unconfirmed transaction to the primary.
161      ///
162      /// # Returns
163      /// - `Ok(true)` if the transaction was added to the ready queue.
164      /// - `Ok(false)` if the transaction was valid but already exists in the ready queue.
165      /// - `Err(anyhow::Error)` if the transaction was invalid.
166      pub async fn send_unconfirmed_transaction(
167          &self,
168          transaction_id: N::TransactionID,
169          transaction: Data<Transaction<N>>,
170      ) -> Result<bool> {
171          // Initialize a callback sender and receiver.
172          let (callback_sender, callback_receiver) = oneshot::channel();
173          // Send the unconfirmed transaction to the primary.
174          self.tx_unconfirmed_transaction.send((transaction_id, transaction, callback_sender)).await?;
175          // Await the callback to continue.
176          callback_receiver.await?
177      }
178  }
179  
/// Receiving halves of the primary channels created by [`init_primary_channels`].
///
/// Each field is the receiving half of the `PrimarySender` field with the same suffix.
#[derive(Debug)]
pub struct PrimaryReceiver<N: Network> {
    /// Receives `(socket address, BatchPropose)` pairs.
    pub rx_batch_propose: mpsc::Receiver<(SocketAddr, BatchPropose<N>)>,
    /// Receives `(socket address, BatchSignature)` pairs.
    pub rx_batch_signature: mpsc::Receiver<(SocketAddr, BatchSignature<N>)>,
    /// Receives `(socket address, Data-wrapped batch certificate)` pairs.
    pub rx_batch_certified: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
    /// Receives `(socket address, Data-wrapped batch certificate)` pairs.
    pub rx_primary_ping: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
    /// Receives `(solution ID, Data-wrapped solution, reply sender)` tuples; the reply is
    /// a `Result<bool>`.
    pub rx_unconfirmed_solution: mpsc::Receiver<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<bool>>)>,
    /// Receives `(transaction ID, Data-wrapped transaction, reply sender)` tuples; the
    /// reply is a `Result<bool>`.
    pub rx_unconfirmed_transaction:
        mpsc::Receiver<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<bool>>)>,
}
190  
191  /// Initializes the primary channels.
192  pub fn init_primary_channels<N: Network>() -> (PrimarySender<N>, PrimaryReceiver<N>) {
193      let (tx_batch_propose, rx_batch_propose) = mpsc::channel(MAX_CHANNEL_SIZE);
194      let (tx_batch_signature, rx_batch_signature) = mpsc::channel(MAX_CHANNEL_SIZE);
195      let (tx_batch_certified, rx_batch_certified) = mpsc::channel(MAX_CHANNEL_SIZE);
196      let (tx_primary_ping, rx_primary_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
197      let (tx_unconfirmed_solution, rx_unconfirmed_solution) = mpsc::channel(MAX_CHANNEL_SIZE);
198      let (tx_unconfirmed_transaction, rx_unconfirmed_transaction) = mpsc::channel(MAX_CHANNEL_SIZE);
199  
200      let sender = PrimarySender {
201          tx_batch_propose,
202          tx_batch_signature,
203          tx_batch_certified,
204          tx_primary_ping,
205          tx_unconfirmed_solution,
206          tx_unconfirmed_transaction,
207      };
208      let receiver = PrimaryReceiver {
209          rx_batch_propose,
210          rx_batch_signature,
211          rx_batch_certified,
212          rx_primary_ping,
213          rx_unconfirmed_solution,
214          rx_unconfirmed_transaction,
215      };
216  
217      (sender, receiver)
218  }
219  
/// Sending halves of the worker channels created by [`init_worker_channels`].
///
/// NOTE(review): the `SocketAddr` in each tuple is presumably the peer the event came
/// from — confirm against the callers.
#[derive(Debug)]
pub struct WorkerSender<N: Network> {
    /// Carries a socket address together with a transmission ID.
    pub tx_worker_ping: mpsc::Sender<(SocketAddr, TransmissionID<N>)>,
    /// Carries a socket address together with a `TransmissionRequest` event.
    pub tx_transmission_request: mpsc::Sender<(SocketAddr, TransmissionRequest<N>)>,
    /// Carries a socket address together with a `TransmissionResponse` event.
    pub tx_transmission_response: mpsc::Sender<(SocketAddr, TransmissionResponse<N>)>,
}
226  
/// Receiving halves of the worker channels created by [`init_worker_channels`].
///
/// Each field is the receiving half of the `WorkerSender` field with the same suffix.
#[derive(Debug)]
pub struct WorkerReceiver<N: Network> {
    /// Receives `(socket address, transmission ID)` pairs.
    pub rx_worker_ping: mpsc::Receiver<(SocketAddr, TransmissionID<N>)>,
    /// Receives `(socket address, TransmissionRequest)` pairs.
    pub rx_transmission_request: mpsc::Receiver<(SocketAddr, TransmissionRequest<N>)>,
    /// Receives `(socket address, TransmissionResponse)` pairs.
    pub rx_transmission_response: mpsc::Receiver<(SocketAddr, TransmissionResponse<N>)>,
}
233  
234  /// Initializes the worker channels.
235  pub fn init_worker_channels<N: Network>() -> (WorkerSender<N>, WorkerReceiver<N>) {
236      let (tx_worker_ping, rx_worker_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
237      let (tx_transmission_request, rx_transmission_request) = mpsc::channel(MAX_CHANNEL_SIZE);
238      let (tx_transmission_response, rx_transmission_response) = mpsc::channel(MAX_CHANNEL_SIZE);
239  
240      let sender = WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response };
241      let receiver = WorkerReceiver { rx_worker_ping, rx_transmission_request, rx_transmission_response };
242  
243      (sender, receiver)
244  }
245  
/// Sending halves of the sync channels created by [`init_sync_channels`].
#[derive(Debug)]
pub struct SyncSender<N: Network> {
    /// Carries a peer address, a list of blocks, an optional consensus version, and a
    /// oneshot sender for a `Result<()>` reply (see [`SyncSender::insert_block_response`]).
    pub tx_block_sync_insert_block_response:
        mpsc::Sender<(SocketAddr, Vec<Block<N>>, Option<ConsensusVersion>, oneshot::Sender<Result<()>>)>,
    /// Carries a socket address with no reply channel.
    ///
    /// NOTE(review): judging by the name, this is the peer to remove from block sync —
    /// confirm against the consumer.
    pub tx_block_sync_remove_peer: mpsc::Sender<SocketAddr>,
    /// Carries a peer address, its block locators, and a oneshot sender for a
    /// `Result<()>` reply (see [`SyncSender::update_peer_locators`]).
    pub tx_block_sync_update_peer_locators: mpsc::Sender<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
    /// Carries a socket address together with a `CertificateRequest` event.
    pub tx_certificate_request: mpsc::Sender<(SocketAddr, CertificateRequest<N>)>,
    /// Carries a socket address together with a `CertificateResponse` event.
    pub tx_certificate_response: mpsc::Sender<(SocketAddr, CertificateResponse<N>)>,
}
255  
256  impl<N: Network> SyncSender<N> {
257      /// Sends the request to update the peer locators.
258      pub async fn update_peer_locators(&self, peer_ip: SocketAddr, block_locators: BlockLocators<N>) -> Result<()> {
259          // Initialize a callback sender and receiver.
260          let (callback_sender, callback_receiver) = oneshot::channel();
261          // Send the request to update the peer locators.
262          // This `tx_block_sync_update_peer_locators.send()` call
263          // causes the `rx_block_sync_update_peer_locators.recv()` call
264          // in one of the loops in [`Sync::run()`] to return.
265          self.tx_block_sync_update_peer_locators.send((peer_ip, block_locators, callback_sender)).await?;
266          // Await the callback to continue.
267          callback_receiver.await?
268      }
269  
270      /// Sends the request to insert a new block response.
271      pub async fn insert_block_response(
272          &self,
273          peer_ip: SocketAddr,
274          blocks: Vec<Block<N>>,
275          latest_consensus_version: Option<ConsensusVersion>,
276      ) -> Result<()> {
277          // Initialize a callback sender and receiver.
278          let (callback_sender, callback_receiver) = oneshot::channel();
279          // Send the request to advance with sync blocks.
280          // This `tx_block_sync_advance_with_sync_blocks.send()` call
281          // causes the `rx_block_sync_advance_with_sync_blocks.recv()` call
282          // in one of the loops in [`Sync::run()`] to return.
283          self.tx_block_sync_insert_block_response
284              .send((peer_ip, blocks, latest_consensus_version, callback_sender))
285              .await?;
286          // Await the callback to continue.
287          callback_receiver.await?
288      }
289  }
290  
/// Receiving halves of the sync channels created by [`init_sync_channels`].
///
/// Each field is the receiving half of the `SyncSender` field with the same suffix.
#[derive(Debug)]
pub struct SyncReceiver<N: Network> {
    /// Receives `(peer address, blocks, optional consensus version, reply sender)` tuples;
    /// the reply is a `Result<()>`.
    pub rx_block_sync_insert_block_response:
        mpsc::Receiver<(SocketAddr, Vec<Block<N>>, Option<ConsensusVersion>, oneshot::Sender<Result<()>>)>,
    /// Receives socket addresses; there is no reply channel.
    pub rx_block_sync_remove_peer: mpsc::Receiver<SocketAddr>,
    /// Receives `(peer address, block locators, reply sender)` tuples; the reply is a
    /// `Result<()>`.
    pub rx_block_sync_update_peer_locators: mpsc::Receiver<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
    /// Receives `(socket address, CertificateRequest)` pairs.
    pub rx_certificate_request: mpsc::Receiver<(SocketAddr, CertificateRequest<N>)>,
    /// Receives `(socket address, CertificateResponse)` pairs.
    pub rx_certificate_response: mpsc::Receiver<(SocketAddr, CertificateResponse<N>)>,
}
300  
301  /// Initializes the sync channels.
302  pub fn init_sync_channels<N: Network>() -> (SyncSender<N>, SyncReceiver<N>) {
303      let (tx_block_sync_insert_block_response, rx_block_sync_insert_block_response) = mpsc::channel(MAX_CHANNEL_SIZE);
304      let (tx_block_sync_remove_peer, rx_block_sync_remove_peer) = mpsc::channel(MAX_CHANNEL_SIZE);
305      let (tx_block_sync_update_peer_locators, rx_block_sync_update_peer_locators) = mpsc::channel(MAX_CHANNEL_SIZE);
306      let (tx_certificate_request, rx_certificate_request) = mpsc::channel(MAX_CHANNEL_SIZE);
307      let (tx_certificate_response, rx_certificate_response) = mpsc::channel(MAX_CHANNEL_SIZE);
308  
309      let sender = SyncSender {
310          tx_block_sync_insert_block_response,
311          tx_block_sync_remove_peer,
312          tx_block_sync_update_peer_locators,
313          tx_certificate_request,
314          tx_certificate_response,
315      };
316      let receiver = SyncReceiver {
317          rx_block_sync_insert_block_response,
318          rx_block_sync_remove_peer,
319          rx_block_sync_update_peer_locators,
320          rx_certificate_request,
321          rx_certificate_response,
322      };
323  
324      (sender, receiver)
325  }