lib.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 #![forbid(unsafe_code)] 20 21 mod transactions_queue; 22 use transactions_queue::TransactionsQueue; 23 24 pub mod clp; 25 pub mod genesis_generation; 26 pub mod ipc_hooks; 27 pub mod ratifications; 28 pub use clp::{ClpConfig, ClpEpochResult, ClpManager}; 29 pub use genesis_generation::{ 30 generate_alpha_genesis, 31 generate_delta_genesis, 32 load_alpha_params, 33 load_delta_params, 34 verify_genesis_hash, 35 AlphaGenesisParams, 36 DeltaGenesisParams, 37 }; 38 // Section 11: Conditionally export network upgrade functions (disabled on mainnet) 39 #[cfg(not(feature = "mainnet"))] 40 pub use genesis_generation::{execute_alpha_genesis_swap, execute_delta_genesis_swap}; 41 pub use ipc_hooks::{IpcEvent, IpcEventHandler, IpcEventReceiver, IpcEventSender, NoOpIpcHandler}; 42 #[cfg(not(feature = "mainnet"))] 43 pub use ratifications::{ 44 check_upgrade_activation_sync, 45 execute_network_upgrade_sync, 46 get_my_vote, 47 verify_upgrade_approval, 48 }; 49 pub use ratifications::{NetworkUpgradeProposal, VoteChoice}; 50 51 #[macro_use] 52 extern crate tracing; 53 54 #[cfg(feature = "metrics")] 55 extern crate alphaos_node_metrics 
as metrics;

use alphaos_account::Account;
use alphaos_node_bft::{
    helpers::{
        fmt_id,
        init_consensus_channels,
        init_primary_channels,
        ConsensusReceiver,
        PrimarySender,
        Storage as NarwhalStorage,
    },
    spawn_blocking,
    Primary,
    BFT,
    MAX_BATCH_DELAY_IN_MS,
};
use alphaos_node_bft_ledger_service::LedgerService;
use alphaos_node_bft_storage_service::BFTPersistentStorage;
use alphaos_node_sync::{BlockSync, Ping};

