// transactions_queue.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use std::{ 17 cmp::Reverse, 18 collections::{BTreeMap, HashMap, hash_map::Entry}, 19 num::NonZeroUsize, 20 }; 21 22 use alphavm::{ledger::Transaction, prelude::*}; 23 use anyhow::{Result, bail}; 24 use lru::LruCache; 25 26 use crate::{CAPACITY_FOR_DEPLOYMENTS, CAPACITY_FOR_EXECUTIONS}; 27 28 pub struct TransactionsQueue<N: Network> { 29 pub deployments: TransactionsQueueInner<N>, 30 pub executions: TransactionsQueueInner<N>, 31 } 32 33 impl<N: Network> Default for TransactionsQueue<N> { 34 fn default() -> Self { 35 Self { 36 deployments: TransactionsQueueInner::new(CAPACITY_FOR_DEPLOYMENTS), 37 executions: TransactionsQueueInner::new(CAPACITY_FOR_EXECUTIONS), 38 } 39 } 40 } 41 42 impl<N: Network> TransactionsQueue<N> { 43 pub fn contains(&self, transaction_id: &N::TransactionID) -> bool { 44 self.executions.contains(transaction_id) || self.deployments.contains(transaction_id) 45 } 46 47 pub fn insert( 48 &mut self, 49 transaction_id: N::TransactionID, 50 transaction: Transaction<N>, 51 priority_fee: U64<N>, 52 ) -> Result<()> { 53 if transaction.is_execute() { 54 self.executions.insert(transaction_id, transaction, priority_fee) 55 } else { 56 self.deployments.insert(transaction_id, transaction, priority_fee) 57 } 58 } 59 60 pub fn transactions(&self) -> impl Iterator<Item = (N::TransactionID, 
Transaction<N>)> + use<N> { 61 self.deployments 62 .priority_queue 63 .transactions 64 .clone() 65 .into_iter() 66 .chain(self.deployments.queue.clone()) 67 .chain(self.executions.priority_queue.transactions.clone()) 68 .chain(self.executions.queue.clone()) 69 } 70 } 71 72 pub struct TransactionsQueueInner<N: Network> { 73 capacity: usize, 74 queue: LruCache<N::TransactionID, Transaction<N>>, 75 priority_queue: PriorityQueue<N>, 76 } 77 78 impl<N: Network> TransactionsQueueInner<N> { 79 fn new(capacity: usize) -> Self { 80 Self { 81 capacity, 82 queue: LruCache::new(NonZeroUsize::new(capacity).unwrap()), 83 priority_queue: Default::default(), 84 } 85 } 86 87 pub fn len(&self) -> usize { 88 self.queue.len().saturating_add(self.priority_queue.len()) 89 } 90 91 fn contains(&self, transaction_id: &N::TransactionID) -> bool { 92 self.queue.contains(transaction_id) || self.priority_queue.transactions.contains_key(transaction_id) 93 } 94 95 fn insert( 96 &mut self, 97 transaction_id: N::TransactionID, 98 transaction: Transaction<N>, 99 priority_fee: U64<N>, 100 ) -> Result<()> { 101 // If the queue is not full, insert in the appropriate queue. 102 if self.len() < self.capacity { 103 if priority_fee.is_zero() { 104 self.queue.get_or_insert(transaction_id, || transaction); 105 } else { 106 self.priority_queue.insert(transaction_id, transaction, priority_fee); 107 } 108 109 return Ok(()); 110 } 111 112 match (self.priority_queue.len() < self.capacity, *priority_fee) { 113 // Invariant: if the queue is at capacity but the priority queue 114 // isn't equal to the capacity, the low-priority queue must be non-empty. 115 (true, 0) => { 116 let _ = self.queue.get_or_insert(transaction_id, || transaction); 117 } 118 (true, _fee) => { 119 // Remove an entry from the low-priority queue to make room for the high-priority transaction. 
120 self.queue.pop_lru(); 121 self.priority_queue.insert(transaction_id, transaction, priority_fee) 122 } 123 124 // Invariant: if the queue is at capacity but the priority queue is 125 // equal to the capacity, the low-priority queue must be empty. 126 (false, 0) => bail!("The memory pool is full"), 127 (false, _fee) => self.priority_queue.compare_insert(transaction_id, transaction, priority_fee), 128 } 129 130 Ok(()) 131 } 132 133 pub fn pop(&mut self) -> Option<(N::TransactionID, Transaction<N>)> { 134 self.priority_queue.pop().or_else(|| self.queue.pop_lru()) 135 } 136 } 137 138 struct PriorityQueue<N: Network> { 139 /// A counter to ensure fifo ordering for transmissions with the same fee. 140 counter: u64, 141 /// A map of transmissions ordered by fee and by fifo sequence. 142 transaction_ids: BTreeMap<(Reverse<U64<N>>, u64), N::TransactionID>, 143 /// A map of transmission IDs to transmissions. 144 transactions: HashMap<N::TransactionID, Transaction<N>>, 145 } 146 147 impl<N: Network> Default for PriorityQueue<N> { 148 /// Initializes a new instance of the priority queue. 149 fn default() -> Self { 150 Self { counter: Default::default(), transaction_ids: Default::default(), transactions: Default::default() } 151 } 152 } 153 154 impl<N: Network> PriorityQueue<N> { 155 fn len(&self) -> usize { 156 self.transactions.len() 157 } 158 159 fn insert(&mut self, transaction_id: N::TransactionID, transaction: Transaction<N>, fee: U64<N>) { 160 if let Entry::Vacant(entry) = self.transactions.entry(transaction_id) { 161 // Insert the transaction into the map. 162 entry.insert(transaction); 163 // Sort by fee (highest first) and counter (fifo). 164 self.transaction_ids.insert((Reverse(fee), self.counter), transaction_id); 165 // Increment the counter. 166 self.counter += 1; 167 } 168 } 169 170 fn compare_insert(&mut self, transaction_id: N::TransactionID, transaction: Transaction<N>, fee: U64<N>) { 171 // Make sure the collection isn't empty. 
172 if self.transaction_ids.is_empty() { 173 return; 174 } 175 176 // If the lowest fee in the collection is higher than the new fee, no-op. 177 // 178 // SAFETY: the empty check guarantees an item will be returned 179 let ((Reverse(lowest_fee), _), _) = self.transaction_ids.last_key_value().expect("item must be present"); 180 if lowest_fee > &fee { 181 return; 182 } 183 184 // Otherwise, remove the current value and insert the new. 185 // 186 // SAFETY: the empty check guarantees an item will be returned 187 let (_, id) = self.transaction_ids.pop_last().expect("item must be present"); 188 self.transactions.remove(&id); 189 self.insert(transaction_id, transaction, fee); 190 } 191 192 fn pop(&mut self) -> Option<(N::TransactionID, Transaction<N>)> { 193 let (_, transaction_id) = self.transaction_ids.pop_first()?; 194 self.transactions.remove(&transaction_id).map(|transaction| (transaction_id, transaction)) 195 } 196 } 197 198 #[cfg(test)] 199 mod tests { 200 use super::*; 201 202 use alphavm::{ 203 ledger::test_helpers::{sample_deployment_transaction, sample_execution_transaction_with_fee}, 204 prelude::{MainnetV0, TestRng}, 205 }; 206 207 type CurrentNetwork = MainnetV0; 208 209 #[test] 210 fn insert_and_pop_low_priority_transactions() { 211 let mut rng = TestRng::default(); 212 213 /* Executions */ 214 215 // Test low-priority execution transaction. 216 let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0); 217 let execution_id = execution_transaction.id(); 218 let zero_fee = U64::new(0); 219 220 let mut transactions_queue = TransactionsQueue::<CurrentNetwork>::default(); 221 assert!(!transactions_queue.contains(&execution_id)); 222 transactions_queue.insert(execution_id, execution_transaction.clone(), zero_fee).unwrap(); 223 224 // Check execution was put into the right queue. 
225 assert!(transactions_queue.contains(&execution_id)); 226 assert!(transactions_queue.executions.contains(&execution_id)); 227 assert!(!transactions_queue.deployments.contains(&execution_id)); 228 229 // Pop the execution transaction. 230 let (popped_execution_id, popped_execution_transaction) = transactions_queue.executions.pop().unwrap(); 231 assert_eq!(popped_execution_id, execution_id); 232 assert_eq!(popped_execution_transaction, execution_transaction); 233 assert!(!transactions_queue.contains(&execution_id)); 234 235 /* Deployments */ 236 237 // Test low-priority deployment transaction. 238 let deployment_transaction = sample_deployment_transaction(2, 0, false, &mut rng); 239 let deployment_id = deployment_transaction.id(); 240 241 assert!(!transactions_queue.contains(&deployment_id)); 242 transactions_queue.insert(deployment_id, deployment_transaction.clone(), zero_fee).unwrap(); 243 244 // Check deployment was put into the right queue. 245 assert!(transactions_queue.contains(&deployment_id)); 246 assert!(transactions_queue.deployments.contains(&deployment_id)); 247 assert!(!transactions_queue.executions.contains(&deployment_id)); 248 249 // Pop the deployment transaction. 250 let (popped_deployment_id, popped_deployment_transaction) = transactions_queue.deployments.pop().unwrap(); 251 assert_eq!(popped_deployment_id, deployment_id); 252 assert_eq!(popped_deployment_transaction, deployment_transaction); 253 assert!(!transactions_queue.contains(&deployment_id)); 254 } 255 256 #[test] 257 fn insert_and_pop_high_priority_transactions() { 258 let mut rng = TestRng::default(); 259 260 /* Executions */ 261 262 // Test high-priority execution transaction. 
263 let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0); 264 let execution_id = execution_transaction.id(); 265 let high_fee = U64::new(100); 266 267 let mut transactions_queue = TransactionsQueue::<CurrentNetwork>::default(); 268 assert!(!transactions_queue.contains(&execution_id)); 269 transactions_queue.insert(execution_id, execution_transaction.clone(), high_fee).unwrap(); 270 271 // Check execution was put into the priority queue. 272 assert!(transactions_queue.contains(&execution_id)); 273 assert!(transactions_queue.executions.contains(&execution_id)); 274 assert!(transactions_queue.executions.priority_queue.transactions.contains_key(&execution_id)); 275 276 // Pop the execution transaction. 277 let (popped_execution_id, popped_execution_transaction) = transactions_queue.executions.pop().unwrap(); 278 assert_eq!(popped_execution_id, execution_id); 279 assert_eq!(popped_execution_transaction, execution_transaction); 280 assert!(!transactions_queue.contains(&execution_id)); 281 282 /* Deployments */ 283 284 // Test high-priority deployment transaction. 285 let deployment_transaction = sample_deployment_transaction(2, 0, false, &mut rng); 286 let deployment_id = deployment_transaction.id(); 287 288 assert!(!transactions_queue.contains(&deployment_id)); 289 transactions_queue.insert(deployment_id, deployment_transaction.clone(), high_fee).unwrap(); 290 291 // Check deployment was put into the priority queue. 292 assert!(transactions_queue.contains(&deployment_id)); 293 assert!(transactions_queue.deployments.contains(&deployment_id)); 294 assert!(transactions_queue.deployments.priority_queue.transactions.contains_key(&deployment_id)); 295 296 // Pop the deployment transaction. 
297 let (popped_deployment_id, popped_deployment_transaction) = transactions_queue.deployments.pop().unwrap(); 298 assert_eq!(popped_deployment_id, deployment_id); 299 assert_eq!(popped_deployment_transaction, deployment_transaction); 300 assert!(!transactions_queue.contains(&deployment_id)); 301 } 302 303 #[test] 304 fn insert_and_pop_ordering_with_eviction() { 305 let mut rng = TestRng::default(); 306 307 let executions: Vec<_> = (0..10) 308 .map(|_| { 309 let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0); 310 (execution_transaction.id(), execution_transaction) 311 }) 312 .collect(); 313 314 let mut executions_queue = TransactionsQueueInner::new(4); 315 executions_queue.insert(executions[0].0, executions[0].1.clone(), U64::new(300)).unwrap(); 316 executions_queue.insert(executions[1].0, executions[1].1.clone(), U64::new(0)).unwrap(); 317 executions_queue.insert(executions[2].0, executions[2].1.clone(), U64::new(100)).unwrap(); 318 executions_queue.insert(executions[3].0, executions[3].1.clone(), U64::new(200)).unwrap(); 319 assert_eq!(executions_queue.len(), 4); 320 321 // Insert a high-priority transaction and evict the remaining low-priority transactions. 322 executions_queue.insert(executions[4].0, executions[4].1.clone(), U64::new(50)).unwrap(); 323 assert_eq!(executions_queue.queue.len(), 0); 324 assert_eq!(executions_queue.priority_queue.len(), 4); 325 assert!(executions_queue.priority_queue.transactions.contains_key(&executions[4].0)); 326 assert!(!executions_queue.priority_queue.transactions.contains_key(&executions[1].0)); 327 328 // Insert a high-priority transaction and evict the lowest high-priority transaction. 
329 executions_queue.insert(executions[5].0, executions[5].1.clone(), U64::new(150)).unwrap(); 330 assert_eq!(executions_queue.queue.len(), 0); 331 assert_eq!(executions_queue.priority_queue.len(), 4); 332 assert!(!executions_queue.priority_queue.transactions.contains_key(&executions[4].0)); 333 assert!(executions_queue.priority_queue.transactions.contains_key(&executions[5].0)); 334 335 // Try to insert a low-priority transaction and expect an error. 336 assert!(executions_queue.insert(executions[6].0, executions[6].1.clone(), U64::new(0)).is_err()); 337 338 // Pop the transactions in the correct order. 339 assert_eq!(executions_queue.pop().unwrap(), executions[0]); 340 assert_eq!(executions_queue.pop().unwrap(), executions[3]); 341 assert_eq!(executions_queue.pop().unwrap(), executions[5]); 342 assert_eq!(executions_queue.pop().unwrap(), executions[2]); 343 344 // Check the queue is empty. 345 assert_eq!(executions_queue.len(), 0); 346 assert_eq!(executions_queue.queue.len(), 0); 347 assert_eq!(executions_queue.priority_queue.len(), 0); 348 assert!(executions_queue.pop().is_none()); 349 350 // Insert a low-priority transaction and expect it to be inserted into the queue. 351 executions_queue.insert(executions[6].0, executions[6].1.clone(), U64::new(0)).unwrap(); 352 assert_eq!(executions_queue.len(), 1); 353 assert_eq!(executions_queue.queue.len(), 1); 354 assert_eq!(executions_queue.priority_queue.len(), 0); 355 } 356 }