queue.rs
1 // Copyright (c) 2025 ALPHA/DELTA Network 2 3 //! Pending attestation queue for cross-chain operations. 4 5 use crate::CrossChainMessage; 6 use std::collections::VecDeque; 7 use tokio::sync::RwLock; 8 use tracing::{debug, info, warn}; 9 10 /// Maximum pending attestations before backpressure. 11 const MAX_PENDING: usize = 10_000; 12 13 /// Queue for pending cross-chain attestations. 14 pub struct AttestationQueue { 15 /// Pending ALPHA → DELTA attestations. 16 alpha_to_delta: RwLock<VecDeque<PendingAttestation>>, 17 /// Pending DELTA → ALPHA attestations. 18 delta_to_alpha: RwLock<VecDeque<PendingAttestation>>, 19 } 20 21 /// A pending attestation with metadata. 22 #[derive(Debug, Clone)] 23 pub struct PendingAttestation { 24 /// The cross-chain message. 25 pub message: CrossChainMessage, 26 /// When this attestation was queued (block height on source chain). 27 pub queued_at: u64, 28 /// Number of retry attempts. 29 pub retries: u32, 30 } 31 32 impl AttestationQueue { 33 /// Create a new attestation queue. 34 pub fn new() -> Self { 35 Self { 36 alpha_to_delta: RwLock::new(VecDeque::new()), 37 delta_to_alpha: RwLock::new(VecDeque::new()), 38 } 39 } 40 41 /// Queue an ALPHA → DELTA attestation. 42 pub async fn queue_alpha_to_delta(&self, message: CrossChainMessage, source_block: u64) -> bool { 43 let mut queue = self.alpha_to_delta.write().await; 44 45 if queue.len() >= MAX_PENDING { 46 warn!("ALPHA→DELTA queue full, rejecting attestation"); 47 return false; 48 } 49 50 queue.push_back(PendingAttestation { 51 message, 52 queued_at: source_block, 53 retries: 0, 54 }); 55 56 debug!("Queued ALPHA→DELTA attestation (queue size: {})", queue.len()); 57 true 58 } 59 60 /// Queue a DELTA → ALPHA attestation. 61 pub async fn queue_delta_to_alpha(&self, message: CrossChainMessage, source_block: u64) -> bool { 62 let mut queue = self.delta_to_alpha.write().await; 63 64 if queue.len() >= MAX_PENDING { 65 warn!("DELTA→ALPHA queue full, rejecting attestation"); 66 return false; 67 } 68 69 queue.push_back(PendingAttestation { 70 message, 71 queued_at: source_block, 72 retries: 0, 73 }); 74 75 debug!("Queued DELTA→ALPHA attestation (queue size: {})", queue.len()); 76 true 77 } 78 79 /// Get the next pending ALPHA → DELTA attestation. 80 pub async fn pop_alpha_to_delta(&self) -> Option<PendingAttestation> { 81 self.alpha_to_delta.write().await.pop_front() 82 } 83 84 /// Get the next pending DELTA → ALPHA attestation. 85 pub async fn pop_delta_to_alpha(&self) -> Option<PendingAttestation> { 86 self.delta_to_alpha.write().await.pop_front() 87 } 88 89 /// Requeue a failed attestation for retry. 90 pub async fn requeue_alpha_to_delta(&self, mut attestation: PendingAttestation) -> bool { 91 attestation.retries += 1; 92 93 if attestation.retries > 5 { 94 warn!("Dropping ALPHA→DELTA attestation after 5 retries"); 95 return false; 96 } 97 98 let mut queue = self.alpha_to_delta.write().await; 99 queue.push_back(attestation); 100 true 101 } 102 103 /// Requeue a failed attestation for retry. 104 pub async fn requeue_delta_to_alpha(&self, mut attestation: PendingAttestation) -> bool { 105 attestation.retries += 1; 106 107 if attestation.retries > 5 { 108 warn!("Dropping DELTA→ALPHA attestation after 5 retries"); 109 return false; 110 } 111 112 let mut queue = self.delta_to_alpha.write().await; 113 queue.push_back(attestation); 114 true 115 } 116 117 /// Get queue lengths. 118 pub async fn queue_lengths(&self) -> (usize, usize) { 119 ( 120 self.alpha_to_delta.read().await.len(), 121 self.delta_to_alpha.read().await.len(), 122 ) 123 } 124 125 /// Clear all pending attestations (for emergency/reset). 126 pub async fn clear_all(&self) { 127 self.alpha_to_delta.write().await.clear(); 128 self.delta_to_alpha.write().await.clear(); 129 info!("All pending attestations cleared"); 130 } 131 } 132 133 impl Default for AttestationQueue { 134 fn default() -> Self { 135 Self::new() 136 } 137 }