use alphavm::{
    ledger::{
        block::Transaction,
        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::*,
    utilities::flatten_error,
};

use alphastd::StorageMode;
use anyhow::{Context, Result};
use colored::Colorize;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::{Mutex, RwLock};
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
use tokio::{sync::oneshot, task::JoinHandle};

#[cfg(feature = "metrics")]
use std::collections::HashMap;

/// The capacity of the queue reserved for deployments (1 << 10 = 1024 entries).
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity of the queue reserved for executions (1 << 10 = 1024 entries).
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity of the queue reserved for solutions (1 << 10 = 1024 entries).
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// The **suggested** maximum number of deployments in each interval.
/// Note: This is an inbound queue limit, not a Narwhal-enforced limit.
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;

/// Wrapper around `BFT` that adds additional functionality, such as a mempool.
///
/// Consensus acts as a rate limiter to prevent workers in BFT from being overloaded.
/// Each worker maintains a ready queue (which is essentially also a mempool), but verifies transactions/solutions
/// before enqueuing them.
/// Consensus only passes more transactions/solutions to the BFT layer if its ready queues are not already full.
#[derive(Clone)]
pub struct Consensus<N: Network> {
    /// The ledger.
    ledger: Arc<dyn LedgerService<N>>,
    /// The BFT.
    bft: BFT<N>,
    /// The primary sender.
    primary_sender: PrimarySender<N>,
    /// The unconfirmed solutions queue.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    /// The unconfirmed transactions queue.
    transactions_queue: Arc<RwLock<TransactionsQueue<N>>>,
    /// The recently-seen unconfirmed solutions.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    /// The recently-seen unconfirmed transactions.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    /// Maps each tracked transmission ID to the timestamp at which it entered the mempool;
    /// consumed when computing transmission-latency metrics at block inclusion.
    #[cfg(feature = "metrics")]
    transmissions_tracker: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The ping logic.
    ping: Arc<Ping<N>>,
    /// The block sync logic.
    block_sync: Arc<BlockSync<N>>,
    /// Optional IPC event handler for cross-chain communication with DELTA.
    ipc_handler: Option<Arc<dyn IpcEventHandler<N>>>,
    /// Optional CLP manager for validator liveness proofs.
    clp_manager: Option<Arc<ClpManager>>,
}

impl<N: Network> Consensus<N> {
    /// Initializes a new instance of consensus and spawns its background tasks.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        account: Account<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        storage_mode: StorageMode,
        ping: Arc<Ping<N>>,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Delegate to the full constructor with no IPC handler configured.
        Self::with_ipc_handler(
            account,
            ledger,
            block_sync,
            ip,
            trusted_validators,
            trusted_peers_only,
            storage_mode,
            ping,
            dev,
            None,
        )
        .await
    }

    /// Initializes a new instance of consensus with an optional IPC handler for cross-chain communication.
    #[allow(clippy::too_many_arguments)]
    pub async fn with_ipc_handler(
        account: Account<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        storage_mode: StorageMode,
        ping: Arc<Ping<N>>,
        dev: Option<u16>,
        ipc_handler: Option<Arc<dyn IpcEventHandler<N>>>,
    ) -> Result<Self> {
        // Initialize the primary channels.
        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
        // Initialize the Narwhal transmissions.
        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
        // Initialize the Narwhal storage.
        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
        // Initialize the BFT.
        let bft = BFT::new(
            account,
            storage,
            ledger.clone(),
            block_sync.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            storage_mode,
            dev,
        )?;
        // Create a new instance of Consensus.
        let mut _self = Self {
            ledger,
            bft,
            block_sync,
            primary_sender,
            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            transactions_queue: Default::default(),
            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            #[cfg(feature = "metrics")]
            transmissions_tracker: Default::default(),
            handles: Default::default(),
            ping: ping.clone(),
            ipc_handler,
            clp_manager: None, // CLP can be enabled via set_clp_manager()
        };

        info!("Starting the consensus instance...");

        // First, initialize the consensus channels.
        let (consensus_sender, consensus_receiver) = init_consensus_channels();
        // Then, start the consensus handlers.
        _self.start_handlers(consensus_receiver);
        // Lastly, also start the BFT's handlers.
        _self.bft.run(Some(ping), Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?;

        Ok(_self)
    }

    /// Returns the underlying `BFT` struct.
    pub const fn bft(&self) -> &BFT<N> {
        &self.bft
    }

    /// Returns `true` if the given transaction ID is currently in the inbound transactions queue.
    pub fn contains_transaction(&self, transaction_id: &N::TransactionID) -> bool {
        self.transactions_queue.read().contains(transaction_id)
    }

    /// Set the IPC event handler for cross-chain communication.
    ///
    /// This can be called after construction to wire up the adnet runtime.
    pub fn set_ipc_handler(&mut self, handler: Arc<dyn IpcEventHandler<N>>) {
        self.ipc_handler = Some(handler);
    }

    /// Returns whether an IPC handler is configured.
    pub fn has_ipc_handler(&self) -> bool {
        self.ipc_handler.is_some()
    }

    /// Set the CLP manager for validator liveness proofs.
    ///
    /// This enables the Continuous Liveness Proof system for this consensus instance.
    pub fn set_clp_manager(&mut self, manager: Arc<ClpManager>) {
        self.clp_manager = Some(manager);
    }

    /// Returns whether a CLP manager is configured.
    pub fn has_clp_manager(&self) -> bool {
        self.clp_manager.is_some()
    }

    /// Returns a reference to the CLP manager if configured.
    pub fn clp_manager(&self) -> Option<&Arc<ClpManager>> {
        self.clp_manager.as_ref()
    }
}

impl<N: Network> Consensus<N> {
    /// Returns the number of unconfirmed transmissions in the BFT's workers (not in the mempool).
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.bft.num_unconfirmed_transmissions()
    }

    /// Returns the number of unconfirmed ratifications in the BFT's workers (not in the mempool).
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.bft.num_unconfirmed_ratifications()
    }

    /// Returns the number of unconfirmed solutions in the BFT's workers (not in the mempool).
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.bft.num_unconfirmed_solutions()
    }

    /// Returns the number of unconfirmed transactions in the BFT's workers (not in the mempool).
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.bft.num_unconfirmed_transactions()
    }
}

