replication.rs
//! Replication Service
//!
//! Manages content replication from Edge nodes to their Home Node.
//! Uses a sled-backed journal for crash resilience.

use std::time::{Duration, SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use sled::Tree;
use thiserror::Error;
use tracing::{debug, info, warn};

/// Replication errors
#[derive(Debug, Error)]
pub enum ReplicationError {
    #[error("Storage error: {0}")]
    Storage(#[from] sled::Error),

    #[error("Serialization error: {0}")]
    Serialization(String),
}

/// A pending replication operation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingOp {
    /// Content ID to replicate
    pub cid: [u8; 32],
    /// Target Home Node peer ID
    pub target: [u8; 32],
    /// Unix timestamp when queued
    pub created_at: u64,
    /// Number of attempts so far
    pub attempts: u32,
    /// Unix timestamp of last attempt (0 if never attempted)
    pub last_attempt: u64,
}

impl PendingOp {
    /// Create a new pending operation
    pub fn new(cid: [u8; 32], target: [u8; 32]) -> Self {
        Self {
            cid,
            target,
            created_at: current_timestamp(),
            attempts: 0,
            last_attempt: 0,
        }
    }

    /// Calculate backoff duration before next retry (capped at 15 minutes)
    pub fn next_backoff(&self) -> Duration {
        // Exponential backoff: 1s, 2s, 4s, 8s, ... capped at 15 min (900s).
        // min(13) bounds the shift itself; min(900) enforces the cap.
        let secs = 1u64 << self.attempts.min(13);
        Duration::from_secs(secs.min(900))
    }

    /// Check if ready for retry
    pub fn ready_for_retry(&self, now: u64) -> bool {
        if self.last_attempt == 0 {
            return true; // Never attempted
        }
        let backoff_secs = self.next_backoff().as_secs();
        now >= self.last_attempt + backoff_secs
    }

    /// Record an attempt
    pub fn mark_attempt(&mut self) {
        self.attempts += 1;
        self.last_attempt = current_timestamp();
    }
}

/// Replication journal backed by sled
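///
/// # Example
///
/// A minimal usage sketch, assuming a sled database is already open; the
/// path, tree name, and IDs below are illustrative (hence `ignore`):
///
/// ```ignore
/// let db = sled::open("./data")?;
/// let svc = ReplicationService::new(db.open_tree("replication")?);
///
/// svc.queue([0u8; 32], [1u8; 32])?;
/// for op in svc.ready_for_retry()? {
///     // ... attempt the transfer, then svc.complete(&op.cid)? on success
/// }
/// ```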
pub struct ReplicationService {
    pending: Tree,
}

impl ReplicationService {
    /// Create a new replication service from a sled tree
    pub fn new(tree: Tree) -> Self {
        Self { pending: tree }
    }

    /// Queue a content ID for replication to the Home Node
    pub fn queue(&self, cid: [u8; 32], target: [u8; 32]) -> Result<(), ReplicationError> {
        // Check if already queued
        if self.pending.contains_key(&cid)? {
            debug!(cid = %hex(&cid[..8]), "Already queued for replication");
            return Ok(());
        }

        let op = PendingOp::new(cid, target);
        let value = postcard::to_allocvec(&op)
            .map_err(|e| ReplicationError::Serialization(e.to_string()))?;
        self.pending.insert(&cid, value)?;

        debug!(
            cid = %hex(&cid[..8]),
            target = %hex(&target[..8]),
            "Queued for Home Node replication"
        );
        Ok(())
    }

    /// Remove a successfully replicated item
    pub fn complete(&self, cid: &[u8; 32]) -> Result<bool, ReplicationError> {
        Ok(self.pending.remove(cid)?.is_some())
    }

    /// Get all pending operations ready for retry
    pub fn ready_for_retry(&self) -> Result<Vec<PendingOp>, ReplicationError> {
        let now = current_timestamp();
        let mut ready = Vec::new();

        for item in self.pending.iter() {
            let (_, value) = item?;
            match postcard::from_bytes::<PendingOp>(&value) {
                Ok(op) if op.ready_for_retry(now) => ready.push(op),
                Ok(_) => {} // Not ready yet
                Err(e) => warn!("Failed to deserialize pending op: {}", e),
            }
        }

        // Sort by created_at (oldest first)
        ready.sort_by_key(|op| op.created_at);
        Ok(ready)
    }

    /// Update an operation after an attempt
    pub fn update(&self, op: &PendingOp) -> Result<(), ReplicationError> {
        let value = postcard::to_allocvec(op)
            .map_err(|e| ReplicationError::Serialization(e.to_string()))?;
        self.pending.insert(&op.cid, value)?;
        Ok(())
    }

    /// Get pending count
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }

    /// Prune operations that have been pending too long (> 7 days)
    pub fn prune_stale(&self) -> Result<usize, ReplicationError> {
        let week_ago = current_timestamp().saturating_sub(7 * 24 * 60 * 60);
        let mut pruned = 0;
        let mut to_remove = Vec::new();

        for item in self.pending.iter() {
            let (key, value) = item?;
            if let Ok(op) = postcard::from_bytes::<PendingOp>(&value) {
                if op.created_at < week_ago {
                    to_remove.push(key.to_vec());
                }
            }
        }

        for key in to_remove {
            self.pending.remove(key)?;
            pruned += 1;
        }

        if pruned > 0 {
            info!(count = pruned, "Pruned stale replication ops");
        }
        Ok(pruned)
    }

    /// Flush to disk
    pub fn flush(&self) -> Result<(), ReplicationError> {
        self.pending.flush()?;
        Ok(())
    }
}
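// A hedged sketch of how a background task might drain the journal each tick.
// The transport is abstracted as a caller-supplied closure (`send` here is
// hypothetical, not part of this module): successes are removed from the
// journal, failures are re-persisted with updated backoff state.
pub fn retry_pass<F>(svc: &ReplicationService, mut send: F) -> Result<usize, ReplicationError>
where
    F: FnMut(&PendingOp) -> bool,
{
    let mut sent = 0;
    for mut op in svc.ready_for_retry()? {
        op.mark_attempt();
        if send(&op) {
            svc.complete(&op.cid)?;
            sent += 1;
        } else {
            svc.update(&op)?; // Persist the attempt count so backoff advances
        }
    }
    Ok(sent)
}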
/// Get current Unix timestamp in seconds
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

/// Hex encode helper (for logging)
fn hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    fn test_cid(seed: u8) -> [u8; 32] {
        let mut cid = [0u8; 32];
        cid[0] = seed;
        cid
    }

    fn test_target() -> [u8; 32] {
        [0xAB; 32]
    }

    #[test]
    fn test_queue_and_complete() {
        let tmp = tempdir().unwrap();
        let db = sled::open(tmp.path()).unwrap();
        let tree = db.open_tree("replication").unwrap();
        let svc = ReplicationService::new(tree);

        let cid = test_cid(1);
        svc.queue(cid, test_target()).unwrap();
        assert_eq!(svc.pending_count(), 1);

        svc.complete(&cid).unwrap();
        assert_eq!(svc.pending_count(), 0);
    }

    #[test]
    fn test_dedup() {
        let tmp = tempdir().unwrap();
        let db = sled::open(tmp.path()).unwrap();
        let tree = db.open_tree("replication").unwrap();
        let svc = ReplicationService::new(tree);

        let cid = test_cid(1);
        svc.queue(cid, test_target()).unwrap();
        svc.queue(cid, test_target()).unwrap(); // Duplicate
        assert_eq!(svc.pending_count(), 1);
    }

    #[test]
    fn test_backoff_calculation() {
        let mut op = PendingOp::new(test_cid(1), test_target());
        assert_eq!(op.next_backoff(), Duration::from_secs(1));

        op.attempts = 1;
        assert_eq!(op.next_backoff(), Duration::from_secs(2));

        op.attempts = 3;
        assert_eq!(op.next_backoff(), Duration::from_secs(8));

        op.attempts = 20; // Should cap
        assert_eq!(op.next_backoff(), Duration::from_secs(900)); // 15 min cap
    }

    #[test]
    fn test_ready_for_retry() {
        let tmp = tempdir().unwrap();
        let db = sled::open(tmp.path()).unwrap();
        let tree = db.open_tree("replication").unwrap();
        let svc = ReplicationService::new(tree);

        let cid = test_cid(1);
        svc.queue(cid, test_target()).unwrap();

        let ready = svc.ready_for_retry().unwrap();
        assert_eq!(ready.len(), 1);

        // Mark as attempted
        let mut op = ready[0].clone();
        op.mark_attempt();
        svc.update(&op).unwrap();

        // Should not be ready immediately
        let ready = svc.ready_for_retry().unwrap();
        assert_eq!(ready.len(), 0);
    }

    #[test]
    fn test_persistence() {
        let tmp = tempdir().unwrap();
        let path = tmp.path().to_path_buf();
        let cid = test_cid(1);

        // Queue
        {
            let db = sled::open(&path).unwrap();
            let tree = db.open_tree("replication").unwrap();
            let svc = ReplicationService::new(tree);
            svc.queue(cid, test_target()).unwrap();
            svc.flush().unwrap();
        }

        // Verify after "restart"
        {
            let db = sled::open(&path).unwrap();
            let tree = db.open_tree("replication").unwrap();
            let svc = ReplicationService::new(tree);
            assert_eq!(svc.pending_count(), 1);
        }
    }
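    // A hedged sketch of a staleness test: created_at is back-dated by hand
    // (via update) to land past the 7-day cutoff that prune_stale enforces.
    #[test]
    fn test_prune_stale() {
        let tmp = tempdir().unwrap();
        let db = sled::open(tmp.path()).unwrap();
        let tree = db.open_tree("replication").unwrap();
        let svc = ReplicationService::new(tree);

        let mut op = PendingOp::new(test_cid(1), test_target());
        op.created_at = current_timestamp().saturating_sub(8 * 24 * 60 * 60);
        svc.update(&op).unwrap();

        assert_eq!(svc.prune_stale().unwrap(), 1);
        assert_eq!(svc.pending_count(), 0);
    }
}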