/ node / consensus / src / transactions_queue.rs
transactions_queue.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use std::{
 20      cmp::Reverse,
 21      collections::{hash_map::Entry, BTreeMap, HashMap},
 22      num::NonZeroUsize,
 23  };
 24  
 25  use alphavm::{ledger::Transaction, prelude::*};
 26  use anyhow::{bail, Result};
 27  use lru::LruCache;
 28  
 29  use crate::{CAPACITY_FOR_DEPLOYMENTS, CAPACITY_FOR_EXECUTIONS};
 30  
/// The pending transactions, kept in two independently bounded queues.
///
/// `insert` routes a transaction to `executions` when `Transaction::is_execute` returns
/// `true`, and to `deployments` otherwise.
pub struct TransactionsQueue<N: Network> {
    /// The queue for every transaction that is not an execution.
    pub deployments: TransactionsQueueInner<N>,
    /// The queue for execution transactions.
    pub executions: TransactionsQueueInner<N>,
}
 35  
 36  impl<N: Network> Default for TransactionsQueue<N> {
 37      fn default() -> Self {
 38          Self {
 39              deployments: TransactionsQueueInner::new(CAPACITY_FOR_DEPLOYMENTS),
 40              executions: TransactionsQueueInner::new(CAPACITY_FOR_EXECUTIONS),
 41          }
 42      }
 43  }
 44  
 45  impl<N: Network> TransactionsQueue<N> {
 46      pub fn contains(&self, transaction_id: &N::TransactionID) -> bool {
 47          self.executions.contains(transaction_id) || self.deployments.contains(transaction_id)
 48      }
 49  
 50      pub fn insert(
 51          &mut self,
 52          transaction_id: N::TransactionID,
 53          transaction: Transaction<N>,
 54          priority_fee: U64<N>,
 55      ) -> Result<()> {
 56          if transaction.is_execute() {
 57              self.executions.insert(transaction_id, transaction, priority_fee)
 58          } else {
 59              self.deployments.insert(transaction_id, transaction, priority_fee)
 60          }
 61      }
 62  
 63      pub fn transactions(&self) -> impl Iterator<Item = (N::TransactionID, Transaction<N>)> + use<N> {
 64          self.deployments
 65              .priority_queue
 66              .transactions
 67              .clone()
 68              .into_iter()
 69              .chain(self.deployments.queue.clone())
 70              .chain(self.executions.priority_queue.transactions.clone())
 71              .chain(self.executions.queue.clone())
 72      }
 73  }
 74  