impl<N: Network> Consensus<N> {
    /// Returns the unconfirmed transmission IDs (worker queues chained with the inbound queue).
    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
    }

    /// Returns the unconfirmed transmissions (worker queues chained with the inbound queue).
    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.worker_transmissions().chain(self.inbound_transmissions())
    }

    /// Returns the unconfirmed solutions.
    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.worker_solutions().chain(self.inbound_solutions())
    }

    /// Returns the unconfirmed transactions (worker queues chained with the inbound queue).
    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.worker_transactions().chain(self.inbound_transactions())
    }
}

impl<N: Network> Consensus<N> {
    /// Returns the worker transmission IDs.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.bft.worker_transmission_ids()
    }

    /// Returns the worker transmissions.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.bft.worker_transmissions()
    }

    /// Returns the worker solutions.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.bft.worker_solutions()
    }

    /// Returns the worker transactions.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.bft.worker_transactions()
    }
}

impl<N: Network> Consensus<N> {
    /// Returns the transmission IDs in the inbound queue.
    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.inbound_transmissions().map(|(id, _)| id)
    }

    /// Returns the transmissions in the inbound queue.
    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        // Wrap the inbound transactions, then the inbound solutions, as transmissions.
        self.inbound_transactions()
            .map(|(id, tx)| {
                (
                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
                    Transmission::Transaction(tx),
                )
            })
            .chain(self.inbound_solutions().map(|(id, solution)| {
                (
                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
                    Transmission::Solution(solution),
                )
            }))
    }

    /// Returns the solutions in the inbound queue.
    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        // Return an iterator over the solutions in the inbound queue.
        // Note: the LRU cache is cloned so the lock is released before iteration.
        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
    }

    /// Returns the transactions in the inbound queue.
    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        // Return an iterator over the deployment and execution transactions in the inbound queue.
        self.transactions_queue.read().transactions().map(|(id, tx)| (id, Data::Object(tx)))
    }
}

