/ node / consensus / src / transactions_queue.rs
transactions_queue.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  use std::{
 17      cmp::Reverse,
 18      collections::{BTreeMap, HashMap, hash_map::Entry},
 19      num::NonZeroUsize,
 20  };
 21  
 22  use alphavm::{ledger::Transaction, prelude::*};
 23  use anyhow::{Result, bail};
 24  use lru::LruCache;
 25  
 26  use crate::{CAPACITY_FOR_DEPLOYMENTS, CAPACITY_FOR_EXECUTIONS};
 27  
 28  pub struct TransactionsQueue<N: Network> {
 29      pub deployments: TransactionsQueueInner<N>,
 30      pub executions: TransactionsQueueInner<N>,
 31  }
 32  
 33  impl<N: Network> Default for TransactionsQueue<N> {
 34      fn default() -> Self {
 35          Self {
 36              deployments: TransactionsQueueInner::new(CAPACITY_FOR_DEPLOYMENTS),
 37              executions: TransactionsQueueInner::new(CAPACITY_FOR_EXECUTIONS),
 38          }
 39      }
 40  }
 41  
 42  impl<N: Network> TransactionsQueue<N> {
 43      pub fn contains(&self, transaction_id: &N::TransactionID) -> bool {
 44          self.executions.contains(transaction_id) || self.deployments.contains(transaction_id)
 45      }
 46  
 47      pub fn insert(
 48          &mut self,
 49          transaction_id: N::TransactionID,
 50          transaction: Transaction<N>,
 51          priority_fee: U64<N>,
 52      ) -> Result<()> {
 53          if transaction.is_execute() {
 54              self.executions.insert(transaction_id, transaction, priority_fee)
 55          } else {
 56              self.deployments.insert(transaction_id, transaction, priority_fee)
 57          }
 58      }
 59  
 60      pub fn transactions(&self) -> impl Iterator<Item = (N::TransactionID, Transaction<N>)> + use<N> {
 61          self.deployments
 62              .priority_queue
 63              .transactions
 64              .clone()
 65              .into_iter()
 66              .chain(self.deployments.queue.clone())
 67              .chain(self.executions.priority_queue.transactions.clone())
 68              .chain(self.executions.queue.clone())
 69      }
 70  }
 71  
 72  pub struct TransactionsQueueInner<N: Network> {
 73      capacity: usize,
 74      queue: LruCache<N::TransactionID, Transaction<N>>,
 75      priority_queue: PriorityQueue<N>,
 76  }
 77  
 78  impl<N: Network> TransactionsQueueInner<N> {
 79      fn new(capacity: usize) -> Self {
 80          Self {
 81              capacity,
 82              queue: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
 83              priority_queue: Default::default(),
 84          }
 85      }
 86  
 87      pub fn len(&self) -> usize {
 88          self.queue.len().saturating_add(self.priority_queue.len())
 89      }
 90  
 91      fn contains(&self, transaction_id: &N::TransactionID) -> bool {
 92          self.queue.contains(transaction_id) || self.priority_queue.transactions.contains_key(transaction_id)
 93      }
 94  
 95      fn insert(
 96          &mut self,
 97          transaction_id: N::TransactionID,
 98          transaction: Transaction<N>,
 99          priority_fee: U64<N>,
100      ) -> Result<()> {
101          // If the queue is not full, insert in the appropriate queue.
102          if self.len() < self.capacity {
103              if priority_fee.is_zero() {
104                  self.queue.get_or_insert(transaction_id, || transaction);
105              } else {
106                  self.priority_queue.insert(transaction_id, transaction, priority_fee);
107              }
108  
109              return Ok(());
110          }
111  
112          match (self.priority_queue.len() < self.capacity, *priority_fee) {
113              // Invariant: if the queue is at capacity but the priority queue
114              // isn't equal to the capacity, the low-priority queue must be non-empty.
115              (true, 0) => {
116                  let _ = self.queue.get_or_insert(transaction_id, || transaction);
117              }
118              (true, _fee) => {
119                  // Remove an entry from the low-priority queue to make room for the high-priority transaction.
120                  self.queue.pop_lru();
121                  self.priority_queue.insert(transaction_id, transaction, priority_fee)
122              }
123  
124              // Invariant: if the queue is at capacity but the priority queue is
125              // equal to the capacity, the low-priority queue must be empty.
126              (false, 0) => bail!("The memory pool is full"),
127              (false, _fee) => self.priority_queue.compare_insert(transaction_id, transaction, priority_fee),
128          }
129  
130          Ok(())
131      }
132  
133      pub fn pop(&mut self) -> Option<(N::TransactionID, Transaction<N>)> {
134          self.priority_queue.pop().or_else(|| self.queue.pop_lru())
135      }
136  }
137  
138  struct PriorityQueue<N: Network> {
139      /// A counter to ensure fifo ordering for transmissions with the same fee.
140      counter: u64,
141      /// A map of transmissions ordered by fee and by fifo sequence.
142      transaction_ids: BTreeMap<(Reverse<U64<N>>, u64), N::TransactionID>,
143      /// A map of transmission IDs to transmissions.
144      transactions: HashMap<N::TransactionID, Transaction<N>>,
145  }
146  
147  impl<N: Network> Default for PriorityQueue<N> {
148      /// Initializes a new instance of the priority queue.
149      fn default() -> Self {
150          Self { counter: Default::default(), transaction_ids: Default::default(), transactions: Default::default() }
151      }
152  }
153  
154  impl<N: Network> PriorityQueue<N> {
155      fn len(&self) -> usize {
156          self.transactions.len()
157      }
158  
159      fn insert(&mut self, transaction_id: N::TransactionID, transaction: Transaction<N>, fee: U64<N>) {
160          if let Entry::Vacant(entry) = self.transactions.entry(transaction_id) {
161              // Insert the transaction into the map.
162              entry.insert(transaction);
163              // Sort by fee (highest first) and counter (fifo).
164              self.transaction_ids.insert((Reverse(fee), self.counter), transaction_id);
165              // Increment the counter.
166              self.counter += 1;
167          }
168      }
169  
170      fn compare_insert(&mut self, transaction_id: N::TransactionID, transaction: Transaction<N>, fee: U64<N>) {
171          // Make sure the collection isn't empty.
172          if self.transaction_ids.is_empty() {
173              return;
174          }
175  
176          // If the lowest fee in the collection is higher than the new fee, no-op.
177          //
178          // SAFETY: the empty check guarantees an item will be returned
179          let ((Reverse(lowest_fee), _), _) = self.transaction_ids.last_key_value().expect("item must be present");
180          if lowest_fee > &fee {
181              return;
182          }
183  
184          // Otherwise, remove the current value and insert the new.
185          //
186          // SAFETY: the empty check guarantees an item will be returned
187          let (_, id) = self.transaction_ids.pop_last().expect("item must be present");
188          self.transactions.remove(&id);
189          self.insert(transaction_id, transaction, fee);
190      }
191  
192      fn pop(&mut self) -> Option<(N::TransactionID, Transaction<N>)> {
193          let (_, transaction_id) = self.transaction_ids.pop_first()?;
194          self.transactions.remove(&transaction_id).map(|transaction| (transaction_id, transaction))
195      }
196  }
197  
#[cfg(test)]
mod tests {
    use super::*;