/// A bounded queue for one kind of transaction, split by priority fee.
///
/// Transactions with a zero priority fee live in `queue`; transactions with a non-zero
/// priority fee live in `priority_queue`. `pop` drains `priority_queue` first.
pub struct TransactionsQueueInner<N: Network> {
    /// The capacity that `insert` compares the combined length of both queues against;
    /// it is also used as the capacity of the LRU cache in `queue`.
    capacity: usize,
    /// The zero-fee transactions, held in an LRU cache.
    queue: LruCache<N::TransactionID, Transaction<N>>,
    /// The transactions that carry a non-zero priority fee.
    priority_queue: PriorityQueue<N>,
}
 80  
 81  impl<N: Network> TransactionsQueueInner<N> {
 82      fn new(capacity: usize) -> Self {
 83          Self {
 84              capacity,
 85              queue: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
 86              priority_queue: Default::default(),
 87          }
 88      }
 89  
 90      pub fn len(&self) -> usize {
 91          self.queue.len().saturating_add(self.priority_queue.len())
 92      }
 93  
 94      fn contains(&self, transaction_id: &N::TransactionID) -> bool {
 95          self.queue.contains(transaction_id) || self.priority_queue.transactions.contains_key(transaction_id)
 96      }
 97  
 98      fn insert(
 99          &mut self,
100          transaction_id: N::TransactionID,
101          transaction: Transaction<N>,
102          priority_fee: U64<N>,
103      ) -> Result<()> {
104          // If the queue is not full, insert in the appropriate queue.
105          if self.len() < self.capacity {
106              if priority_fee.is_zero() {
107                  self.queue.get_or_insert(transaction_id, || transaction);
108              } else {
109                  self.priority_queue.insert(transaction_id, transaction, priority_fee);
110              }
111  
112              return Ok(());
113          }
114  
115          match (self.priority_queue.len() < self.capacity, *priority_fee) {
116              // Invariant: if the queue is at capacity but the priority queue
117              // isn't equal to the capacity, the low-priority queue must be non-empty.
118              (true, 0) => {
119                  let _ = self.queue.get_or_insert(transaction_id, || transaction);
120              }
121              (true, _fee) => {
122                  // Remove an entry from the low-priority queue to make room for the high-priority transaction.
123                  self.queue.pop_lru();
124                  self.priority_queue.insert(transaction_id, transaction, priority_fee)
125              }
126  
127              // Invariant: if the queue is at capacity but the priority queue is
128              // equal to the capacity, the low-priority queue must be empty.
129              (false, 0) => bail!("The memory pool is full"),
130              (false, _fee) => self.priority_queue.compare_insert(transaction_id, transaction, priority_fee),
131          }
132  
133          Ok(())
134      }
135  
136      pub fn pop(&mut self) -> Option<(N::TransactionID, Transaction<N>)> {
137          self.priority_queue.pop().or_else(|| self.queue.pop_lru())
138      }
139  }
140  
/// The transactions that carry a non-zero priority fee, ordered by fee and arrival.
struct PriorityQueue<N: Network> {
    /// A counter to ensure FIFO ordering for transactions with the same fee.
    counter: u64,
    /// A map of transaction IDs ordered by fee (highest first) and then by FIFO sequence.
    transaction_ids: BTreeMap<(Reverse<U64<N>>, u64), N::TransactionID>,
    /// A map of transaction IDs to transactions.
    transactions: HashMap<N::TransactionID, Transaction<N>>,
}
149  
150  impl<N: Network> Default for PriorityQueue<N> {
151      /// Initializes a new instance of the priority queue.
152      fn default() -> Self {
153          Self { counter: Default::default(), transaction_ids: Default::default(), transactions: Default::default() }
154      }
155  }
156  
157  impl<N: Network> PriorityQueue<N> {
158      fn len(&self) -> usize {
159          self.transactions.len()
160      }
161  
162      fn insert(&mut self, transaction_id: N::TransactionID, transaction: Transaction<N>, fee: U64<N>) {
163          if let Entry::Vacant(entry) = self.transactions.entry(transaction_id) {
164              // Insert the transaction into the map.
165              entry.insert(transaction);
166              // Sort by fee (highest first) and counter (fifo).
167              self.transaction_ids.insert((Reverse(fee), self.counter), transaction_id);
168              // Increment the counter.
169              self.counter += 1;
170          }
171      }
172  
173      fn compare_insert(&mut self, transaction_id: N::TransactionID, transaction: Transaction<N>, fee: U64<N>) {
174          // Make sure the collection isn't empty.
175          if self.transaction_ids.is_empty() {
176              return;
177          }
178  
179          // If the lowest fee in the collection is higher than the new fee, no-op.
180          //
181          // SAFETY: the empty check guarantees an item will be returned
182          let ((Reverse(lowest_fee), _), _) = self.transaction_ids.last_key_value().expect("item must be present");
183          if lowest_fee > &fee {
184              return;
185          }
186  
187          // Otherwise, remove the current value and insert the new.
188          //
189          // SAFETY: the empty check guarantees an item will be returned
190          let (_, id) = self.transaction_ids.pop_last().expect("item must be present");
191          self.transactions.remove(&id);
192          self.insert(transaction_id, transaction, fee);
193      }
194  
195      fn pop(&mut self) -> Option<(N::TransactionID, Transaction<N>)> {
196          let (_, transaction_id) = self.transaction_ids.pop_first()?;
197          self.transactions.remove(&transaction_id).map(|transaction| (transaction_id, transaction))
198      }
199  }
200  
#[cfg(test)]
mod tests {
    use super::*;

