persistence.rs
1 //! Peer Persistence 2 //! 3 //! Enables the "Submarine Test" by persisting known peers to disk. 4 //! After a restart, nodes can reconnect to previously known peers 5 //! without relying solely on bootstrap servers. 6 7 use std::net::SocketAddr; 8 use std::time::{SystemTime, UNIX_EPOCH}; 9 10 use serde::{Deserialize, Serialize}; 11 use sled::Tree; 12 use thiserror::Error; 13 use tracing::{debug, info, warn}; 14 15 use abzu_router::{PeerKey, TreeCoords}; 16 17 /// Persistence errors 18 #[derive(Debug, Error)] 19 pub enum PersistenceError { 20 #[error("Storage error: {0}")] 21 Storage(#[from] sled::Error), 22 23 #[error("Serialization error: {0}")] 24 Serialization(String), 25 } 26 27 /// A persistable peer record 28 #[derive(Debug, Clone, Serialize, Deserialize)] 29 pub struct PersistedPeer { 30 /// Peer's public key 31 pub key: PeerKey, 32 /// Last known socket address 33 pub address: SocketAddr, 34 /// Last known tree coordinates 35 pub coords: TreeCoords, 36 /// Unix timestamp of last successful connection 37 pub last_seen: u64, 38 /// Number of successful connections 39 pub success_count: u32, 40 /// Number of failed connection attempts since last success 41 pub fail_count: u32, 42 } 43 44 impl PersistedPeer { 45 /// Create a new persisted peer record 46 pub fn new(key: PeerKey, address: SocketAddr, coords: TreeCoords) -> Self { 47 Self { 48 key, 49 address, 50 coords, 51 last_seen: current_timestamp(), 52 success_count: 1, 53 fail_count: 0, 54 } 55 } 56 57 /// Record a successful connection 58 pub fn mark_success(&mut self) { 59 self.last_seen = current_timestamp(); 60 self.success_count = self.success_count.saturating_add(1); 61 self.fail_count = 0; 62 } 63 64 /// Record a failed connection attempt 65 pub fn mark_failure(&mut self) { 66 self.fail_count = self.fail_count.saturating_add(1); 67 } 68 69 /// Check if this peer should be pruned (too many failures) 70 pub fn should_prune(&self) -> bool { 71 // Prune if 5+ consecutive failures and last seen > 7 days ago 72 let week_ago = current_timestamp().saturating_sub(7 * 24 * 60 * 60); 73 self.fail_count >= 5 && self.last_seen < week_ago 74 } 75 76 /// Calculate a priority score for reconnection (higher = try first) 77 pub fn priority(&self) -> u64 { 78 // Favor recently seen peers with high success counts 79 let recency_score = self.last_seen / 3600; // Hours since epoch 80 let success_score = (self.success_count as u64) * 100; 81 let fail_penalty = (self.fail_count as u64) * 50; 82 recency_score + success_score - fail_penalty 83 } 84 } 85 86 /// Peer store backed by sled 87 pub struct PeerStore { 88 tree: Tree, 89 } 90 91 impl PeerStore { 92 /// Create a new peer store from a sled tree 93 pub fn new(tree: Tree) -> Self { 94 Self { tree } 95 } 96 97 /// Save a peer (insert or update) 98 pub fn save(&self, peer: &PersistedPeer) -> Result<(), PersistenceError> { 99 let value = postcard::to_allocvec(peer) 100 .map_err(|e| PersistenceError::Serialization(e.to_string()))?; 101 self.tree.insert(peer.key, value)?; 102 debug!( 103 peer = hex::encode(&peer.key[..8]), 104 addr = %peer.address, 105 "Persisted peer" 106 ); 107 Ok(()) 108 } 109 110 /// Load a peer by key 111 pub fn load(&self, key: &PeerKey) -> Result<Option<PersistedPeer>, PersistenceError> { 112 match self.tree.get(key)? { 113 Some(data) => { 114 let peer: PersistedPeer = postcard::from_bytes(&data) 115 .map_err(|e| PersistenceError::Serialization(e.to_string()))?; 116 Ok(Some(peer)) 117 } 118 None => Ok(None), 119 } 120 } 121 122 /// Remove a peer 123 pub fn remove(&self, key: &PeerKey) -> Result<bool, PersistenceError> { 124 Ok(self.tree.remove(key)?.is_some()) 125 } 126 127 /// Load all persisted peers, sorted by priority (highest first) 128 pub fn load_all(&self) -> Result<Vec<PersistedPeer>, PersistenceError> { 129 let mut peers = Vec::new(); 130 for item in self.tree.iter() { 131 let (_, value) = item?; 132 match postcard::from_bytes::<PersistedPeer>(&value) { 133 Ok(peer) => peers.push(peer), 134 Err(e) => warn!("Failed to deserialize peer: {}", e), 135 } 136 } 137 // Sort by priority (highest first) 138 peers.sort_by_key(|b| std::cmp::Reverse(b.priority())); 139 Ok(peers) 140 } 141 142 /// Record a successful connection to a peer 143 pub fn record_success( 144 &self, 145 key: PeerKey, 146 address: SocketAddr, 147 coords: TreeCoords, 148 ) -> Result<(), PersistenceError> { 149 let peer = match self.load(&key)? { 150 Some(mut existing) => { 151 existing.address = address; 152 existing.coords = coords; 153 existing.mark_success(); 154 existing 155 } 156 None => PersistedPeer::new(key, address, coords), 157 }; 158 self.save(&peer) 159 } 160 161 /// Record a failed connection attempt 162 pub fn record_failure(&self, key: &PeerKey) -> Result<(), PersistenceError> { 163 if let Some(mut peer) = self.load(key)? { 164 peer.mark_failure(); 165 self.save(&peer)?; 166 } 167 Ok(()) 168 } 169 170 /// Prune stale peers (too many failures, too old) 171 pub fn prune(&self) -> Result<usize, PersistenceError> { 172 let mut pruned = 0; 173 let mut to_remove = Vec::new(); 174 175 for item in self.tree.iter() { 176 let (key, value) = item?; 177 if let Ok(peer) = postcard::from_bytes::<PersistedPeer>(&value) 178 && peer.should_prune() { 179 to_remove.push(key.to_vec()); 180 } 181 } 182 183 for key in to_remove { 184 self.tree.remove(key)?; 185 pruned += 1; 186 } 187 188 if pruned > 0 { 189 info!(count = pruned, "Pruned stale peers"); 190 } 191 Ok(pruned) 192 } 193 194 /// Get peer count 195 pub fn count(&self) -> usize { 196 self.tree.len() 197 } 198 199 /// Flush to disk 200 pub fn flush(&self) -> Result<(), PersistenceError> { 201 self.tree.flush()?; 202 Ok(()) 203 } 204 } 205 206 /// Get current Unix timestamp in seconds 207 fn current_timestamp() -> u64 { 208 SystemTime::now() 209 .duration_since(UNIX_EPOCH) 210 .unwrap_or_default() 211 .as_secs() 212 } 213 214 /// Hex encode helper (used for logging) 215 mod hex { 216 pub fn encode(bytes: &[u8]) -> String { 217 bytes.iter().map(|b| format!("{:02x}", b)).collect() 218 } 219 } 220 221 #[cfg(test)] 222 mod tests { 223 use super::*; 224 use std::net::{IpAddr, Ipv4Addr}; 225 use tempfile::tempdir; 226 227 fn test_peer(seed: u8) -> PersistedPeer { 228 let mut key = [seed; 32]; 229 key[0] = seed; 230 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, seed)), 8080 + seed as u16); 231 let coords = TreeCoords::from_path(vec![1, seed as u64]); 232 PersistedPeer::new(key, addr, coords) 233 } 234 235 #[test] 236 fn test_save_load_peer() { 237 let tmp = tempdir().unwrap(); 238 let db = sled::open(tmp.path()).unwrap(); 239 let tree = db.open_tree("peers").unwrap(); 240 let store = PeerStore::new(tree); 241 242 let peer = test_peer(1); 243 store.save(&peer).unwrap(); 244 245 let loaded = store.load(&peer.key).unwrap().unwrap(); 246 assert_eq!(loaded.key, peer.key); 247 assert_eq!(loaded.address, peer.address); 248 } 249 250 #[test] 251 fn test_load_all_sorted() { 252 let tmp = tempdir().unwrap(); 253 let db = sled::open(tmp.path()).unwrap(); 254 let tree = db.open_tree("peers").unwrap(); 255 let store = PeerStore::new(tree); 256 257 // Save peers with different success counts 258 let mut peer1 = test_peer(1); 259 peer1.success_count = 10; 260 let mut peer2 = test_peer(2); 261 peer2.success_count = 5; 262 let mut peer3 = test_peer(3); 263 peer3.success_count = 20; 264 265 store.save(&peer1).unwrap(); 266 store.save(&peer2).unwrap(); 267 store.save(&peer3).unwrap(); 268 269 let all = store.load_all().unwrap(); 270 assert_eq!(all.len(), 3); 271 // Should be sorted by priority (peer3 highest success count) 272 assert_eq!(all[0].key[0], 3); 273 assert_eq!(all[1].key[0], 1); 274 assert_eq!(all[2].key[0], 2); 275 } 276 277 #[test] 278 fn test_record_success_failure() { 279 let tmp = tempdir().unwrap(); 280 let db = sled::open(tmp.path()).unwrap(); 281 let tree = db.open_tree("peers").unwrap(); 282 let store = PeerStore::new(tree); 283 284 let peer = test_peer(1); 285 store.record_success(peer.key, peer.address, peer.coords.clone()).unwrap(); 286 287 let loaded = store.load(&peer.key).unwrap().unwrap(); 288 assert_eq!(loaded.success_count, 1); 289 290 // Record another success 291 store.record_success(peer.key, peer.address, peer.coords.clone()).unwrap(); 292 let loaded = store.load(&peer.key).unwrap().unwrap(); 293 assert_eq!(loaded.success_count, 2); 294 295 // Record failure 296 store.record_failure(&peer.key).unwrap(); 297 let loaded = store.load(&peer.key).unwrap().unwrap(); 298 assert_eq!(loaded.fail_count, 1); 299 assert_eq!(loaded.success_count, 2); // Unchanged 300 } 301 302 #[test] 303 fn test_prune() { 304 let tmp = tempdir().unwrap(); 305 let db = sled::open(tmp.path()).unwrap(); 306 let tree = db.open_tree("peers").unwrap(); 307 let store = PeerStore::new(tree); 308 309 // Create a stale peer (old + many failures) 310 let mut stale = test_peer(1); 311 stale.last_seen = 0; // Very old 312 stale.fail_count = 10; 313 store.save(&stale).unwrap(); 314 315 // Create a healthy peer 316 let healthy = test_peer(2); 317 store.save(&healthy).unwrap(); 318 319 assert_eq!(store.count(), 2); 320 321 let pruned = store.prune().unwrap(); 322 assert_eq!(pruned, 1); 323 assert_eq!(store.count(), 1); 324 325 // Healthy peer should remain 326 assert!(store.load(&healthy.key).unwrap().is_some()); 327 assert!(store.load(&stale.key).unwrap().is_none()); 328 } 329 330 #[test] 331 fn test_persistence_across_restart() { 332 let tmp = tempdir().unwrap(); 333 let path = tmp.path().to_path_buf(); 334 335 let peer = test_peer(1); 336 337 // Write 338 { 339 let db = sled::open(&path).unwrap(); 340 let tree = db.open_tree("peers").unwrap(); 341 let store = PeerStore::new(tree); 342 store.save(&peer).unwrap(); 343 store.flush().unwrap(); 344 } 345 346 // Read after "restart" 347 { 348 let db = sled::open(&path).unwrap(); 349 let tree = db.open_tree("peers").unwrap(); 350 let store = PeerStore::new(tree); 351 let loaded = store.load(&peer.key).unwrap().unwrap(); 352 assert_eq!(loaded.address, peer.address); 353 } 354 } 355 }