impl<N: Network> Consensus<N> {
    /// Adds the given unconfirmed solution to the memory pool, which will then eventually be passed
    /// to the BFT layer for inclusion in a batch.
    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
        // Calculate the transmission checksum.
        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
        // Queue the unconfirmed solution.
        {
            let solution_id = solution.id();

            // Check if the solution was recently seen.
            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
                // If the solution was recently seen, return early.
                return Ok(());
            }
            // Check if the solution already exists in the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
            }
            #[cfg(feature = "metrics")]
            {
                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
                // Record when the solution entered the mempool, for latency metrics.
                let timestamp = alphaos_node_bft::helpers::now();
                self.transmissions_tracker.lock().insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
            }
            // Add the solution to the memory pool.
            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
            }
        }

        // Try to process the unconfirmed solutions in the memory pool.
        self.process_unconfirmed_solutions().await
    }

    /// Processes unconfirmed solutions in the mempool, and passes them to the BFT layer
    /// (if sufficient space is available).
    async fn process_unconfirmed_solutions(&self) -> Result<()> {
        // If the memory pool of this node is full, return early.
        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
        {
            return Ok(());
        }
        // Retrieve the solutions.
        let solutions = {
            // Determine the available capacity.
            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
            // Acquire the lock on the queue.
            let mut queue = self.solutions_queue.lock();
            // Determine the number of solutions to send.
            let num_solutions = queue.len().min(capacity);
            // Drain the solutions from the queue.
            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
        };
        // Iterate over the solutions.
        for solution in solutions.into_iter() {
            let solution_id = solution.id();
            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
            // Send the unconfirmed solution to the primary.
            match self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
                Ok(true) => {}
                Ok(false) => debug!(
                    "Unable to add unconfirmed solution '{}' to the memory pool. Already exists.",
                    fmt_id(solution_id)
                ),
                Err(err) => {
                    let err = err.context(format!(
                        "Unable to add unconfirmed solution '{}' to the memory pool",
                        fmt_id(solution_id)
                    ));

                    // If the node is synced and the error occurs after the first 10 blocks of an epoch,
                    // log it as a warning; otherwise, trace it.
                    if self.bft.is_synced() && self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                        warn!("{}", flatten_error(err));
                    } else {
                        trace!("{}", flatten_error(err));
                    }
                }
            }
        }
        Ok(())
    }

    /// Adds the given unconfirmed transaction to the memory pool, which will then eventually be passed
    /// to the BFT layer for inclusion in a batch.
    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
        // Calculate the transmission checksum.
        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
        // Queue the unconfirmed transaction.
        {
            let transaction_id = transaction.id();

            // Check that the transaction is not a fee transaction.
            if transaction.is_fee() {
                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            // Check if the transaction was recently seen.
            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
                // If the transaction was recently seen, return early.
                return Ok(());
            }
            // Check if the transaction already exists in the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            // Check that the transaction is not in the mempool.
            if self.contains_transaction(&transaction_id) {
                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
            }
            #[cfg(feature = "metrics")]
            {
                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
                // Record when the transaction entered the mempool, for latency metrics.
                let timestamp = alphaos_node_bft::helpers::now();
                self.transmissions_tracker
                    .lock()
                    .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
            }
            // Add the transaction to the memory pool.
            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
            let priority_fee = transaction.priority_fee_amount()?;
            self.transactions_queue.write().insert(transaction_id, transaction, priority_fee)?;
        }

        // Try to process the unconfirmed transactions in the memory pool.
        self.process_unconfirmed_transactions().await
    }

    /// Processes unconfirmed transactions in the mempool, and passes them to the BFT layer
    /// (if sufficient space is available).
    async fn process_unconfirmed_transactions(&self) -> Result<()> {
        // If the memory pool of this node is full, return early.
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
            return Ok(());
        }
        // Retrieve the transactions.
        let transactions = {
            // Determine the available capacity.
            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
            // Acquire the lock on the transactions queue.
            let mut tx_queue = self.transactions_queue.write();
            // Determine the number of deployments to send.
            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
            // Determine the number of executions to send.
            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
            // Create an iterator which will select interleaved deployments and executions within the capacity.
            // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
            // Drain the transactions from the queue, interleaving deployments and executions.
            selector_iter
                .filter_map(
                    |select_deployment| {
                        if select_deployment {
                            tx_queue.deployments.pop()
                        } else {
                            tx_queue.executions.pop()
                        }
                    },
                )
                .map(|(_, tx)| tx)
                .collect_vec()
        };
        // Iterate over the transactions.
        for transaction in transactions.into_iter() {
            let transaction_id = transaction.id();
            // Determine the type of the transaction. The fee type is technically not possible here.
            let tx_type_str = match transaction {
                Transaction::Deploy(..) => "deployment",
                Transaction::Execute(..) => "execution",
                Transaction::Fee(..) => "fee",
            };
            trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
            // Send the unconfirmed transaction to the primary.
            match self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await {
                Ok(true) => {}
                Ok(false) => debug!(
                    "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool. Already exists.",
                    fmt_id(transaction_id)
                ),
                Err(err) => {
                    // If the BFT is synced, then log the warning.
                    if self.bft.is_synced() {
                        let err = err.context(format!(
                            "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool",
                            fmt_id(transaction_id)
                        ));
                        warn!("{}", flatten_error(err));
                    }
                }
            }
        }
        Ok(())
    }
}

impl<N: Network> Consensus<N> {
    /// Starts the consensus handlers.
    ///
    /// This is only invoked once, in the constructor.
    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;