    use alphavm::{
        ledger::test_helpers::{sample_deployment_transaction, sample_execution_transaction_with_fee},
        prelude::{MainnetV0, TestRng},
    };

    type CurrentNetwork = MainnetV0;

    /// An `(id, transaction)` pair on the test network.
    type Sample = (<CurrentNetwork as Network>::TransactionID, Transaction<CurrentNetwork>);

    /// Samples an execution transaction together with its ID.
    fn sample_execution(rng: &mut TestRng) -> Sample {
        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
        (transaction.id(), transaction)
    }

    /// Samples a deployment transaction together with its ID.
    fn sample_deployment(rng: &mut TestRng) -> Sample {
        let transaction = sample_deployment_transaction(2, 0, false, rng);
        (transaction.id(), transaction)
    }

    /// Inserts a copy of `sample` into `queue` with the given priority fee.
    fn submit(queue: &mut TransactionsQueueInner<CurrentNetwork>, sample: &Sample, fee: u64) -> Result<()> {
        queue.insert(sample.0, sample.1.clone(), U64::new(fee))
    }

    #[test]
    fn insert_and_pop_low_priority_transactions() {
        let mut rng = TestRng::default();
        let no_fee = U64::new(0);
        let mut pool = TransactionsQueue::<CurrentNetwork>::default();

        /* Executions */

        // A zero-fee execution must land in the executions queue only.
        let (execution_id, execution) = sample_execution(&mut rng);
        assert!(!pool.contains(&execution_id));
        pool.insert(execution_id, execution.clone(), no_fee).unwrap();

        assert!(pool.contains(&execution_id));
        assert!(pool.executions.contains(&execution_id));
        assert!(!pool.deployments.contains(&execution_id));

        // Popping hands back exactly what was inserted and leaves nothing behind.
        assert_eq!(pool.executions.pop().unwrap(), (execution_id, execution));
        assert!(!pool.contains(&execution_id));

        /* Deployments */

        // A zero-fee deployment must land in the deployments queue only.
        let (deployment_id, deployment) = sample_deployment(&mut rng);
        assert!(!pool.contains(&deployment_id));
        pool.insert(deployment_id, deployment.clone(), no_fee).unwrap();

        assert!(pool.contains(&deployment_id));
        assert!(pool.deployments.contains(&deployment_id));
        assert!(!pool.executions.contains(&deployment_id));

        // Popping hands back exactly what was inserted and leaves nothing behind.
        assert_eq!(pool.deployments.pop().unwrap(), (deployment_id, deployment));
        assert!(!pool.contains(&deployment_id));
    }

