// transactions_queue.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use std::{ 20 cmp::Reverse, 21 collections::{hash_map::Entry, BTreeMap, HashMap}, 22 num::NonZeroUsize, 23 }; 24 25 use alphavm::{ledger::Transaction, prelude::*}; 26 use anyhow::{bail, Result}; 27 use lru::LruCache; 28 29 use crate::{CAPACITY_FOR_DEPLOYMENTS, CAPACITY_FOR_EXECUTIONS}; 30 31 pub struct TransactionsQueue<N: Network> { 32 pub deployments: TransactionsQueueInner<N>, 33 pub executions: TransactionsQueueInner<N>, 34 } 35 36 impl<N: Network> Default for TransactionsQueue<N> { 37 fn default() -> Self { 38 Self { 39 deployments: TransactionsQueueInner::new(CAPACITY_FOR_DEPLOYMENTS), 40 executions: TransactionsQueueInner::new(CAPACITY_FOR_EXECUTIONS), 41 } 42 } 43 } 44 45 impl<N: Network> TransactionsQueue<N> { 46 pub fn contains(&self, transaction_id: &N::TransactionID) -> bool { 47 self.executions.contains(transaction_id) || self.deployments.contains(transaction_id) 48 } 49 50 pub fn insert( 51 &mut self, 52 transaction_id: N::TransactionID, 53 transaction: Transaction<N>, 54 priority_fee: U64<N>, 55 ) -> Result<()> { 56 if transaction.is_execute() { 57 self.executions.insert(transaction_id, transaction, 
priority_fee) 58 } else { 59 self.deployments.insert(transaction_id, transaction, priority_fee) 60 } 61 } 62 63 pub fn transactions(&self) -> impl Iterator<Item = (N::TransactionID, Transaction<N>)> + use<N> { 64 self.deployments 65 .priority_queue 66 .transactions 67 .clone() 68 .into_iter() 69 .chain(self.deployments.queue.clone()) 70 .chain(self.executions.priority_queue.transactions.clone()) 71 .chain(self.executions.queue.clone()) 72 } 73 } 74 75 pub struct TransactionsQueueInner<N: Network> { 76 capacity: usize, 77 queue: LruCache<N::TransactionID, Transaction<N>>, 78 priority_queue: PriorityQueue<N>, 79 } 80 81 impl<N: Network> TransactionsQueueInner<N> { 82 fn new(capacity: usize) -> Self { 83 Self { 84 capacity, 85 queue: LruCache::new(NonZeroUsize::new(capacity).unwrap()), 86 priority_queue: Default::default(), 87 } 88 } 89 90 pub fn len(&self) -> usize { 91 self.queue.len().saturating_add(self.priority_queue.len()) 92 } 93 94 fn contains(&self, transaction_id: &N::TransactionID) -> bool { 95 self.queue.contains(transaction_id) || self.priority_queue.transactions.contains_key(transaction_id) 96 } 97 98 fn insert( 99 &mut self, 100 transaction_id: N::TransactionID, 101 transaction: Transaction<N>, 102 priority_fee: U64<N>, 103 ) -> Result<()> { 104 // If the queue is not full, insert in the appropriate queue. 105 if self.len() < self.capacity { 106 if priority_fee.is_zero() { 107 self.queue.get_or_insert(transaction_id, || transaction); 108 } else { 109 self.priority_queue.insert(transaction_id, transaction, priority_fee); 110 } 111 112 return Ok(()); 113 } 114 115 match (self.priority_queue.len() < self.capacity, *priority_fee) { 116 // Invariant: if the queue is at capacity but the priority queue 117 // isn't equal to the capacity, the low-priority queue must be non-empty. 
118 (true, 0) => { 119 let _ = self.queue.get_or_insert(transaction_id, || transaction); 120 } 121 (true, _fee) => { 122 // Remove an entry from the low-priority queue to make room for the high-priority transaction. 123 self.queue.pop_lru(); 124 self.priority_queue.insert(transaction_id, transaction, priority_fee) 125 } 126 127 // Invariant: if the queue is at capacity but the priority queue is 128 // equal to the capacity, the low-priority queue must be empty. 129 (false, 0) => bail!("The memory pool is full"), 130 (false, _fee) => self.priority_queue.compare_insert(transaction_id, transaction, priority_fee), 131 } 132 133 Ok(()) 134 } 135 136 pub fn pop(&mut self) -> Option<(N::TransactionID, Transaction<N>)> { 137 self.priority_queue.pop().or_else(|| self.queue.pop_lru()) 138 } 139 } 140 141 struct PriorityQueue<N: Network> { 142 /// A counter to ensure fifo ordering for transmissions with the same fee. 143 counter: u64, 144 /// A map of transmissions ordered by fee and by fifo sequence. 145 transaction_ids: BTreeMap<(Reverse<U64<N>>, u64), N::TransactionID>, 146 /// A map of transmission IDs to transmissions. 147 transactions: HashMap<N::TransactionID, Transaction<N>>, 148 } 149 150 impl<N: Network> Default for PriorityQueue<N> { 151 /// Initializes a new instance of the priority queue. 152 fn default() -> Self { 153 Self { counter: Default::default(), transaction_ids: Default::default(), transactions: Default::default() } 154 } 155 } 156 157 impl<N: Network> PriorityQueue<N> { 158 fn len(&self) -> usize { 159 self.transactions.len() 160 } 161 162 fn insert(&mut self, transaction_id: N::TransactionID, transaction: Transaction<N>, fee: U64<N>) { 163 if let Entry::Vacant(entry) = self.transactions.entry(transaction_id) { 164 // Insert the transaction into the map. 165 entry.insert(transaction); 166 // Sort by fee (highest first) and counter (fifo). 167 self.transaction_ids.insert((Reverse(fee), self.counter), transaction_id); 168 // Increment the counter. 
169 self.counter += 1; 170 } 171 } 172 173 fn compare_insert(&mut self, transaction_id: N::TransactionID, transaction: Transaction<N>, fee: U64<N>) { 174 // Make sure the collection isn't empty. 175 if self.transaction_ids.is_empty() { 176 return; 177 } 178 179 // If the lowest fee in the collection is higher than the new fee, no-op. 180 // 181 // SAFETY: the empty check guarantees an item will be returned 182 let ((Reverse(lowest_fee), _), _) = self.transaction_ids.last_key_value().expect("item must be present"); 183 if lowest_fee > &fee { 184 return; 185 } 186 187 // Otherwise, remove the current value and insert the new. 188 // 189 // SAFETY: the empty check guarantees an item will be returned 190 let (_, id) = self.transaction_ids.pop_last().expect("item must be present"); 191 self.transactions.remove(&id); 192 self.insert(transaction_id, transaction, fee); 193 } 194 195 fn pop(&mut self) -> Option<(N::TransactionID, Transaction<N>)> { 196 let (_, transaction_id) = self.transaction_ids.pop_first()?; 197 self.transactions.remove(&transaction_id).map(|transaction| (transaction_id, transaction)) 198 } 199 } 200 201 #[cfg(test)] 202 mod tests { 203 use super::*; 204 205 use alphavm::{ 206 ledger::test_helpers::{sample_deployment_transaction, sample_execution_transaction_with_fee}, 207 prelude::{MainnetV0, TestRng}, 208 }; 209 210 type CurrentNetwork = MainnetV0; 211 212 #[test] 213 fn insert_and_pop_low_priority_transactions() { 214 let mut rng = TestRng::default(); 215 216 /* Executions */ 217 218 // Test low-priority execution transaction. 
219 let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0); 220 let execution_id = execution_transaction.id(); 221 let zero_fee = U64::new(0); 222 223 let mut transactions_queue = TransactionsQueue::<CurrentNetwork>::default(); 224 assert!(!transactions_queue.contains(&execution_id)); 225 transactions_queue.insert(execution_id, execution_transaction.clone(), zero_fee).unwrap(); 226 227 // Check execution was put into the right queue. 228 assert!(transactions_queue.contains(&execution_id)); 229 assert!(transactions_queue.executions.contains(&execution_id)); 230 assert!(!transactions_queue.deployments.contains(&execution_id)); 231 232 // Pop the execution transaction. 233 let (popped_execution_id, popped_execution_transaction) = transactions_queue.executions.pop().unwrap(); 234 assert_eq!(popped_execution_id, execution_id); 235 assert_eq!(popped_execution_transaction, execution_transaction); 236 assert!(!transactions_queue.contains(&execution_id)); 237 238 /* Deployments */ 239 240 // Test low-priority deployment transaction. 241 let deployment_transaction = sample_deployment_transaction(2, 0, false, &mut rng); 242 let deployment_id = deployment_transaction.id(); 243 244 assert!(!transactions_queue.contains(&deployment_id)); 245 transactions_queue.insert(deployment_id, deployment_transaction.clone(), zero_fee).unwrap(); 246 247 // Check deployment was put into the right queue. 248 assert!(transactions_queue.contains(&deployment_id)); 249 assert!(transactions_queue.deployments.contains(&deployment_id)); 250 assert!(!transactions_queue.executions.contains(&deployment_id)); 251 252 // Pop the deployment transaction. 
253 let (popped_deployment_id, popped_deployment_transaction) = transactions_queue.deployments.pop().unwrap(); 254 assert_eq!(popped_deployment_id, deployment_id); 255 assert_eq!(popped_deployment_transaction, deployment_transaction); 256 assert!(!transactions_queue.contains(&deployment_id)); 257 } 258 259 #[test] 260 fn insert_and_pop_high_priority_transactions() { 261 let mut rng = TestRng::default(); 262 263 /* Executions */ 264 265 // Test high-priority execution transaction. 266 let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0); 267 let execution_id = execution_transaction.id(); 268 let high_fee = U64::new(100); 269 270 let mut transactions_queue = TransactionsQueue::<CurrentNetwork>::default(); 271 assert!(!transactions_queue.contains(&execution_id)); 272 transactions_queue.insert(execution_id, execution_transaction.clone(), high_fee).unwrap(); 273 274 // Check execution was put into the priority queue. 275 assert!(transactions_queue.contains(&execution_id)); 276 assert!(transactions_queue.executions.contains(&execution_id)); 277 assert!(transactions_queue.executions.priority_queue.transactions.contains_key(&execution_id)); 278 279 // Pop the execution transaction. 280 let (popped_execution_id, popped_execution_transaction) = transactions_queue.executions.pop().unwrap(); 281 assert_eq!(popped_execution_id, execution_id); 282 assert_eq!(popped_execution_transaction, execution_transaction); 283 assert!(!transactions_queue.contains(&execution_id)); 284 285 /* Deployments */ 286 287 // Test high-priority deployment transaction. 288 let deployment_transaction = sample_deployment_transaction(2, 0, false, &mut rng); 289 let deployment_id = deployment_transaction.id(); 290 291 assert!(!transactions_queue.contains(&deployment_id)); 292 transactions_queue.insert(deployment_id, deployment_transaction.clone(), high_fee).unwrap(); 293 294 // Check deployment was put into the priority queue. 
295 assert!(transactions_queue.contains(&deployment_id)); 296 assert!(transactions_queue.deployments.contains(&deployment_id)); 297 assert!(transactions_queue.deployments.priority_queue.transactions.contains_key(&deployment_id)); 298 299 // Pop the deployment transaction. 300 let (popped_deployment_id, popped_deployment_transaction) = transactions_queue.deployments.pop().unwrap(); 301 assert_eq!(popped_deployment_id, deployment_id); 302 assert_eq!(popped_deployment_transaction, deployment_transaction); 303 assert!(!transactions_queue.contains(&deployment_id)); 304 } 305 306 #[test] 307 fn insert_and_pop_ordering_with_eviction() { 308 let mut rng = TestRng::default(); 309 310 let executions: Vec<_> = (0..10) 311 .map(|_| { 312 let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0); 313 (execution_transaction.id(), execution_transaction) 314 }) 315 .collect(); 316 317 let mut executions_queue = TransactionsQueueInner::new(4); 318 executions_queue.insert(executions[0].0, executions[0].1.clone(), U64::new(300)).unwrap(); 319 executions_queue.insert(executions[1].0, executions[1].1.clone(), U64::new(0)).unwrap(); 320 executions_queue.insert(executions[2].0, executions[2].1.clone(), U64::new(100)).unwrap(); 321 executions_queue.insert(executions[3].0, executions[3].1.clone(), U64::new(200)).unwrap(); 322 assert_eq!(executions_queue.len(), 4); 323 324 // Insert a high-priority transaction and evict the remaining low-priority transactions. 325 executions_queue.insert(executions[4].0, executions[4].1.clone(), U64::new(50)).unwrap(); 326 assert_eq!(executions_queue.queue.len(), 0); 327 assert_eq!(executions_queue.priority_queue.len(), 4); 328 assert!(executions_queue.priority_queue.transactions.contains_key(&executions[4].0)); 329 assert!(!executions_queue.priority_queue.transactions.contains_key(&executions[1].0)); 330 331 // Insert a high-priority transaction and evict the lowest high-priority transaction. 
332 executions_queue.insert(executions[5].0, executions[5].1.clone(), U64::new(150)).unwrap(); 333 assert_eq!(executions_queue.queue.len(), 0); 334 assert_eq!(executions_queue.priority_queue.len(), 4); 335 assert!(!executions_queue.priority_queue.transactions.contains_key(&executions[4].0)); 336 assert!(executions_queue.priority_queue.transactions.contains_key(&executions[5].0)); 337 338 // Try to insert a low-priority transaction and expect an error. 339 assert!(executions_queue.insert(executions[6].0, executions[6].1.clone(), U64::new(0)).is_err()); 340 341 // Pop the transactions in the correct order. 342 assert_eq!(executions_queue.pop().unwrap(), executions[0]); 343 assert_eq!(executions_queue.pop().unwrap(), executions[3]); 344 assert_eq!(executions_queue.pop().unwrap(), executions[5]); 345 assert_eq!(executions_queue.pop().unwrap(), executions[2]); 346 347 // Check the queue is empty. 348 assert_eq!(executions_queue.len(), 0); 349 assert_eq!(executions_queue.queue.len(), 0); 350 assert_eq!(executions_queue.priority_queue.len(), 0); 351 assert!(executions_queue.pop().is_none()); 352 353 // Insert a low-priority transaction and expect it to be inserted into the queue. 354 executions_queue.insert(executions[6].0, executions[6].1.clone(), U64::new(0)).unwrap(); 355 assert_eq!(executions_queue.len(), 1); 356 assert_eq!(executions_queue.queue.len(), 1); 357 assert_eq!(executions_queue.priority_queue.len(), 0); 358 } 359 }