/ node / consensus / src / lib.rs
lib.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  #![forbid(unsafe_code)]
 20  
 21  mod transactions_queue;
 22  use transactions_queue::TransactionsQueue;
 23  
 24  pub mod clp;
 25  pub mod genesis_generation;
 26  pub mod ipc_hooks;
 27  pub mod ratifications;
 28  pub use clp::{ClpConfig, ClpEpochResult, ClpManager};
 29  pub use genesis_generation::{
 30      generate_alpha_genesis,
 31      generate_delta_genesis,
 32      load_alpha_params,
 33      load_delta_params,
 34      verify_genesis_hash,
 35      AlphaGenesisParams,
 36      DeltaGenesisParams,
 37  };
 38  // Section 11: Conditionally export network upgrade functions (disabled on mainnet)
 39  #[cfg(not(feature = "mainnet"))]
 40  pub use genesis_generation::{execute_alpha_genesis_swap, execute_delta_genesis_swap};
 41  pub use ipc_hooks::{IpcEvent, IpcEventHandler, IpcEventReceiver, IpcEventSender, NoOpIpcHandler};
 42  #[cfg(not(feature = "mainnet"))]
 43  pub use ratifications::{
 44      check_upgrade_activation_sync,
 45      execute_network_upgrade_sync,
 46      get_my_vote,
 47      verify_upgrade_approval,
 48  };
 49  pub use ratifications::{NetworkUpgradeProposal, VoteChoice};
 50  
 51  #[macro_use]
 52  extern crate tracing;
 53  
 54  #[cfg(feature = "metrics")]
 55  extern crate alphaos_node_metrics as metrics;
 56  
 57  use alphaos_account::Account;
 58  use alphaos_node_bft::{
 59      helpers::{
 60          fmt_id,
 61          init_consensus_channels,
 62          init_primary_channels,
 63          ConsensusReceiver,
 64          PrimarySender,
 65          Storage as NarwhalStorage,
 66      },
 67      spawn_blocking,
 68      Primary,
 69      BFT,
 70      MAX_BATCH_DELAY_IN_MS,
 71  };
 72  use alphaos_node_bft_ledger_service::LedgerService;
 73  use alphaos_node_bft_storage_service::BFTPersistentStorage;
 74  use alphaos_node_sync::{BlockSync, Ping};
 75  
 76  use alphavm::{
 77      ledger::{
 78          block::Transaction,
 79          narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
 80          puzzle::{Solution, SolutionID},
 81      },
 82      prelude::*,
 83      utilities::flatten_error,
 84  };
 85  
 86  use alphastd::StorageMode;
 87  use anyhow::{Context, Result};
 88  use colored::Colorize;
 89  use indexmap::IndexMap;
 90  #[cfg(feature = "locktick")]
 91  use locktick::parking_lot::{Mutex, RwLock};
 92  use lru::LruCache;
 93  #[cfg(not(feature = "locktick"))]
 94  use parking_lot::{Mutex, RwLock};
 95  use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
 96  use tokio::{sync::oneshot, task::JoinHandle};
 97  
 98  #[cfg(feature = "metrics")]
 99  use std::collections::HashMap;