    use alphavm::{
        ledger::test_helpers::{sample_deployment_transaction, sample_execution_transaction_with_fee},
        prelude::{MainnetV0, TestRng},
    };

    type CurrentNetwork = MainnetV0;

    /// Checks that zero-fee executions and deployments are routed to their own queue and can
    /// be popped back unchanged.
    #[test]
    fn insert_and_pop_low_priority_transactions() {
        let mut rng = TestRng::default();

        /* Executions */

        // Test low-priority execution transaction.
        let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0);
        let execution_id = execution_transaction.id();
        let zero_fee = U64::new(0);

        let mut transactions_queue = TransactionsQueue::<CurrentNetwork>::default();
        assert!(!transactions_queue.contains(&execution_id));
        transactions_queue.insert(execution_id, execution_transaction.clone(), zero_fee).unwrap();

        // Check execution was put into the right queue.
        assert!(transactions_queue.contains(&execution_id));
        assert!(transactions_queue.executions.contains(&execution_id));
        assert!(!transactions_queue.deployments.contains(&execution_id));

        // Pop the execution transaction.
        let (popped_execution_id, popped_execution_transaction) = transactions_queue.executions.pop().unwrap();
        assert_eq!(popped_execution_id, execution_id);
        assert_eq!(popped_execution_transaction, execution_transaction);
        assert!(!transactions_queue.contains(&execution_id));

        /* Deployments */

        // Test low-priority deployment transaction.
        let deployment_transaction = sample_deployment_transaction(2, 0, false, &mut rng);
        let deployment_id = deployment_transaction.id();

        assert!(!transactions_queue.contains(&deployment_id));
        transactions_queue.insert(deployment_id, deployment_transaction.clone(), zero_fee).unwrap();

        // Check deployment was put into the right queue.
        assert!(transactions_queue.contains(&deployment_id));
        assert!(transactions_queue.deployments.contains(&deployment_id));
        assert!(!transactions_queue.executions.contains(&deployment_id));

