/ abzu-core / src / persistence.rs
persistence.rs
  1  //! Peer Persistence
  2  //!
  3  //! Enables the "Submarine Test" by persisting known peers to disk.
  4  //! After a restart, nodes can reconnect to previously known peers
  5  //! without relying solely on bootstrap servers.
  6  
  7  use std::net::SocketAddr;
  8  use std::time::{SystemTime, UNIX_EPOCH};
  9  
 10  use serde::{Deserialize, Serialize};
 11  use sled::Tree;
 12  use thiserror::Error;
 13  use tracing::{debug, info, warn};
 14  
 15  use abzu_router::{PeerKey, TreeCoords};
 16  
 17  /// Persistence errors
 18  #[derive(Debug, Error)]
 19  pub enum PersistenceError {
 20      #[error("Storage error: {0}")]
 21      Storage(#[from] sled::Error),
 22  
 23      #[error("Serialization error: {0}")]
 24      Serialization(String),
 25  }
 26  
 27  /// A persistable peer record
 28  #[derive(Debug, Clone, Serialize, Deserialize)]
 29  pub struct PersistedPeer {
 30      /// Peer's public key
 31      pub key: PeerKey,
 32      /// Last known socket address
 33      pub address: SocketAddr,
 34      /// Last known tree coordinates
 35      pub coords: TreeCoords,
 36      /// Unix timestamp of last successful connection
 37      pub last_seen: u64,
 38      /// Number of successful connections
 39      pub success_count: u32,
 40      /// Number of failed connection attempts since last success
 41      pub fail_count: u32,
 42  }
 43  
 44  impl PersistedPeer {
 45      /// Create a new persisted peer record
 46      pub fn new(key: PeerKey, address: SocketAddr, coords: TreeCoords) -> Self {
 47          Self {
 48              key,
 49              address,
 50              coords,
 51              last_seen: current_timestamp(),
 52              success_count: 1,
 53              fail_count: 0,
 54          }
 55      }
 56  
 57      /// Record a successful connection
 58      pub fn mark_success(&mut self) {
 59          self.last_seen = current_timestamp();
 60          self.success_count = self.success_count.saturating_add(1);
 61          self.fail_count = 0;
 62      }
 63  
 64      /// Record a failed connection attempt
 65      pub fn mark_failure(&mut self) {
 66          self.fail_count = self.fail_count.saturating_add(1);
 67      }
 68  
 69      /// Check if this peer should be pruned (too many failures)
 70      pub fn should_prune(&self) -> bool {
 71          // Prune if 5+ consecutive failures and last seen > 7 days ago
 72          let week_ago = current_timestamp().saturating_sub(7 * 24 * 60 * 60);
 73          self.fail_count >= 5 && self.last_seen < week_ago
 74      }
 75  
 76      /// Calculate a priority score for reconnection (higher = try first)
 77      pub fn priority(&self) -> u64 {
 78          // Favor recently seen peers with high success counts
 79          let recency_score = self.last_seen / 3600; // Hours since epoch
 80          let success_score = (self.success_count as u64) * 100;
 81          let fail_penalty = (self.fail_count as u64) * 50;
 82          recency_score + success_score - fail_penalty
 83      }
 84  }
 85  
 86  /// Peer store backed by sled
 87  pub struct PeerStore {
 88      tree: Tree,
 89  }
 90  
 91  impl PeerStore {
 92      /// Create a new peer store from a sled tree
 93      pub fn new(tree: Tree) -> Self {
 94          Self { tree }
 95      }
 96  
 97      /// Save a peer (insert or update)
 98      pub fn save(&self, peer: &PersistedPeer) -> Result<(), PersistenceError> {
 99          let value = postcard::to_allocvec(peer)
100              .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
101          self.tree.insert(peer.key, value)?;
102          debug!(
103              peer = hex::encode(&peer.key[..8]),
104              addr = %peer.address,
105              "Persisted peer"
106          );
107          Ok(())
108      }
109  
110      /// Load a peer by key
111      pub fn load(&self, key: &PeerKey) -> Result<Option<PersistedPeer>, PersistenceError> {
112          match self.tree.get(key)? {
113              Some(data) => {
114                  let peer: PersistedPeer = postcard::from_bytes(&data)
115                      .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
116                  Ok(Some(peer))
117              }
118              None => Ok(None),
119          }
120      }
121  
122      /// Remove a peer
123      pub fn remove(&self, key: &PeerKey) -> Result<bool, PersistenceError> {
124          Ok(self.tree.remove(key)?.is_some())
125      }
126  
127      /// Load all persisted peers, sorted by priority (highest first)
128      pub fn load_all(&self) -> Result<Vec<PersistedPeer>, PersistenceError> {
129          let mut peers = Vec::new();
130          for item in self.tree.iter() {
131              let (_, value) = item?;
132              match postcard::from_bytes::<PersistedPeer>(&value) {
133                  Ok(peer) => peers.push(peer),
134                  Err(e) => warn!("Failed to deserialize peer: {}", e),
135              }
136          }
137          // Sort by priority (highest first)
138          peers.sort_by_key(|b| std::cmp::Reverse(b.priority()));
139          Ok(peers)
140      }
141  
142      /// Record a successful connection to a peer
143      pub fn record_success(
144          &self,
145          key: PeerKey,
146          address: SocketAddr,
147          coords: TreeCoords,
148      ) -> Result<(), PersistenceError> {
149          let peer = match self.load(&key)? {
150              Some(mut existing) => {
151                  existing.address = address;
152                  existing.coords = coords;
153                  existing.mark_success();
154                  existing
155              }
156              None => PersistedPeer::new(key, address, coords),
157          };
158          self.save(&peer)
159      }
160  
161      /// Record a failed connection attempt
162      pub fn record_failure(&self, key: &PeerKey) -> Result<(), PersistenceError> {
163          if let Some(mut peer) = self.load(key)? {
164              peer.mark_failure();
165              self.save(&peer)?;
166          }
167          Ok(())
168      }
169  
170      /// Prune stale peers (too many failures, too old)
171      pub fn prune(&self) -> Result<usize, PersistenceError> {
172          let mut pruned = 0;
173          let mut to_remove = Vec::new();
174  
175          for item in self.tree.iter() {
176              let (key, value) = item?;
177              if let Ok(peer) = postcard::from_bytes::<PersistedPeer>(&value)
178                  && peer.should_prune() {
179                      to_remove.push(key.to_vec());
180              }
181          }
182  
183          for key in to_remove {
184              self.tree.remove(key)?;
185              pruned += 1;
186          }
187  
188          if pruned > 0 {
189              info!(count = pruned, "Pruned stale peers");
190          }
191          Ok(pruned)
192      }
193  
194      /// Get peer count
195      pub fn count(&self) -> usize {
196          self.tree.len()
197      }
198  
199      /// Flush to disk
200      pub fn flush(&self) -> Result<(), PersistenceError> {
201          self.tree.flush()?;
202          Ok(())
203      }
204  }
205  
/// Whole seconds elapsed since the Unix epoch, per the system clock.
///
/// If the clock reads earlier than 1970-01-01 the result is `0` rather than
/// an error.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
213  
/// Minimal hex encoder so log output does not need an extra dependency.
mod hex {
    /// Render `bytes` as lowercase hexadecimal, two characters per byte.
    pub fn encode(bytes: &[u8]) -> String {
        let mut out = String::with_capacity(bytes.len() * 2);
        for &byte in bytes {
            for nibble in [byte >> 4, byte & 0x0f] {
                // `from_digit` yields lowercase 'a'..='f' for 10..=15.
                let digit = char::from_digit(u32::from(nibble), 16)
                    .expect("a nibble is always < 16");
                out.push(digit);
            }
        }
        out
    }
}
220  
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use tempfile::{tempdir, TempDir};

    /// A store together with the handles that must outlive it.
    ///
    /// Fields drop in declaration order: store, then database, then the
    /// temporary directory.
    struct Fixture {
        store: PeerStore,
        _db: sled::Db,
        _dir: TempDir,
    }

    /// Open an empty peer store inside a fresh temporary directory.
    fn open_store() -> Fixture {
        let dir = tempdir().unwrap();
        let db = sled::open(dir.path()).unwrap();
        let store = PeerStore::new(db.open_tree("peers").unwrap());
        Fixture {
            store,
            _db: db,
            _dir: dir,
        }
    }

    /// Deterministic peer whose key bytes, address and coords derive from `seed`.
    fn test_peer(seed: u8) -> PersistedPeer {
        let key = [seed; 32];
        let addr = SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, seed)),
            8080 + u16::from(seed),
        );
        let coords = TreeCoords::from_path(vec![1, u64::from(seed)]);
        PersistedPeer::new(key, addr, coords)
    }

    #[test]
    fn test_save_load_peer() {
        let fx = open_store();
        let original = test_peer(1);
        fx.store.save(&original).unwrap();

        let loaded = fx
            .store
            .load(&original.key)
            .unwrap()
            .expect("peer was just saved");
        assert_eq!(loaded.key, original.key);
        assert_eq!(loaded.address, original.address);
    }

    #[test]
    fn test_load_all_sorted() {
        let fx = open_store();
        for (seed, successes) in [(1, 10), (2, 5), (3, 20)] {
            let mut peer = test_peer(seed);
            peer.success_count = successes;
            fx.store.save(&peer).unwrap();
        }

        // Expected order follows success count: 20 > 10 > 5.
        let order: Vec<u8> = fx
            .store
            .load_all()
            .unwrap()
            .iter()
            .map(|peer| peer.key[0])
            .collect();
        assert_eq!(order, [3, 1, 2]);
    }

    #[test]
    fn test_record_success_failure() {
        let fx = open_store();
        let peer = test_peer(1);
        let reload = || fx.store.load(&peer.key).unwrap().unwrap();

        fx.store
            .record_success(peer.key, peer.address, peer.coords.clone())
            .unwrap();
        assert_eq!(reload().success_count, 1);

        fx.store
            .record_success(peer.key, peer.address, peer.coords.clone())
            .unwrap();
        assert_eq!(reload().success_count, 2);

        fx.store.record_failure(&peer.key).unwrap();
        let after_failure = reload();
        assert_eq!(after_failure.fail_count, 1);
        // A failure must not touch the success counter.
        assert_eq!(after_failure.success_count, 2);
    }

    #[test]
    fn test_prune() {
        let fx = open_store();

        // Ancient and repeatedly failing: eligible for pruning.
        let mut stale = test_peer(1);
        stale.last_seen = 0;
        stale.fail_count = 10;
        // Freshly created: must survive.
        let healthy = test_peer(2);

        for peer in [&stale, &healthy] {
            fx.store.save(peer).unwrap();
        }
        assert_eq!(fx.store.count(), 2);

        assert_eq!(fx.store.prune().unwrap(), 1);
        assert_eq!(fx.store.count(), 1);
        assert!(fx.store.load(&healthy.key).unwrap().is_some());
        assert!(fx.store.load(&stale.key).unwrap().is_none());
    }

    #[test]
    fn test_persistence_across_restart() {
        let tmp = tempdir().unwrap();
        let path = tmp.path().to_path_buf();
        let peer = test_peer(1);

        // First "process": write and flush, then drop every handle before
        // the same path is reopened.
        {
            let db = sled::open(&path).unwrap();
            let store = PeerStore::new(db.open_tree("peers").unwrap());
            store.save(&peer).unwrap();
            store.flush().unwrap();
        }

        // Second "process": reopen the same path and read the record back.
        {
            let db = sled::open(&path).unwrap();
            let store = PeerStore::new(db.open_tree("peers").unwrap());
            let loaded = store.load(&peer.key).unwrap().unwrap();
            assert_eq!(loaded.address, peer.address);
        }
    }
}