        // Process the committed subdag and transmissions from the BFT.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
            }
        });

        // Process the unconfirmed transactions in the memory pool.
        //
        // TODO (kaimast): This shouldn't happen periodically but only when new batches/blocks are accepted
        // by the BFT layer, after which the worker's ready queue may have capacity for more transactions/solutions.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // Process the unconfirmed transactions in the memory pool.
                if let Err(err) = self_.process_unconfirmed_transactions().await {
                    warn!("{}", flatten_error(err.context("Cannot process unconfirmed transactions")));
                }
                // Process the unconfirmed solutions in the memory pool.
                if let Err(err) = self_.process_unconfirmed_solutions().await {
                    warn!("{}", flatten_error(err.context("Cannot process unconfirmed solutions")));
                }
            }
        });
    }

    /// Attempts to build a new block from the given subDAG, and (tries to) advance the ledger to it.
    async fn process_bft_subdag(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
        callback: oneshot::Sender<Result<()>>,
    ) {
        // Try to advance to the next block.
        // Note: run on a blocking task via `spawn_blocking!` so it does not stall the async runtime.
        let self_ = self.clone();
        let transmissions_ = transmissions.clone();
        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_).with_context(|| "Unable to advance to the next block") };

        // If the block failed to advance, reinsert the transmissions into the memory pool.
        if let Err(e) = &result {
            error!("{}", flatten_error(e));
            // On failure, reinsert the transmissions into the memory pool.
            self.reinsert_transmissions(transmissions).await;
        }
        // Send the callback **after** advancing to the next block.
        // Note: We must await the block to be advanced before sending the callback.
        callback.send(result).ok();
    }

    /// Attempts to advance the ledger to the next block, and updates the metrics (if enabled) accordingly.
    fn try_advance_to_next_block(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Capture the pre-advance measurements needed for the metrics emitted at the end.
        #[cfg(feature = "metrics")]
        let start = subdag.leader_certificate().batch_header().timestamp();
        #[cfg(feature = "metrics")]
        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
        #[cfg(feature = "metrics")]
        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();

        // Create the candidate next block.
        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
        // Check that the block is well-formed.
        self.ledger.check_next_block(&next_block)?;
        // Advance to the next block.
        self.ledger.advance_to_next_block(&next_block)?;
        #[cfg(feature = "telemetry")]
        // Fetch the latest committee
        let latest_committee = self.ledger.current_committee()?;

        // Section 12b: Check for network upgrade activation (testnet only)
        #[cfg(not(feature = "mainnet"))]
        {
            let next_height = next_block.height();
            if let Ok(Some(proposal)) = ratifications::check_upgrade_activation_sync::<N>(&self.ledger, next_height) {
                info!("🚨 NETWORK UPGRADE DETECTED AT HEIGHT {}", next_height);

                // Verify approval
                let total_governors = 5; // TODO: Get from committee
                if let Err(e) = ratifications::verify_upgrade_approval(&proposal, total_governors) {
                    error!("⚠️ Network upgrade failed approval: {}", e);
                } else {
                    // Use a placeholder address for testing
                    let placeholder_addr: Address<N> = Address::zero();
                    match ratifications::get_my_vote::<N>(&self.ledger, &proposal.proposal_id, &placeholder_addr) {
                        Ok(vote) => {
                            info!("📊 Vote choice: {:?}", vote);
                            if let Err(e) =
                                ratifications::execute_network_upgrade_sync::<N>(&proposal, vote, &self.ledger)
                            {
                                error!("⚠️ Network upgrade execution failed: {}", e);
                            }
                        }
                        Err(e) => error!("Could not retrieve vote: {}", e),
                    }
                }
            }
        }

        // If the next block starts a new epoch, clear the existing solutions.
        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
            // Clear the solutions queue.
            self.solutions_queue.lock().clear();
            // Clear the worker solutions.
            self.bft.primary().clear_worker_solutions();
        }

        // Notify peers that we have a new block.
        let locators = self.block_sync.get_block_locators()?;
        self.ping.update_block_locators(locators);

        // Make block sync aware of the new block.
        self.block_sync.set_sync_height(next_block.height());

        // Notify IPC handler of block finalization (for cross-chain communication with DELTA)
        if let Some(ref handler) = self.ipc_handler {
            let state_root = ipc_hooks::extract_state_root(&next_block);
            handler.on_block_finalized(next_block.height() as u64, state_root, *next_block.hash());

            // Scan for lock_for_sax transactions and notify handler
            ipc_hooks::scan_for_lock_transactions(&next_block, handler.as_ref());
        }

        // CLP: Generate challenge and evaluate penalties at epoch boundary
        if let Some(ref clp) = self.clp_manager {
            // Generate a challenge if enough time has passed since the last one
            // NOTE(review): if the serialized hash is not exactly 32 bytes, or serialization fails,
            // this silently degrades to an all-zero array — confirm that is acceptable for CLP.
            let block_hash_bytes: [u8; 32] =
                next_block.hash().to_bytes_le().map(|v| v.try_into().unwrap_or([0u8; 32])).unwrap_or([0u8; 32]);
            let block_height: u64 = next_block.height().into();
            if let Some(_challenge) = clp.on_block_finalized(block_height, &block_hash_bytes) {
                // Challenge generated - in production, this would be broadcast to the network
                // For now, the challenge is registered in the aggregator
                debug!("CLP challenge generated at block {}", block_height);
            }

            // Finalize any expired challenges
            clp.finalize_expired_challenges(block_height);

            // At epoch boundary, evaluate CLP penalties
            if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
                if let Some(result) = clp.evaluate_epoch() {
                    info!(
                        "CLP epoch {} complete: {} validators passed, {} slashed, {} ejected",
                        result.epoch,
                        result.passed.len(),
                        result.slashed.len(),
                        result.ejected.len()
                    );
                    // TODO: Apply slashes and ejections to staking state
                }
            }
        }

        // TODO(kaimast): This should also remove any transmissions/solutions contained in the block from the mempool.
        // Removal currently happens when Consensus eventually passes them to the worker, which then just discards them.

        #[cfg(feature = "metrics")]
        {
            let now_utc = alphaos_node_bft::helpers::now_utc();
            // NOTE(review): the `as u64` cast assumes `now_utc` is not earlier than the leader
            // certificate timestamp; a negative difference would wrap to a huge duration.
            let elapsed = std::time::Duration::from_secs((now_utc.unix_timestamp() - start) as u64);
            let next_block_timestamp = next_block.header().metadata().timestamp();
            let next_block_utc = alphaos_node_bft::helpers::to_utc_datetime(next_block_timestamp);
            let block_latency = next_block_timestamp - current_block_timestamp;
            let block_lag = (now_utc - next_block_utc).whole_milliseconds();

            let proof_target = next_block.header().proof_target();
            let coinbase_target = next_block.header().coinbase_target();
            let cumulative_proof_target = next_block.header().cumulative_proof_target();

            // Calculate latency for all transmissions included in this block.
            metrics::add_transmission_latency_metric(&self.transmissions_tracker, &next_block);

            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
            metrics::histogram(metrics::consensus::BLOCK_LAG, block_lag as f64);
            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);

            #[cfg(feature = "telemetry")]
            {
                // Retrieve the latest participation scores.
                let participation_scores =
                    self.bft().primary().gateway().validator_telemetry().get_participation_scores(&latest_committee);

                // Log the participation scores.
                for (address, participation_score) in participation_scores {
                    metrics::histogram_label(
                        metrics::consensus::VALIDATOR_PARTICIPATION,
                        "validator_address",
                        address.to_string(),
                        participation_score,
                    )
                }
            }
        }
        Ok(())
    }

    /// Reinserts the given transmissions into the memory pool.
    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
        // Iterate over the transmissions.
        for (transmission_id, transmission) in transmissions.into_iter() {
            // Reinsert the transmission into the memory pool.
            match self.reinsert_transmission(transmission_id, transmission).await {
                Ok(true) => {}
                Ok(false) => debug!(
                    "Unable to reinsert transmission {}.{} into the memory pool. Already exists.",
                    fmt_id(transmission_id),
                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                ),
                Err(err) => {
                    let err = err.context(format!(
                        "Unable to reinsert transmission {}.{} into the memory pool",
                        fmt_id(transmission_id),
                        fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                    ));
                    warn!("{}", flatten_error(err));
                }
            }
        }
    }

    /// Reinserts the given transmission into the memory pool.
    ///
    /// # Returns
    /// - `Ok(true)` if the transmission was added to the memory pool.
    /// - `Ok(false)` if the transmission was valid but already exists in the memory pool.
    /// - `Err(anyhow::Error)` if the transmission was invalid.
    async fn reinsert_transmission(
        &self,
        transmission_id: TransmissionID<N>,
        transmission: Transmission<N>,
    ) -> Result<bool> {
        // Initialize a callback sender and receiver.
        let (callback, callback_receiver) = oneshot::channel();
        // Send the transmission to the primary.
        match (transmission_id, transmission) {
            // Ratifications are not resent to the primary; treat as successfully handled.
            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(true),
            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
                // Send the solution to the primary.
                self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
            }
            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
                // Send the transaction to the primary.
                self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
            }
            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
        }
        // Await the callback.
        callback_receiver.await?
    }

    /// Spawns a task with the given future; it should only be used for long-running tasks.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the consensus and BFT layers.
    pub async fn shut_down(&self) {
        info!("Shutting down consensus...");
        // Shut down the BFT.
        self.bft.shut_down().await;
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}