100  
101  /// The capacity of the queue reserved for deployments.
102  /// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
103  const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
104  /// The capacity of the queue reserved for executions.
105  /// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
106  const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
107  /// The capacity of the queue reserved for solutions.
108  /// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
109  const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
110  /// The **suggested** maximum number of deployments in each interval.
111  /// Note: This is an inbound queue limit, not a Narwhal-enforced limit.
112  const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
113  
114  /// Wrapper around `BFT` that adds additional functionality, such as a mempool.
115  ///
116  /// Consensus acts as a rate limiter to prevents workers in BFT from being overloaded.
117  /// Each worker maintains a ready queue (which is essentially also a mempool), but verifies transactions/solutions
118  /// before enquing them.
119  /// Consensus only passes more transactions/solutions to the BFT layer if its ready queues are not already full.
120  #[derive(Clone)]
121  pub struct Consensus<N: Network> {
122      /// The ledger.
123      ledger: Arc<dyn LedgerService<N>>,
124      /// The BFT.
125      bft: BFT<N>,
126      /// The primary sender.
127      primary_sender: PrimarySender<N>,
128      /// The unconfirmed solutions queue.
129      solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
130      /// The unconfirmed transactions queue.
131      transactions_queue: Arc<RwLock<TransactionsQueue<N>>>,
132      /// The recently-seen unconfirmed solutions.
133      seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
134      /// The recently-seen unconfirmed transactions.
135      seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
136      #[cfg(feature = "metrics")]
137      transmissions_tracker: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
138      /// The spawned handles.
139      handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
140      /// The ping logic.
141      ping: Arc<Ping<N>>,
142      /// The block sync logic.
143      block_sync: Arc<BlockSync<N>>,
144      /// Optional IPC event handler for cross-chain communication with DELTA.
145      ipc_handler: Option<Arc<dyn IpcEventHandler<N>>>,
146      /// Optional CLP manager for validator liveness proofs.
147      clp_manager: Option<Arc<ClpManager>>,
148  }
149  
150  impl<N: Network> Consensus<N> {
151      /// Initializes a new instance of consensus and spawn its background tasks.
152      #[allow(clippy::too_many_arguments)]
153      pub async fn new(
154          account: Account<N>,
155          ledger: Arc<dyn LedgerService<N>>,
156          block_sync: Arc<BlockSync<N>>,
157          ip: Option<SocketAddr>,
158          trusted_validators: &[SocketAddr],
159          trusted_peers_only: bool,
160          storage_mode: StorageMode,
161          ping: Arc<Ping<N>>,
162          dev: Option<u16>,
163      ) -> Result<Self> {
164          Self::with_ipc_handler(
165              account,
166              ledger,
167              block_sync,
168              ip,
169              trusted_validators,
170              trusted_peers_only,
171              storage_mode,
172              ping,
173              dev,
174              None,
175          )
176          .await
177      }
178  
179      /// Initializes a new instance of consensus with an optional IPC handler for cross-chain communication.
180      #[allow(clippy::too_many_arguments)]
181      pub async fn with_ipc_handler(
182          account: Account<N>,
183          ledger: Arc<dyn LedgerService<N>>,
184          block_sync: Arc<BlockSync<N>>,
185          ip: Option<SocketAddr>,
186          trusted_validators: &[SocketAddr],
187          trusted_peers_only: bool,
188          storage_mode: StorageMode,
189          ping: Arc<Ping<N>>,
190          dev: Option<u16>,
191          ipc_handler: Option<Arc<dyn IpcEventHandler<N>>>,
192      ) -> Result<Self> {
193          // Initialize the primary channels.
194          let (primary_sender, primary_receiver) = init_primary_channels::<N>();
195          // Initialize the Narwhal transmissions.
196          let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
197          // Initialize the Narwhal storage.
198          let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
199          // Initialize the BFT.
200          let bft = BFT::new(
201              account,
202              storage,
203              ledger.clone(),
204              block_sync.clone(),
205              ip,
206              trusted_validators,
207              trusted_peers_only,
208              storage_mode,
209              dev,
210          )?;
211          // Create a new instance of Consensus.
212          let mut _self = Self {
213              ledger,
214              bft,
215              block_sync,
216              primary_sender,
217              solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
218              transactions_queue: Default::default(),
219              seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
220              seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
221              #[cfg(feature = "metrics")]
222              transmissions_tracker: Default::default(),
223              handles: Default::default(),
224              ping: ping.clone(),
225              ipc_handler,
226              clp_manager: None, // CLP can be enabled via set_clp_manager()
227          };
228  
229          info!("Starting the consensus instance...");
230  
231          // First, initialize the consensus channels.
232          let (consensus_sender, consensus_receiver) = init_consensus_channels();
233          // Then, start the consensus handlers.
234          _self.start_handlers(consensus_receiver);
235          // Lastly, also start BFTs handlers.
236          _self.bft.run(Some(ping), Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?;
237  
238          Ok(_self)
239      }
240  
241      /// Returns the underlying `BFT` struct.
242      pub const fn bft(&self) -> &BFT<N> {
243          &self.bft
244      }
245  
246      pub fn contains_transaction(&self, transaction_id: &N::TransactionID) -> bool {
247          self.transactions_queue.read().contains(transaction_id)
248      }
249  
250      /// Set the IPC event handler for cross-chain communication.
251      ///
252      /// This can be called after construction to wire up the adnet runtime.
253      pub fn set_ipc_handler(&mut self, handler: Arc<dyn IpcEventHandler<N>>) {
254          self.ipc_handler = Some(handler);
255      }
256  
257      /// Returns whether an IPC handler is configured.
258      pub fn has_ipc_handler(&self) -> bool {
259          self.ipc_handler.is_some()
260      }
261  
262      /// Set the CLP manager for validator liveness proofs.
263      ///
264      /// This enables the Continuous Liveness Proof system for this consensus instance.
265      pub fn set_clp_manager(&mut self, manager: Arc<ClpManager>) {
266          self.clp_manager = Some(manager);
267      }
268  
269      /// Returns whether a CLP manager is configured.
270      pub fn has_clp_manager(&self) -> bool {
271          self.clp_manager.is_some()
272      }
273  
274      /// Returns a reference to the CLP manager if configured.
275      pub fn clp_manager(&self) -> Option<&Arc<ClpManager>> {
276          self.clp_manager.as_ref()
277      }
278  }
279  
280  impl<N: Network> Consensus<N> {
281      /// Returns the number of unconfirmed transmissions in the BFT's workers (not in the mempool).
282      pub fn num_unconfirmed_transmissions(&self) -> usize {
283          self.bft.num_unconfirmed_transmissions()
284      }
285  
286      /// Returns the number of unconfirmed ratifications in the BFT's workers (not in the mempool).
287      pub fn num_unconfirmed_ratifications(&self) -> usize {
288          self.bft.num_unconfirmed_ratifications()
289      }
290  
291      /// Returns the number unconfirmed solutions in the BFT's workers (not in the mempool).
292      pub fn num_unconfirmed_solutions(&self) -> usize {
293          self.bft.num_unconfirmed_solutions()
294      }
295  
296      /// Returns the number of unconfirmed transactions.
297      pub fn num_unconfirmed_transactions(&self) -> usize {
298          self.bft.num_unconfirmed_transactions()
299      }
300  }
301  
302  impl<N: Network> Consensus<N> {
303      /// Returns the unconfirmed transmission IDs.
304      pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
305          self.worker_transmission_ids().chain(self.inbound_transmission_ids())
306      }
307  
308      /// Returns the unconfirmed transmissions.
309      pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
310          self.worker_transmissions().chain(self.inbound_transmissions())
311      }
312  
313      /// Returns the unconfirmed solutions.
314      pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
315          self.worker_solutions().chain(self.inbound_solutions())
316      }
317  
318      /// Returns the unconfirmed transactions.
319      pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
320          self.worker_transactions().chain(self.inbound_transactions())
321      }
322  }
323  
324  impl<N: Network> Consensus<N> {
325      /// Returns the worker transmission IDs.
326      pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
327          self.bft.worker_transmission_ids()
328      }
329  
330      /// Returns the worker transmissions.
331      pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
332          self.bft.worker_transmissions()
333      }
334  
335      /// Returns the worker solutions.
336      pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
337          self.bft.worker_solutions()
338      }
339  
340      /// Returns the worker transactions.
341      pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
342          self.bft.worker_transactions()
343      }
344  }
345  
346  impl<N: Network> Consensus<N> {
347      /// Returns the transmission IDs in the inbound queue.
348      pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
349          self.inbound_transmissions().map(|(id, _)| id)
350      }
351  
352      /// Returns the transmissions in the inbound queue.
353      pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
354          self.inbound_transactions()
355              .map(|(id, tx)| {
356                  (
357                      TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
358                      Transmission::Transaction(tx),
359                  )
360              })
361              .chain(self.inbound_solutions().map(|(id, solution)| {
362                  (
363                      TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
364                      Transmission::Solution(solution),
365                  )
366              }))
367      }
368  
369      /// Returns the solutions in the inbound queue.
370      pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
371          // Return an iterator over the solutions in the inbound queue.
372          self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
373      }
374  
375      /// Returns the transactions in the inbound queue.
376      pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
377          // Return an iterator over the deployment and execution transactions in the inbound queue.
378          self.transactions_queue.read().transactions().map(|(id, tx)| (id, Data::Object(tx)))
379      }
380  }
381  
382  impl<N: Network> Consensus<N> {
383      /// Adds the given unconfirmed solution to the memory pool, which will then eventually be passed
384      /// to the BFT layer for inclusion in a batch.
385      pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
386          // Calculate the transmission checksum.
387          let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
388          // Queue the unconfirmed solution.
389          {
390              let solution_id = solution.id();
391  
392              // Check if the transaction was recently seen.
393              if self.seen_solutions.lock().put(solution_id, ()).is_some() {
394                  // If the transaction was recently seen, return early.
395                  return Ok(());
396              }
397              // Check if the solution already exists in the ledger.
398              if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
399                  bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
400              }
401              #[cfg(feature = "metrics")]
402              {
403                  metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
404                  let timestamp = alphaos_node_bft::helpers::now();
405                  self.transmissions_tracker.lock().insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
406              }
407              // Add the solution to the memory pool.
408              trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
409              if self.solutions_queue.lock().put(solution_id, solution).is_some() {
410                  bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
411              }
412          }
413  
414          // Try to process the unconfirmed solutions in the memory pool.
415          self.process_unconfirmed_solutions().await
416      }
417  
418      /// Processes unconfirmed solutions in the mempool, and passes them to the BFT layer
419      /// (if sufficient space is available).
420      async fn process_unconfirmed_solutions(&self) -> Result<()> {
421          // If the memory pool of this node is full, return early.
422          let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
423          let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
424          if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
425              || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
426          {
427              return Ok(());
428          }
429          // Retrieve the solutions.
430          let solutions = {
431              // Determine the available capacity.
432              let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
433              // Acquire the lock on the queue.
434              let mut queue = self.solutions_queue.lock();
435              // Determine the number of solutions to send.
436              let num_solutions = queue.len().min(capacity);
437              // Drain the solutions from the queue.
438              (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
439          };
440          // Iterate over the solutions.
441          for solution in solutions.into_iter() {
442              let solution_id = solution.id();
443              trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
444              // Send the unconfirmed solution to the primary.
445              match self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
446                  Ok(true) => {}
447                  Ok(false) => debug!(
448                      "Unable to add unconfirmed solution '{}' to the memory pool. Already exists.",
449                      fmt_id(solution_id)
450                  ),
451                  Err(err) => {
452                      let err = err.context(format!(
453                          "Unable to add unconfirmed solution '{}' to the memory pool",
454                          fmt_id(solution_id)
455                      ));
456  
457                      // If the node is synced and the occurs after the first 10 blocks of an epoch, log it as a warning, otherwise trace it.
458                      if self.bft.is_synced() && self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
459                          warn!("{}", flatten_error(err));
460                      } else {
461                          trace!("{}", flatten_error(err));
462                      }
463                  }
464              }
465          }
466          Ok(())
467      }
468  
469      /// Adds the given unconfirmed transaction to the memory pool, which will then eventually be passed
470      /// to the BFT layer for inclusion in a batch.
471      pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
472          // Calculate the transmission checksum.
473          let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
474          // Queue the unconfirmed transaction.
475          {
476              let transaction_id = transaction.id();
477  
478              // Check that the transaction is not a fee transaction.
479              if transaction.is_fee() {
480                  bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
481              }
482              // Check if the transaction was recently seen.
483              if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
484                  // If the transaction was recently seen, return early.
485                  return Ok(());
486              }
487              // Check if the transaction already exists in the ledger.
488              if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
489                  bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
490              }
491              // Check that the transaction is not in the mempool.
492              if self.contains_transaction(&transaction_id) {
493                  bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
494              }
495              #[cfg(feature = "metrics")]
496              {
497                  metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
498                  let timestamp = alphaos_node_bft::helpers::now();
499                  self.transmissions_tracker
500                      .lock()
501                      .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
502              }
503              // Add the transaction to the memory pool.
504              trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
505              let priority_fee = transaction.priority_fee_amount()?;
506              self.transactions_queue.write().insert(transaction_id, transaction, priority_fee)?;
507          }
508  
509          // Try to process the unconfirmed transactions in the memory pool.
510          self.process_unconfirmed_transactions().await
511      }
512  
513      /// Processes unconfirmed transactions in the mempool, and passes them to the BFT layer
514      /// (if sufficient space is available).
515      async fn process_unconfirmed_transactions(&self) -> Result<()> {
516          // If the memory pool of this node is full, return early.
517          let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
518          if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
519              return Ok(());
520          }
521          // Retrieve the transactions.
522          let transactions = {
523              // Determine the available capacity.
524              let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
525              // Acquire the lock on the transactions queue.
526              let mut tx_queue = self.transactions_queue.write();
527              // Determine the number of deployments to send.
528              let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
529              // Determine the number of executions to send.
530              let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
531              // Create an iterator which will select interleaved deployments and executions within the capacity.
532              // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
533              let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
534              // Drain the transactions from the queue, interleaving deployments and executions.
535              selector_iter
536                  .filter_map(
537                      |select_deployment| {
538                          if select_deployment {
539                              tx_queue.deployments.pop()
540                          } else {
541                              tx_queue.executions.pop()
542                          }
543                      },
544                  )
545                  .map(|(_, tx)| tx)
546                  .collect_vec()
547          };
548          // Iterate over the transactions.
549          for transaction in transactions.into_iter() {
550              let transaction_id = transaction.id();
551              // Determine the type of the transaction. The fee type is technically not possible here.
552              let tx_type_str = match transaction {
553                  Transaction::Deploy(..) => "deployment",
554                  Transaction::Execute(..) => "execution",
555                  Transaction::Fee(..) => "fee",
556              };
557              trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
558              // Send the unconfirmed transaction to the primary.
559              match self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await {
560                  Ok(true) => {}
561                  Ok(false) => debug!(
562                      "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool. Already exists.",
563                      fmt_id(transaction_id)
564                  ),
565                  Err(err) => {
566                      // If the BFT is synced, then log the warning.
567                      if self.bft.is_synced() {
568                          let err = err.context(format!(
569                              "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool",
570                              fmt_id(transaction_id)
571                          ));
572                          warn!("{}", flatten_error(err));
573                      }
574                  }
575              }
576          }
577          Ok(())
578      }
579  }
580  
581  impl<N: Network> Consensus<N> {
582      /// Starts the consensus handlers.
583      ///
584      /// This is only invoked once, in the constructor.
585      fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
586          let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
587  
588          // Process the committed subdag and transmissions from the BFT.
589          let self_ = self.clone();
590          self.spawn(async move {
591              while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
592                  self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
593              }
594          });
595  
596          // Process the unconfirmed transactions in the memory pool.
597          //
598          // TODO (kaimast): This shouldn't happen periodically but only when new batches/blocks are accepted
599          // by the BFT layer, after which the worker's ready queue may have capacity for more transactions/solutions.
600          let self_ = self.clone();
601          self.spawn(async move {
602              loop {
603                  // Sleep briefly.
604                  tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
605                  // Process the unconfirmed transactions in the memory pool.
606                  if let Err(err) = self_.process_unconfirmed_transactions().await {
607                      warn!("{}", flatten_error(err.context("Cannot process unconfirmed transactions")));
608                  }
609                  // Process the unconfirmed solutions in the memory pool.
610                  if let Err(err) = self_.process_unconfirmed_solutions().await {
611                      warn!("{}", flatten_error(err.context("Cannot process unconfirmed solutions")));
612                  }
613              }
614          });
615      }
616  
617      /// Attempts to build a new block from the given subDAG, and (tries to) advance the legder to it.
618      async fn process_bft_subdag(
619          &self,
620          subdag: Subdag<N>,
621          transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
622          callback: oneshot::Sender<Result<()>>,
623      ) {
624          // Try to advance to the next block.
625          let self_ = self.clone();
626          let transmissions_ = transmissions.clone();
627          let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_).with_context(|| "Unable to advance to the next block") };
628  
629          // If the block failed to advance, reinsert the transmissions into the memory pool.
630          if let Err(e) = &result {
631              error!("{}", flatten_error(e));
632              // On failure, reinsert the transmissions into the memory pool.
633              self.reinsert_transmissions(transmissions).await;
634          }
635          // Send the callback **after** advancing to the next block.
636          // Note: We must await the block to be advanced before sending the callback.
637          callback.send(result).ok();
638      }
639  
640      /// Attempts to advance the ledger to the next block, and updates the metrics (if enabled) accordingly.
641      fn try_advance_to_next_block(
642          &self,
643          subdag: Subdag<N>,
644          transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
645      ) -> Result<()> {
646          #[cfg(feature = "metrics")]
647          let start = subdag.leader_certificate().batch_header().timestamp();
648          #[cfg(feature = "metrics")]
649          let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
650          #[cfg(feature = "metrics")]
651          let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
652  
653          // Create the candidate next block.
654          let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
655          // Check that the block is well-formed.
656          self.ledger.check_next_block(&next_block)?;
657          // Advance to the next block.
658          self.ledger.advance_to_next_block(&next_block)?;
659          #[cfg(feature = "telemetry")]
660          // Fetch the latest committee
661          let latest_committee = self.ledger.current_committee()?;
662  
663          // Section 12b: Check for network upgrade activation (testnet only)
664          #[cfg(not(feature = "mainnet"))]
665          {
666              let next_height = next_block.height();
667              if let Ok(Some(proposal)) = ratifications::check_upgrade_activation_sync::<N>(&self.ledger, next_height) {
668                  info!("🚨 NETWORK UPGRADE DETECTED AT HEIGHT {}", next_height);
669  
670                  // Verify approval
671                  let total_governors = 5; // TODO: Get from committee
672                  if let Err(e) = ratifications::verify_upgrade_approval(&proposal, total_governors) {
673                      error!("⚠️ Network upgrade failed approval: {}", e);
674                  } else {
675                      // Use a placeholder address for testing
676                      let placeholder_addr: Address<N> = Address::zero();
677                      match ratifications::get_my_vote::<N>(&self.ledger, &proposal.proposal_id, &placeholder_addr) {
678                          Ok(vote) => {
679                              info!("📊 Vote choice: {:?}", vote);
680                              if let Err(e) =
681                                  ratifications::execute_network_upgrade_sync::<N>(&proposal, vote, &self.ledger)
682                              {
683                                  error!("⚠️ Network upgrade execution failed: {}", e);
684                              }
685                          }
686                          Err(e) => error!("Could not retrieve vote: {}", e),
687                      }
688                  }
689              }
690          }
691  
692          // If the next block starts a new epoch, clear the existing solutions.
693          if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
694              // Clear the solutions queue.
695              self.solutions_queue.lock().clear();
696              // Clear the worker solutions.
697              self.bft.primary().clear_worker_solutions();
698          }
699  
700          // Notify peers that we have a new block.
701          let locators = self.block_sync.get_block_locators()?;
702          self.ping.update_block_locators(locators);
703  
704          // Make block sync aware of the new block.
705          self.block_sync.set_sync_height(next_block.height());
706  
707          // Notify IPC handler of block finalization (for cross-chain communication with DELTA)
708          if let Some(ref handler) = self.ipc_handler {
709              let state_root = ipc_hooks::extract_state_root(&next_block);
710              handler.on_block_finalized(next_block.height() as u64, state_root, *next_block.hash());
711  
712              // Scan for lock_for_sax transactions and notify handler
713              ipc_hooks::scan_for_lock_transactions(&next_block, handler.as_ref());
714          }
715  
716          // CLP: Generate challenge and evaluate penalties at epoch boundary
717          if let Some(ref clp) = self.clp_manager {
718              // Generate a challenge if enough time has passed since the last one
719              let block_hash_bytes: [u8; 32] =
720                  next_block.hash().to_bytes_le().map(|v| v.try_into().unwrap_or([0u8; 32])).unwrap_or([0u8; 32]);
721              let block_height: u64 = next_block.height().into();
722              if let Some(_challenge) = clp.on_block_finalized(block_height, &block_hash_bytes) {
723                  // Challenge generated - in production, this would be broadcast to the network
724                  // For now, the challenge is registered in the aggregator
725                  debug!("CLP challenge generated at block {}", block_height);
726              }
727  
728              // Finalize any expired challenges
729              clp.finalize_expired_challenges(block_height);
730  
731              // At epoch boundary, evaluate CLP penalties
732              if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
733                  if let Some(result) = clp.evaluate_epoch() {
734                      info!(
735                          "CLP epoch {} complete: {} validators passed, {} slashed, {} ejected",
736                          result.epoch,
737                          result.passed.len(),
738                          result.slashed.len(),
739                          result.ejected.len()
740                      );
741                      // TODO: Apply slashes and ejections to staking state
742                  }
743              }
744          }
745  
746          // TODO(kaimast): This should also remove any transmissions/solutions contained in the block from the mempool.
747          // Removal currently happens when Consensus eventually passes them to the worker, which then just discards them.
748  
749          #[cfg(feature = "metrics")]
750          {
751              let now_utc = alphaos_node_bft::helpers::now_utc();
752              let elapsed = std::time::Duration::from_secs((now_utc.unix_timestamp() - start) as u64);
753              let next_block_timestamp = next_block.header().metadata().timestamp();
754              let next_block_utc = alphaos_node_bft::helpers::to_utc_datetime(next_block_timestamp);
755              let block_latency = next_block_timestamp - current_block_timestamp;
756              let block_lag = (now_utc - next_block_utc).whole_milliseconds();
757  
758              let proof_target = next_block.header().proof_target();
759              let coinbase_target = next_block.header().coinbase_target();
760              let cumulative_proof_target = next_block.header().cumulative_proof_target();
761  
762              // Calculate latency for all transmissions included in this block.
763              metrics::add_transmission_latency_metric(&self.transmissions_tracker, &next_block);
764  
765              metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
766              metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
767              metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
768              metrics::histogram(metrics::consensus::BLOCK_LAG, block_lag as f64);
769              metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
770              metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
771              metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
772  
773              #[cfg(feature = "telemetry")]
774              {
775                  // Retrieve the latest participation scores.
776                  let participation_scores =
777                      self.bft().primary().gateway().validator_telemetry().get_participation_scores(&latest_committee);
778  
779                  // Log the participation scores.
780                  for (address, participation_score) in participation_scores {
781                      metrics::histogram_label(
782                          metrics::consensus::VALIDATOR_PARTICIPATION,
783                          "validator_address",
784                          address.to_string(),
785                          participation_score,
786                      )
787                  }
788              }
789          }
790          Ok(())
791      }
792  
793      /// Reinserts the given transmissions into the memory pool.
794      async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
795          // Iterate over the transmissions.
796          for (transmission_id, transmission) in transmissions.into_iter() {
797              // Reinsert the transmission into the memory pool.
798              match self.reinsert_transmission(transmission_id, transmission).await {
799                  Ok(true) => {}
800                  Ok(false) => debug!(
801                      "Unable to reinsert transmission {}.{} into the memory pool. Already exists.",
802                      fmt_id(transmission_id),
803                      fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
804                  ),
805                  Err(err) => {
806                      let err = err.context(format!(
807                          "Unable to reinsert transmission {}.{} into the memory pool",
808                          fmt_id(transmission_id),
809                          fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
810                      ));
811                      warn!("{}", flatten_error(err));
812                  }
813              }
814          }
815      }
816  
817      /// Reinserts the given transmission into the memory pool.
818      ///
819      /// # Returns
820      /// - `Ok(true)` if the transmission was added to the memory pool.
821      /// - `Ok(false)` if the transmission was valid but already exists in the memory pool.
822      /// - `Err(anyhow::Error)` if the transmission was invalid.
823      async fn reinsert_transmission(
824          &self,
825          transmission_id: TransmissionID<N>,
826          transmission: Transmission<N>,
827      ) -> Result<bool> {
828          // Initialize a callback sender and receiver.
829          let (callback, callback_receiver) = oneshot::channel();
830          // Send the transmission to the primary.
831          match (transmission_id, transmission) {
832              (TransmissionID::Ratification, Transmission::Ratification) => return Ok(true),
833              (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
834                  // Send the solution to the primary.
835                  self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
836              }
837              (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
838                  // Send the transaction to the primary.
839                  self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
840              }
841              _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
842          }
843          // Await the callback.
844          callback_receiver.await?
845      }
846  
847      /// Spawns a task with the given future; it should only be used for long-running tasks.
848      fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
849          self.handles.lock().push(tokio::spawn(future));
850      }
851  
852      /// Shuts down the consensus and BFT layers.
853      pub async fn shut_down(&self) {
854          info!("Shutting down consensus...");
855          // Shut down the BFT.
856          self.bft.shut_down().await;
857          // Abort the tasks.
858          self.handles.lock().iter().for_each(|handle| handle.abort());
859      }
860  }