/ crates / adnet-ipc / src / queue.rs
queue.rs
  1  // Copyright (c) 2025 ALPHA/DELTA Network
  2  
  3  //! Pending attestation queue for cross-chain operations.
  4  
  5  use crate::CrossChainMessage;
  6  use std::collections::VecDeque;
  7  use tokio::sync::RwLock;
  8  use tracing::{debug, info, warn};
  9  
 10  /// Maximum pending attestations before backpressure.
 11  const MAX_PENDING: usize = 10_000;
 12  
 13  /// Queue for pending cross-chain attestations.
 14  pub struct AttestationQueue {
 15      /// Pending ALPHA → DELTA attestations.
 16      alpha_to_delta: RwLock<VecDeque<PendingAttestation>>,
 17      /// Pending DELTA → ALPHA attestations.
 18      delta_to_alpha: RwLock<VecDeque<PendingAttestation>>,
 19  }
 20  
 21  /// A pending attestation with metadata.
 22  #[derive(Debug, Clone)]
 23  pub struct PendingAttestation {
 24      /// The cross-chain message.
 25      pub message: CrossChainMessage,
 26      /// When this attestation was queued (block height on source chain).
 27      pub queued_at: u64,
 28      /// Number of retry attempts.
 29      pub retries: u32,
 30  }
 31  
 32  impl AttestationQueue {
 33      /// Create a new attestation queue.
 34      pub fn new() -> Self {
 35          Self {
 36              alpha_to_delta: RwLock::new(VecDeque::new()),
 37              delta_to_alpha: RwLock::new(VecDeque::new()),
 38          }
 39      }
 40  
 41      /// Queue an ALPHA → DELTA attestation.
 42      pub async fn queue_alpha_to_delta(&self, message: CrossChainMessage, source_block: u64) -> bool {
 43          let mut queue = self.alpha_to_delta.write().await;
 44  
 45          if queue.len() >= MAX_PENDING {
 46              warn!("ALPHA→DELTA queue full, rejecting attestation");
 47              return false;
 48          }
 49  
 50          queue.push_back(PendingAttestation {
 51              message,
 52              queued_at: source_block,
 53              retries: 0,
 54          });
 55  
 56          debug!("Queued ALPHA→DELTA attestation (queue size: {})", queue.len());
 57          true
 58      }
 59  
 60      /// Queue a DELTA → ALPHA attestation.
 61      pub async fn queue_delta_to_alpha(&self, message: CrossChainMessage, source_block: u64) -> bool {
 62          let mut queue = self.delta_to_alpha.write().await;
 63  
 64          if queue.len() >= MAX_PENDING {
 65              warn!("DELTA→ALPHA queue full, rejecting attestation");
 66              return false;
 67          }
 68  
 69          queue.push_back(PendingAttestation {
 70              message,
 71              queued_at: source_block,
 72              retries: 0,
 73          });
 74  
 75          debug!("Queued DELTA→ALPHA attestation (queue size: {})", queue.len());
 76          true
 77      }
 78  
 79      /// Get the next pending ALPHA → DELTA attestation.
 80      pub async fn pop_alpha_to_delta(&self) -> Option<PendingAttestation> {
 81          self.alpha_to_delta.write().await.pop_front()
 82      }
 83  
 84      /// Get the next pending DELTA → ALPHA attestation.
 85      pub async fn pop_delta_to_alpha(&self) -> Option<PendingAttestation> {
 86          self.delta_to_alpha.write().await.pop_front()
 87      }
 88  
 89      /// Requeue a failed attestation for retry.
 90      pub async fn requeue_alpha_to_delta(&self, mut attestation: PendingAttestation) -> bool {
 91          attestation.retries += 1;
 92  
 93          if attestation.retries > 5 {
 94              warn!("Dropping ALPHA→DELTA attestation after 5 retries");
 95              return false;
 96          }
 97  
 98          let mut queue = self.alpha_to_delta.write().await;
 99          queue.push_back(attestation);
100          true
101      }
102  
103      /// Requeue a failed attestation for retry.
104      pub async fn requeue_delta_to_alpha(&self, mut attestation: PendingAttestation) -> bool {
105          attestation.retries += 1;
106  
107          if attestation.retries > 5 {
108              warn!("Dropping DELTA→ALPHA attestation after 5 retries");
109              return false;
110          }
111  
112          let mut queue = self.delta_to_alpha.write().await;
113          queue.push_back(attestation);
114          true
115      }
116  
117      /// Get queue lengths.
118      pub async fn queue_lengths(&self) -> (usize, usize) {
119          (
120              self.alpha_to_delta.read().await.len(),
121              self.delta_to_alpha.read().await.len(),
122          )
123      }
124  
125      /// Clear all pending attestations (for emergency/reset).
126      pub async fn clear_all(&self) {
127          self.alpha_to_delta.write().await.clear();
128          self.delta_to_alpha.write().await.clear();
129          info!("All pending attestations cleared");
130      }
131  }
132  
133  impl Default for AttestationQueue {
134      fn default() -> Self {
135          Self::new()
136      }
137  }