    #[test]
    fn insert_and_pop_high_priority_transactions() {
        let mut rng = TestRng::default();
        let fee = U64::new(100);
        let mut pool = TransactionsQueue::<CurrentNetwork>::default();

        /* Executions */

        // An execution with a fee must be stored in the executions priority queue.
        let (execution_id, execution) = sample_execution(&mut rng);
        assert!(!pool.contains(&execution_id));
        pool.insert(execution_id, execution.clone(), fee).unwrap();

        assert!(pool.contains(&execution_id));
        assert!(pool.executions.contains(&execution_id));
        assert!(pool.executions.priority_queue.transactions.contains_key(&execution_id));

        // Popping hands back exactly what was inserted and leaves nothing behind.
        assert_eq!(pool.executions.pop().unwrap(), (execution_id, execution));
        assert!(!pool.contains(&execution_id));

        /* Deployments */

        // A deployment with a fee must be stored in the deployments priority queue.
        let (deployment_id, deployment) = sample_deployment(&mut rng);
        assert!(!pool.contains(&deployment_id));
        pool.insert(deployment_id, deployment.clone(), fee).unwrap();

        assert!(pool.contains(&deployment_id));
        assert!(pool.deployments.contains(&deployment_id));
        assert!(pool.deployments.priority_queue.transactions.contains_key(&deployment_id));

        // Popping hands back exactly what was inserted and leaves nothing behind.
        assert_eq!(pool.deployments.pop().unwrap(), (deployment_id, deployment));
        assert!(!pool.contains(&deployment_id));
    }

    #[test]
    fn insert_and_pop_ordering_with_eviction() {
        let mut rng = TestRng::default();
        let samples: Vec<Sample> = (0..10).map(|_| sample_execution(&mut rng)).collect();

        // Fill a queue of capacity four with three priority entries and one regular entry.
        let mut inner = TransactionsQueueInner::new(4);
        for (index, fee) in [(0, 300), (1, 0), (2, 100), (3, 200)] {
            submit(&mut inner, &samples[index], fee).unwrap();
        }
        assert_eq!(inner.len(), 4);

        // A further priority transaction pushes out the only low-priority one.
        submit(&mut inner, &samples[4], 50).unwrap();
        assert_eq!(inner.queue.len(), 0);
        assert_eq!(inner.priority_queue.len(), 4);
        assert!(inner.priority_queue.transactions.contains_key(&samples[4].0));
        assert!(!inner.priority_queue.transactions.contains_key(&samples[1].0));

        // With only priority entries left, a higher fee displaces the lowest fee.
        submit(&mut inner, &samples[5], 150).unwrap();
        assert_eq!(inner.queue.len(), 0);
        assert_eq!(inner.priority_queue.len(), 4);
        assert!(!inner.priority_queue.transactions.contains_key(&samples[4].0));
        assert!(inner.priority_queue.transactions.contains_key(&samples[5].0));

        // A zero-fee transaction cannot displace a priority one.
        assert!(submit(&mut inner, &samples[6], 0).is_err());

        // Transactions come out by descending fee: 300, 200, 150, 100.
        for expected in [0, 3, 5, 2] {
            assert_eq!(inner.pop().unwrap(), samples[expected]);
        }

        // Everything has been drained.
        assert_eq!(inner.len(), 0);
        assert_eq!(inner.queue.len(), 0);
        assert_eq!(inner.priority_queue.len(), 0);
        assert!(inner.pop().is_none());

        // Once there is room again, a zero-fee transaction is accepted into the regular queue.
        submit(&mut inner, &samples[6], 0).unwrap();
        assert_eq!(inner.len(), 1);
        assert_eq!(inner.queue.len(), 1);
        assert_eq!(inner.priority_queue.len(), 0);
    }
}