// abzu-core/src/replication.rs
//! Replication Service
//!
//! Manages content replication from Edge nodes to their Home Node.
//! Uses a sled-backed journal for crash resilience.
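//!
//! A minimal, hypothetical usage flow. Here `db`, `cid`, `home_node_id`, and
//! the `send_to_home_node` transport are stand-ins that are not part of this
//! module, so the example is marked `ignore`:
//!
//! ```ignore
//! let svc = ReplicationService::new(db.open_tree("replication")?);
//! svc.queue(cid, home_node_id)?; // durably enqueue
//! for mut op in svc.ready_for_retry()? {
//!     if send_to_home_node(&op) {
//!         svc.complete(&op.cid)?; // replicated: drop from the journal
//!     } else {
//!         op.mark_attempt(); // failed: record the attempt, grow the backoff
//!         svc.update(&op)?;
//!     }
//! }
//! ```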

use std::time::{Duration, SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use sled::Tree;
use thiserror::Error;
use tracing::{debug, info, warn};

/// Replication errors
#[derive(Debug, Error)]
pub enum ReplicationError {
    #[error("Storage error: {0}")]
    Storage(#[from] sled::Error),

    #[error("Serialization error: {0}")]
    Serialization(String),
}

/// A pending replication operation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingOp {
    /// Content ID to replicate
    pub cid: [u8; 32],
    /// Target Home Node peer ID
    pub target: [u8; 32],
    /// Unix timestamp when queued
    pub created_at: u64,
    /// Number of attempts so far
    pub attempts: u32,
    /// Unix timestamp of last attempt (0 if never attempted)
    pub last_attempt: u64,
}

impl PendingOp {
    /// Create a new pending operation
    pub fn new(cid: [u8; 32], target: [u8; 32]) -> Self {
        Self {
            cid,
            target,
            created_at: current_timestamp(),
            attempts: 0,
            last_attempt: 0,
        }
    }

    /// Calculate the backoff duration before the next retry (capped at 15 minutes)
    pub fn next_backoff(&self) -> Duration {
        // Exponential backoff: 1s, 2s, 4s, 8s, ... doubling per attempt.
        // The shift amount is clamped to 13 to prevent overflow; the 900s
        // cap takes over once `attempts` reaches 10 (2^10 = 1024 > 900).
        let secs = 1u64 << self.attempts.min(13);
        Duration::from_secs(secs.min(900))
    }

    /// Check if ready for retry
    pub fn ready_for_retry(&self, now: u64) -> bool {
        if self.last_attempt == 0 {
            return true; // Never attempted
        }
        let backoff_secs = self.next_backoff().as_secs();
        now >= self.last_attempt + backoff_secs
    }

    /// Record an attempt
    pub fn mark_attempt(&mut self) {
        self.attempts += 1;
        self.last_attempt = current_timestamp();
    }
}

/// Replication journal backed by sled
pub struct ReplicationService {
    pending: Tree,
}

impl ReplicationService {
    /// Create a new replication service from a sled tree
    pub fn new(tree: Tree) -> Self {
        Self { pending: tree }
    }

    /// Queue a content ID for replication to the Home Node.
    ///
    /// Re-queuing a CID that is already pending is a no-op: the existing
    /// entry (and its original target) is kept.
    pub fn queue(&self, cid: [u8; 32], target: [u8; 32]) -> Result<(), ReplicationError> {
        // Deduplicate: keep the existing entry if the CID is already queued
        if self.pending.contains_key(&cid)? {
            debug!(cid = %hex(&cid[..8]), "Already queued for replication");
            return Ok(());
        }

        let op = PendingOp::new(cid, target);
        let value = postcard::to_allocvec(&op)
            .map_err(|e| ReplicationError::Serialization(e.to_string()))?;
        self.pending.insert(&cid, value)?;

        debug!(
            cid = %hex(&cid[..8]),
            target = %hex(&target[..8]),
            "Queued for Home Node replication"
        );
        Ok(())
    }

    /// Remove a successfully replicated item; returns `true` if it was pending
    pub fn complete(&self, cid: &[u8; 32]) -> Result<bool, ReplicationError> {
        Ok(self.pending.remove(cid)?.is_some())
    }

    /// Get all pending operations ready for retry, oldest first
    pub fn ready_for_retry(&self) -> Result<Vec<PendingOp>, ReplicationError> {
        let now = current_timestamp();
        let mut ready = Vec::new();

        for item in self.pending.iter() {
            let (_, value) = item?;
            match postcard::from_bytes::<PendingOp>(&value) {
                Ok(op) if op.ready_for_retry(now) => ready.push(op),
                Ok(_) => {} // Not ready yet
                Err(e) => warn!("Failed to deserialize pending op: {}", e),
            }
        }

        // Sort by created_at (oldest first)
        ready.sort_by_key(|op| op.created_at);
        Ok(ready)
    }

    /// Persist an operation's updated state after an attempt
    pub fn update(&self, op: &PendingOp) -> Result<(), ReplicationError> {
        let value = postcard::to_allocvec(op)
            .map_err(|e| ReplicationError::Serialization(e.to_string()))?;
        self.pending.insert(&op.cid, value)?;
        Ok(())
    }

    /// Get the number of pending operations (sled computes this with a full scan)
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }

    /// Prune operations queued more than 7 days ago; returns the number removed
    pub fn prune_stale(&self) -> Result<usize, ReplicationError> {
        let week_ago = current_timestamp().saturating_sub(7 * 24 * 60 * 60);
        let mut pruned = 0;
        let mut to_remove = Vec::new();

        for item in self.pending.iter() {
            let (key, value) = item?;
            if let Ok(op) = postcard::from_bytes::<PendingOp>(&value) {
                if op.created_at < week_ago {
                    to_remove.push(key.to_vec());
                }
            }
        }

        for key in to_remove {
            self.pending.remove(key)?;
            pruned += 1;
        }

        if pruned > 0 {
            info!(count = pruned, "Pruned stale replication ops");
        }
        Ok(pruned)
    }

    /// Flush to disk
    pub fn flush(&self) -> Result<(), ReplicationError> {
        self.pending.flush()?;
        Ok(())
    }
}

/// Get current Unix timestamp in seconds
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

/// Hex encode helper (for logging)
fn hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    fn test_cid(seed: u8) -> [u8; 32] {
        let mut cid = [0u8; 32];
        cid[0] = seed;
        cid
    }

    fn test_target() -> [u8; 32] {
        [0xAB; 32]
    }

    #[test]
    fn test_queue_and_complete() {
        let tmp = tempdir().unwrap();
        let db = sled::open(tmp.path()).unwrap();
        let tree = db.open_tree("replication").unwrap();
        let svc = ReplicationService::new(tree);

        let cid = test_cid(1);
        svc.queue(cid, test_target()).unwrap();
        assert_eq!(svc.pending_count(), 1);

        svc.complete(&cid).unwrap();
        assert_eq!(svc.pending_count(), 0);
    }

    #[test]
    fn test_dedup() {
        let tmp = tempdir().unwrap();
        let db = sled::open(tmp.path()).unwrap();
        let tree = db.open_tree("replication").unwrap();
        let svc = ReplicationService::new(tree);

        let cid = test_cid(1);
        svc.queue(cid, test_target()).unwrap();
        svc.queue(cid, test_target()).unwrap(); // Duplicate
        assert_eq!(svc.pending_count(), 1);
    }

    #[test]
    fn test_backoff_calculation() {
        let mut op = PendingOp::new(test_cid(1), test_target());
        assert_eq!(op.next_backoff(), Duration::from_secs(1));

        op.attempts = 1;
        assert_eq!(op.next_backoff(), Duration::from_secs(2));

        op.attempts = 3;
        assert_eq!(op.next_backoff(), Duration::from_secs(8));

        op.attempts = 20; // Should cap
        assert_eq!(op.next_backoff(), Duration::from_secs(900)); // 15 min cap
    }

    #[test]
    fn test_ready_for_retry() {
        let tmp = tempdir().unwrap();
        let db = sled::open(tmp.path()).unwrap();
        let tree = db.open_tree("replication").unwrap();
        let svc = ReplicationService::new(tree);

        let cid = test_cid(1);
        svc.queue(cid, test_target()).unwrap();

        let ready = svc.ready_for_retry().unwrap();
        assert_eq!(ready.len(), 1);

        // Mark as attempted
        let mut op = ready[0].clone();
        op.mark_attempt();
        svc.update(&op).unwrap();

        // Should not be ready immediately
        let ready = svc.ready_for_retry().unwrap();
        assert_eq!(ready.len(), 0);
    }
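
    // A sketch of one retry pass as a background task might run it. The
    // `try_replicate` closure is a hypothetical stand-in for the real
    // transport to the Home Node, which is outside this module.
    #[test]
    fn test_retry_driver_sketch() {
        let tmp = tempdir().unwrap();
        let db = sled::open(tmp.path()).unwrap();
        let tree = db.open_tree("replication").unwrap();
        let svc = ReplicationService::new(tree);

        svc.queue(test_cid(1), test_target()).unwrap();
        svc.queue(test_cid(2), test_target()).unwrap();

        // Pretend the transport only succeeds for the first CID
        let try_replicate = |op: &PendingOp| op.cid == test_cid(1);

        for mut op in svc.ready_for_retry().unwrap() {
            if try_replicate(&op) {
                svc.complete(&op.cid).unwrap();
            } else {
                op.mark_attempt();
                svc.update(&op).unwrap();
            }
        }

        // One op replicated and removed; the other waits out its backoff
        assert_eq!(svc.pending_count(), 1);
        assert!(svc.ready_for_retry().unwrap().is_empty());
    }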

    #[test]
    fn test_persistence() {
        let tmp = tempdir().unwrap();
        let path = tmp.path().to_path_buf();
        let cid = test_cid(1);

        // Queue
        {
            let db = sled::open(&path).unwrap();
            let tree = db.open_tree("replication").unwrap();
            let svc = ReplicationService::new(tree);
            svc.queue(cid, test_target()).unwrap();
            svc.flush().unwrap();
        }

        // Verify after "restart"
        {
            let db = sled::open(&path).unwrap();
            let tree = db.open_tree("replication").unwrap();
            let svc = ReplicationService::new(tree);
            assert_eq!(svc.pending_count(), 1);
        }
    }
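
    // A sketch of the pruning path: backdate a queued op by rewriting it
    // with `update`, then confirm `prune_stale` removes it.
    #[test]
    fn test_prune_stale() {
        let tmp = tempdir().unwrap();
        let db = sled::open(tmp.path()).unwrap();
        let tree = db.open_tree("replication").unwrap();
        let svc = ReplicationService::new(tree);

        svc.queue(test_cid(1), test_target()).unwrap();

        // Rewrite the op with a created_at far past the 7-day window
        let mut op = svc.ready_for_retry().unwrap()[0].clone();
        op.created_at = 0;
        svc.update(&op).unwrap();

        assert_eq!(svc.prune_stale().unwrap(), 1);
        assert_eq!(svc.pending_count(), 0);
    }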
}