channels.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use crate::events::{ 20 BatchPropose, 21 BatchSignature, 22 CertificateRequest, 23 CertificateResponse, 24 TransmissionRequest, 25 TransmissionResponse, 26 }; 27 use alphaos_node_sync::locators::BlockLocators; 28 use alphavm::{ 29 console::network::*, 30 ledger::{ 31 block::{Block, Transaction}, 32 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID}, 33 puzzle::{Solution, SolutionID}, 34 }, 35 prelude::Result, 36 }; 37 38 use indexmap::IndexMap; 39 use std::net::SocketAddr; 40 use tokio::sync::{mpsc, oneshot}; 41 42 const MAX_CHANNEL_SIZE: usize = 8192; 43 44 #[derive(Debug)] 45 pub struct ConsensusSender<N: Network> { 46 pub tx_consensus_subdag: 47 mpsc::Sender<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>, 48 } 49 50 #[derive(Debug)] 51 pub struct ConsensusReceiver<N: Network> { 52 pub rx_consensus_subdag: 53 mpsc::Receiver<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>, 54 } 55 56 /// Initializes the consensus channels. 57 pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusReceiver<N>) { 58 let (tx_consensus_subdag, rx_consensus_subdag) = mpsc::channel(MAX_CHANNEL_SIZE); 59 60 let sender = ConsensusSender { tx_consensus_subdag }; 61 let receiver = ConsensusReceiver { rx_consensus_subdag }; 62 63 (sender, receiver) 64 } 65 66 /// "Interface" that enables, for example, sending data from storage to the the BFT logic. 67 #[derive(Clone, Debug)] 68 pub struct BFTSender<N: Network> { 69 pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender<bool>)>, 70 pub tx_primary_certificate: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>, 71 pub tx_sync_bft_dag_at_bootup: mpsc::Sender<Vec<BatchCertificate<N>>>, 72 pub tx_sync_bft: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>, 73 } 74 75 impl<N: Network> BFTSender<N> { 76 /// Sends the current round to the BFT. 77 pub async fn send_primary_round_to_bft(&self, current_round: u64) -> Result<bool> { 78 // Initialize a callback sender and receiver. 79 let (callback_sender, callback_receiver) = oneshot::channel(); 80 // Send the current round to the BFT. 81 self.tx_primary_round.send((current_round, callback_sender)).await?; 82 // Await the callback to continue. 83 Ok(callback_receiver.await?) 84 } 85 86 /// Sends the batch certificate to the BFT. 87 pub async fn send_primary_certificate_to_bft(&self, certificate: BatchCertificate<N>) -> Result<()> { 88 // Initialize a callback sender and receiver. 89 let (callback_sender, callback_receiver) = oneshot::channel(); 90 // Send the certificate to the BFT. 91 self.tx_primary_certificate.send((certificate, callback_sender)).await?; 92 // Await the callback to continue. 93 callback_receiver.await? 94 } 95 96 /// Sends the batch certificates to the BFT for syncing. 97 pub async fn send_sync_bft(&self, certificate: BatchCertificate<N>) -> Result<()> { 98 // Initialize a callback sender and receiver. 99 let (callback_sender, callback_receiver) = oneshot::channel(); 100 // Send the certificate to the BFT for syncing. 101 self.tx_sync_bft.send((certificate, callback_sender)).await?; 102 // Await the callback to continue. 103 callback_receiver.await? 104 } 105 } 106 107 /// Receiving counterpart to `BFTSender` 108 #[derive(Debug)] 109 pub struct BFTReceiver<N: Network> { 110 pub rx_primary_round: mpsc::Receiver<(u64, oneshot::Sender<bool>)>, 111 pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>, 112 pub rx_sync_bft_dag_at_bootup: mpsc::Receiver<Vec<BatchCertificate<N>>>, 113 pub rx_sync_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>, 114 } 115 116 /// Initializes the BFT channels, and returns the sending and receiving ends. 117 pub fn init_bft_channels<N: Network>() -> (BFTSender<N>, BFTReceiver<N>) { 118 let (tx_primary_round, rx_primary_round) = mpsc::channel(MAX_CHANNEL_SIZE); 119 let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE); 120 let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE); 121 let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE); 122 123 let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft }; 124 let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft }; 125 126 (sender, receiver) 127 } 128 129 #[derive(Clone, Debug)] 130 pub struct PrimarySender<N: Network> { 131 pub tx_batch_propose: mpsc::Sender<(SocketAddr, BatchPropose<N>)>, 132 pub tx_batch_signature: mpsc::Sender<(SocketAddr, BatchSignature<N>)>, 133 pub tx_batch_certified: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>, 134 pub tx_primary_ping: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>, 135 pub tx_unconfirmed_solution: mpsc::Sender<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<bool>>)>, 136 pub tx_unconfirmed_transaction: 137 mpsc::Sender<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<bool>>)>, 138 } 139 140 impl<N: Network> PrimarySender<N> { 141 /// Sends the unconfirmed solution to the primary. 142 /// 143 /// # Returns 144 /// - `Ok(true)` if the solution was added to the ready queue. 145 /// - `Ok(false)` if the solution was valid but already exists in the ready queue. 146 /// - `Err(anyhow::Error)` if the solution was invalid. 147 pub async fn send_unconfirmed_solution( 148 &self, 149 solution_id: SolutionID<N>, 150 solution: Data<Solution<N>>, 151 ) -> Result<bool> { 152 // Initialize a callback sender and receiver. 153 let (callback_sender, callback_receiver) = oneshot::channel(); 154 // Send the unconfirmed solution to the primary. 155 self.tx_unconfirmed_solution.send((solution_id, solution, callback_sender)).await?; 156 // Await the callback to continue. 157 callback_receiver.await? 158 } 159 160 /// Sends the unconfirmed transaction to the primary. 161 /// 162 /// # Returns 163 /// - `Ok(true)` if the transaction was added to the ready queue. 164 /// - `Ok(false)` if the transaction was valid but already exists in the ready queue. 165 /// - `Err(anyhow::Error)` if the transaction was invalid. 166 pub async fn send_unconfirmed_transaction( 167 &self, 168 transaction_id: N::TransactionID, 169 transaction: Data<Transaction<N>>, 170 ) -> Result<bool> { 171 // Initialize a callback sender and receiver. 172 let (callback_sender, callback_receiver) = oneshot::channel(); 173 // Send the unconfirmed transaction to the primary. 174 self.tx_unconfirmed_transaction.send((transaction_id, transaction, callback_sender)).await?; 175 // Await the callback to continue. 176 callback_receiver.await? 177 } 178 } 179 180 #[derive(Debug)] 181 pub struct PrimaryReceiver<N: Network> { 182 pub rx_batch_propose: mpsc::Receiver<(SocketAddr, BatchPropose<N>)>, 183 pub rx_batch_signature: mpsc::Receiver<(SocketAddr, BatchSignature<N>)>, 184 pub rx_batch_certified: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>, 185 pub rx_primary_ping: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>, 186 pub rx_unconfirmed_solution: mpsc::Receiver<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<bool>>)>, 187 pub rx_unconfirmed_transaction: 188 mpsc::Receiver<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<bool>>)>, 189 } 190 191 /// Initializes the primary channels. 192 pub fn init_primary_channels<N: Network>() -> (PrimarySender<N>, PrimaryReceiver<N>) { 193 let (tx_batch_propose, rx_batch_propose) = mpsc::channel(MAX_CHANNEL_SIZE); 194 let (tx_batch_signature, rx_batch_signature) = mpsc::channel(MAX_CHANNEL_SIZE); 195 let (tx_batch_certified, rx_batch_certified) = mpsc::channel(MAX_CHANNEL_SIZE); 196 let (tx_primary_ping, rx_primary_ping) = mpsc::channel(MAX_CHANNEL_SIZE); 197 let (tx_unconfirmed_solution, rx_unconfirmed_solution) = mpsc::channel(MAX_CHANNEL_SIZE); 198 let (tx_unconfirmed_transaction, rx_unconfirmed_transaction) = mpsc::channel(MAX_CHANNEL_SIZE); 199 200 let sender = PrimarySender { 201 tx_batch_propose, 202 tx_batch_signature, 203 tx_batch_certified, 204 tx_primary_ping, 205 tx_unconfirmed_solution, 206 tx_unconfirmed_transaction, 207 }; 208 let receiver = PrimaryReceiver { 209 rx_batch_propose, 210 rx_batch_signature, 211 rx_batch_certified, 212 rx_primary_ping, 213 rx_unconfirmed_solution, 214 rx_unconfirmed_transaction, 215 }; 216 217 (sender, receiver) 218 } 219 220 #[derive(Debug)] 221 pub struct WorkerSender<N: Network> { 222 pub tx_worker_ping: mpsc::Sender<(SocketAddr, TransmissionID<N>)>, 223 pub tx_transmission_request: mpsc::Sender<(SocketAddr, TransmissionRequest<N>)>, 224 pub tx_transmission_response: mpsc::Sender<(SocketAddr, TransmissionResponse<N>)>, 225 } 226 227 #[derive(Debug)] 228 pub struct WorkerReceiver<N: Network> { 229 pub rx_worker_ping: mpsc::Receiver<(SocketAddr, TransmissionID<N>)>, 230 pub rx_transmission_request: mpsc::Receiver<(SocketAddr, TransmissionRequest<N>)>, 231 pub rx_transmission_response: mpsc::Receiver<(SocketAddr, TransmissionResponse<N>)>, 232 } 233 234 /// Initializes the worker channels. 235 pub fn init_worker_channels<N: Network>() -> (WorkerSender<N>, WorkerReceiver<N>) { 236 let (tx_worker_ping, rx_worker_ping) = mpsc::channel(MAX_CHANNEL_SIZE); 237 let (tx_transmission_request, rx_transmission_request) = mpsc::channel(MAX_CHANNEL_SIZE); 238 let (tx_transmission_response, rx_transmission_response) = mpsc::channel(MAX_CHANNEL_SIZE); 239 240 let sender = WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response }; 241 let receiver = WorkerReceiver { rx_worker_ping, rx_transmission_request, rx_transmission_response }; 242 243 (sender, receiver) 244 } 245 246 #[derive(Debug)] 247 pub struct SyncSender<N: Network> { 248 pub tx_block_sync_insert_block_response: 249 mpsc::Sender<(SocketAddr, Vec<Block<N>>, Option<ConsensusVersion>, oneshot::Sender<Result<()>>)>, 250 pub tx_block_sync_remove_peer: mpsc::Sender<SocketAddr>, 251 pub tx_block_sync_update_peer_locators: mpsc::Sender<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>, 252 pub tx_certificate_request: mpsc::Sender<(SocketAddr, CertificateRequest<N>)>, 253 pub tx_certificate_response: mpsc::Sender<(SocketAddr, CertificateResponse<N>)>, 254 } 255 256 impl<N: Network> SyncSender<N> { 257 /// Sends the request to update the peer locators. 258 pub async fn update_peer_locators(&self, peer_ip: SocketAddr, block_locators: BlockLocators<N>) -> Result<()> { 259 // Initialize a callback sender and receiver. 260 let (callback_sender, callback_receiver) = oneshot::channel(); 261 // Send the request to update the peer locators. 262 // This `tx_block_sync_update_peer_locators.send()` call 263 // causes the `rx_block_sync_update_peer_locators.recv()` call 264 // in one of the loops in [`Sync::run()`] to return. 265 self.tx_block_sync_update_peer_locators.send((peer_ip, block_locators, callback_sender)).await?; 266 // Await the callback to continue. 267 callback_receiver.await? 268 } 269 270 /// Sends the request to insert a new block response. 271 pub async fn insert_block_response( 272 &self, 273 peer_ip: SocketAddr, 274 blocks: Vec<Block<N>>, 275 latest_consensus_version: Option<ConsensusVersion>, 276 ) -> Result<()> { 277 // Initialize a callback sender and receiver. 278 let (callback_sender, callback_receiver) = oneshot::channel(); 279 // Send the request to advance with sync blocks. 280 // This `tx_block_sync_advance_with_sync_blocks.send()` call 281 // causes the `rx_block_sync_advance_with_sync_blocks.recv()` call 282 // in one of the loops in [`Sync::run()`] to return. 283 self.tx_block_sync_insert_block_response 284 .send((peer_ip, blocks, latest_consensus_version, callback_sender)) 285 .await?; 286 // Await the callback to continue. 287 callback_receiver.await? 288 } 289 } 290 291 #[derive(Debug)] 292 pub struct SyncReceiver<N: Network> { 293 pub rx_block_sync_insert_block_response: 294 mpsc::Receiver<(SocketAddr, Vec<Block<N>>, Option<ConsensusVersion>, oneshot::Sender<Result<()>>)>, 295 pub rx_block_sync_remove_peer: mpsc::Receiver<SocketAddr>, 296 pub rx_block_sync_update_peer_locators: mpsc::Receiver<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>, 297 pub rx_certificate_request: mpsc::Receiver<(SocketAddr, CertificateRequest<N>)>, 298 pub rx_certificate_response: mpsc::Receiver<(SocketAddr, CertificateResponse<N>)>, 299 } 300 301 /// Initializes the sync channels. 302 pub fn init_sync_channels<N: Network>() -> (SyncSender<N>, SyncReceiver<N>) { 303 let (tx_block_sync_insert_block_response, rx_block_sync_insert_block_response) = mpsc::channel(MAX_CHANNEL_SIZE); 304 let (tx_block_sync_remove_peer, rx_block_sync_remove_peer) = mpsc::channel(MAX_CHANNEL_SIZE); 305 let (tx_block_sync_update_peer_locators, rx_block_sync_update_peer_locators) = mpsc::channel(MAX_CHANNEL_SIZE); 306 let (tx_certificate_request, rx_certificate_request) = mpsc::channel(MAX_CHANNEL_SIZE); 307 let (tx_certificate_response, rx_certificate_response) = mpsc::channel(MAX_CHANNEL_SIZE); 308 309 let sender = SyncSender { 310 tx_block_sync_insert_block_response, 311 tx_block_sync_remove_peer, 312 tx_block_sync_update_peer_locators, 313 tx_certificate_request, 314 tx_certificate_response, 315 }; 316 let receiver = SyncReceiver { 317 rx_block_sync_insert_block_response, 318 rx_block_sync_remove_peer, 319 rx_block_sync_update_peer_locators, 320 rx_certificate_request, 321 rx_certificate_response, 322 }; 323 324 (sender, receiver) 325 }