        // Pop the deployment transaction.
        let (popped_deployment_id, popped_deployment_transaction) = transactions_queue.deployments.pop().unwrap();
        assert_eq!(popped_deployment_id, deployment_id);
        assert_eq!(popped_deployment_transaction, deployment_transaction);
        assert!(!transactions_queue.contains(&deployment_id));
    }

    /// Checks that executions and deployments inserted with a non-zero priority fee land in
    /// the priority queue of their own queue and can be popped back unchanged.
    #[test]
    fn insert_and_pop_high_priority_transactions() {
        let mut rng = TestRng::default();

        /* Executions */

        // Test high-priority execution transaction.
        let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0);
        let execution_id = execution_transaction.id();
        let high_fee = U64::new(100);

        let mut transactions_queue = TransactionsQueue::<CurrentNetwork>::default();
        assert!(!transactions_queue.contains(&execution_id));
        transactions_queue.insert(execution_id, execution_transaction.clone(), high_fee).unwrap();

        // Check execution was put into the priority queue.
        assert!(transactions_queue.contains(&execution_id));
        assert!(transactions_queue.executions.contains(&execution_id));
        assert!(transactions_queue.executions.priority_queue.transactions.contains_key(&execution_id));

        // Pop the execution transaction.
        let (popped_execution_id, popped_execution_transaction) = transactions_queue.executions.pop().unwrap();
        assert_eq!(popped_execution_id, execution_id);
        assert_eq!(popped_execution_transaction, execution_transaction);
        assert!(!transactions_queue.contains(&execution_id));

        /* Deployments */

        // Test high-priority deployment transaction.
        let deployment_transaction = sample_deployment_transaction(2, 0, false, &mut rng);
        let deployment_id = deployment_transaction.id();

        assert!(!transactions_queue.contains(&deployment_id));
        transactions_queue.insert(deployment_id, deployment_transaction.clone(), high_fee).unwrap();

        // Check deployment was put into the priority queue.
        assert!(transactions_queue.contains(&deployment_id));
        assert!(transactions_queue.deployments.contains(&deployment_id));
        assert!(transactions_queue.deployments.priority_queue.transactions.contains_key(&deployment_id));

        // Pop the deployment transaction.
        let (popped_deployment_id, popped_deployment_transaction) = transactions_queue.deployments.pop().unwrap();
        assert_eq!(popped_deployment_id, deployment_id);
        assert_eq!(popped_deployment_transaction, deployment_transaction);
        assert!(!transactions_queue.contains(&deployment_id));
    }

    /// Checks eviction at capacity (low-priority first, then the lowest fee), rejection of a
    /// zero-fee transaction once paying transactions fill the queue, and pop ordering by fee.
    #[test]
    fn insert_and_pop_ordering_with_eviction() {
        let mut rng = TestRng::default();

        let executions: Vec<_> = (0..10)
            .map(|_| {
                let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0);
                (execution_transaction.id(), execution_transaction)
            })
            .collect();

        // Fill a queue of capacity 4 with three paying transactions and one zero-fee transaction.
        let mut executions_queue = TransactionsQueueInner::new(4);
        executions_queue.insert(executions[0].0, executions[0].1.clone(), U64::new(300)).unwrap();
        executions_queue.insert(executions[1].0, executions[1].1.clone(), U64::new(0)).unwrap();
        executions_queue.insert(executions[2].0, executions[2].1.clone(), U64::new(100)).unwrap();
        executions_queue.insert(executions[3].0, executions[3].1.clone(), U64::new(200)).unwrap();
        assert_eq!(executions_queue.len(), 4);

        // Insert a high-priority transaction and evict the only remaining low-priority transaction.
        executions_queue.insert(executions[4].0, executions[4].1.clone(), U64::new(50)).unwrap();
        assert_eq!(executions_queue.queue.len(), 0);
        assert_eq!(executions_queue.priority_queue.len(), 4);
        assert!(executions_queue.priority_queue.transactions.contains_key(&executions[4].0));
        assert!(!executions_queue.priority_queue.transactions.contains_key(&executions[1].0));

        // Insert a high-priority transaction and evict the lowest high-priority transaction.
        executions_queue.insert(executions[5].0, executions[5].1.clone(), U64::new(150)).unwrap();
        assert_eq!(executions_queue.queue.len(), 0);
        assert_eq!(executions_queue.priority_queue.len(), 4);
        assert!(!executions_queue.priority_queue.transactions.contains_key(&executions[4].0));
        assert!(executions_queue.priority_queue.transactions.contains_key(&executions[5].0));

        // Try to insert a low-priority transaction and expect an error.
        assert!(executions_queue.insert(executions[6].0, executions[6].1.clone(), U64::new(0)).is_err());

        // Pop the transactions in the correct order (fees 300, 200, 150, 100).
        assert_eq!(executions_queue.pop().unwrap(), executions[0]);
        assert_eq!(executions_queue.pop().unwrap(), executions[3]);
        assert_eq!(executions_queue.pop().unwrap(), executions[5]);
        assert_eq!(executions_queue.pop().unwrap(), executions[2]);

        // Check the queue is empty.
        assert_eq!(executions_queue.len(), 0);
        assert_eq!(executions_queue.queue.len(), 0);
        assert_eq!(executions_queue.priority_queue.len(), 0);
        assert!(executions_queue.pop().is_none());

        // Insert a low-priority transaction and expect it to be inserted into the queue.
        executions_queue.insert(executions[6].0, executions[6].1.clone(), U64::new(0)).unwrap();
        assert_eq!(executions_queue.len(), 1);
        assert_eq!(executions_queue.queue.len(), 1);
        assert_eq!(executions_queue.priority_queue.len(), 0);
    }
}