// abzu-dht/src/routing/table.rs
  1  //! Routing Table — the full set of K-buckets
  2  //!
  3  //! The routing table maintains 256 buckets, each holding nodes at a specific
  4  //! XOR distance range from our own ID. This enables O(log n) lookups.
  5  
  6  use super::bucket::{BucketInsertResult, KBucket};
  7  use super::node::NodeInfo;
  8  use crate::key::{bucket_index, closer, DhtKey};
  9  
/// Result of adding a node to the routing table
///
/// Mirrors the bucket-level `BucketInsertResult` outcomes, plus one
/// extra variant for the table-level rejection of our own ID.
#[derive(Debug, Clone)]
pub enum AddResult {
    /// Node was added to the routing table
    Added,
    /// Existing node was updated
    Updated,
    /// A bad node was evicted to make room
    Evicted(NodeInfo),
    /// Bucket was full, node was cached for later
    Cached,
    /// Attempted to add our own node (rejected)
    SelfRejected,
}
 24  
/// Closest nodes result with iteration support
///
/// Produced by `RoutingTable::closest` (nodes ordered by XOR distance to
/// `target`) and `RoutingTable::closest_eligible` (ordered by routing
/// score first, XOR distance second).
#[derive(Debug, Clone)]
pub struct ClosestNodes {
    /// Nodes sorted by distance to target
    pub nodes: Vec<NodeInfo>,
    /// The target key we searched for
    pub target: DhtKey,
}
 33  
 34  impl ClosestNodes {
 35      /// Get up to `count` nodes
 36      pub fn take(&self, count: usize) -> Vec<&NodeInfo> {
 37          self.nodes.iter().take(count).collect()
 38      }
 39  
 40      /// Filter to only responsive nodes
 41      pub fn responsive(&self, now: u64, stale_after_secs: u64) -> Vec<&NodeInfo> {
 42          self.nodes
 43              .iter()
 44              .filter(|n| n.is_responsive(now, stale_after_secs))
 45              .collect()
 46      }
 47  }
 48  
/// The full routing table
///
/// Holds one `KBucket` per possible shared-prefix length with our own ID.
#[derive(Debug)]
pub struct RoutingTable {
    /// Our node's DHT ID
    our_id: DhtKey,

    /// 256 k-buckets (bucket 0 = most distant, bucket 255 = closest)
    // Index = leading zero bits of the XOR distance to `our_id`; an index
    // of 256 (identical keys) is rejected before indexing — see `add`.
    buckets: Vec<KBucket>,

    /// Configuration
    config: RoutingTableConfig,
}
 61  
/// Configuration for routing table behavior
///
/// All intervals are in seconds; the `Default` impl supplies
/// 15 min / 5 min / 1 h respectively.
#[derive(Debug, Clone)]
pub struct RoutingTableConfig {
    /// Seconds before a node is considered stale
    pub stale_after_secs: u64,
    /// Seconds between pinging unresponsive nodes
    pub ping_interval_secs: u64,
    /// Seconds between bucket refresh lookups
    pub refresh_interval_secs: u64,
}
 72  
 73  impl Default for RoutingTableConfig {
 74      fn default() -> Self {
 75          Self {
 76              stale_after_secs: 900,      // 15 minutes
 77              ping_interval_secs: 300,     // 5 minutes
 78              refresh_interval_secs: 3600, // 1 hour
 79          }
 80      }
 81  }
 82  
 83  impl RoutingTable {
 84      /// Create a new routing table for the given node ID
 85      pub fn new(our_id: DhtKey) -> Self {
 86          Self::with_config(our_id, RoutingTableConfig::default())
 87      }
 88  
 89      /// Create with custom configuration
 90      pub fn with_config(our_id: DhtKey, config: RoutingTableConfig) -> Self {
 91          let mut buckets = Vec::with_capacity(256);
 92          for _ in 0..256 {
 93              buckets.push(KBucket::new());
 94          }
 95  
 96          Self {
 97              our_id,
 98              buckets,
 99              config,
100          }
101      }
102  
103      /// Get our node ID
104      pub fn our_id(&self) -> &DhtKey {
105          &self.our_id
106      }
107  
108      /// Add a node to the routing table
109      pub fn add(&mut self, node: NodeInfo, now: u64) -> AddResult {
110          // Never add ourselves
111          if node.id == self.our_id {
112              return AddResult::SelfRejected;
113          }
114  
115          let bucket_idx = bucket_index(&self.our_id, &node.id);
116  
117          // Handle edge case: if bucket_idx is 256 (same key), reject
118          if bucket_idx >= 256 {
119              return AddResult::SelfRejected;
120          }
121  
122          match self.buckets[bucket_idx].insert(node, now) {
123              BucketInsertResult::Inserted => AddResult::Added,
124              BucketInsertResult::Updated => AddResult::Updated,
125              BucketInsertResult::Evicted(evicted) => AddResult::Evicted(evicted),
126              BucketInsertResult::Cached => AddResult::Cached,
127          }
128      }
129  
130      /// Get a node by ID
131      pub fn get(&self, id: &DhtKey) -> Option<&NodeInfo> {
132          if *id == self.our_id {
133              return None;
134          }
135  
136          let bucket_idx = bucket_index(&self.our_id, id);
137          if bucket_idx >= 256 {
138              return None;
139          }
140  
141          self.buckets[bucket_idx].get(id)
142      }
143  
144      /// Get mutable reference to a node
145      pub fn get_mut(&mut self, id: &DhtKey) -> Option<&mut NodeInfo> {
146          if *id == self.our_id {
147              return None;
148          }
149  
150          let bucket_idx = bucket_index(&self.our_id, id);
151          if bucket_idx >= 256 {
152              return None;
153          }
154  
155          self.buckets[bucket_idx].get_mut(id)
156      }
157  
158      /// Mark a node as successfully contacted
159      pub fn mark_good(&mut self, id: &DhtKey, now: u64, rtt_ms: Option<u16>) {
160          if let Some(node) = self.get_mut(id) {
161              node.mark_good(now, rtt_ms);
162          }
163      }
164  
165      /// Mark a node as failed, potentially evicting it
166      ///
167      /// Returns the promoted replacement node if one was promoted
168      pub fn mark_failed(&mut self, id: &DhtKey, now: u64) -> Option<NodeInfo> {
169          if *id == self.our_id {
170              return None;
171          }
172  
173          let bucket_idx = bucket_index(&self.our_id, id);
174          if bucket_idx >= 256 {
175              return None;
176          }
177  
178          self.buckets[bucket_idx].mark_failed(id, now)
179      }
180  
181      /// Remove a node from the routing table
182      pub fn remove(&mut self, id: &DhtKey) -> Option<NodeInfo> {
183          if *id == self.our_id {
184              return None;
185          }
186  
187          let bucket_idx = bucket_index(&self.our_id, id);
188          if bucket_idx >= 256 {
189              return None;
190          }
191  
192          self.buckets[bucket_idx].remove(id)
193      }
194  
195      /// Find the K closest nodes to a target
196      ///
197      /// This is the core operation for DHT lookups. Returns nodes
198      /// sorted by XOR distance to target.
199      pub fn closest(&self, target: &DhtKey, count: usize) -> ClosestNodes {
200          let mut candidates: Vec<NodeInfo> = Vec::new();
201  
202          // Start with the bucket containing the target
203          let target_bucket = bucket_index(&self.our_id, target).min(255);
204  
205          // Collect from target bucket first
206          candidates.extend(self.buckets[target_bucket].nodes().iter().cloned());
207  
208          // Expand outward from target bucket
209          let mut offset = 1;
210          while candidates.len() < count && offset < 256 {
211              // Check bucket closer to us (higher index)
212              let closer_idx = target_bucket.checked_add(offset);
213              if let Some(idx) = closer_idx
214                  && idx < 256 {
215                      candidates.extend(self.buckets[idx].nodes().iter().cloned());
216                  }
217  
218              // Check bucket farther from us (lower index)
219              let farther_idx = target_bucket.checked_sub(offset);
220              if let Some(idx) = farther_idx {
221                  candidates.extend(self.buckets[idx].nodes().iter().cloned());
222              }
223  
224              offset += 1;
225          }
226  
227          // Sort by distance to target
228          candidates.sort_by(|a, b| closer(target, &a.id, &b.id));
229  
230          // Truncate to requested count
231          candidates.truncate(count);
232  
233          ClosestNodes {
234              nodes: candidates,
235              target: *target,
236          }
237      }
238  
239      /// Find K closest routing-eligible nodes with Sybil defense
240      ///
241      /// Unlike `closest()`, this applies Sybil defense filters:
242      /// - Age threshold: only nodes older than `min_age_ms`
243      /// - Subnet diversity: max `max_per_subnet` nodes per /24 (0 = disabled)
244      /// - Trust-based sorting: higher routing_score = preferred
245      ///
246      /// Use this for outbound queries where security matters.
247      pub fn closest_eligible(
248          &self,
249          target: &DhtKey,
250          count: usize,
251          now: u64,
252          min_age_ms: u64,
253          max_per_subnet: usize,
254      ) -> ClosestNodes {
255          // Start with all candidates
256          let all_candidates = self.closest(target, count * 4); // Get extra to filter
257  
258          // Filter by age threshold
259          let mut eligible: Vec<NodeInfo> = all_candidates
260              .nodes
261              .into_iter()
262              .filter(|n| n.is_routing_eligible(now, min_age_ms))
263              .collect();
264  
265          // Sort by routing score (trust-based) then by distance
266          eligible.sort_by(|a, b| {
267              // Primary: higher routing score = better
268              let score_cmp = b.routing_score(now).cmp(&a.routing_score(now));
269              if score_cmp != std::cmp::Ordering::Equal {
270                  return score_cmp;
271              }
272              // Secondary: closer in XOR = better
273              crate::key::closer(target, &a.id, &b.id)
274          });
275  
276          // Apply subnet diversity limit
277          let mut selected = Vec::with_capacity(count);
278          let mut subnet_counts: std::collections::HashMap<[u8; 3], usize> =
279              std::collections::HashMap::new();
280  
281          for node in eligible {
282              // Check subnet limit (if enabled)
283              if max_per_subnet > 0
284                  && let Some(subnet) = node.subnet_key() {
285                      let count_in_subnet = subnet_counts.entry(subnet).or_insert(0);
286                      if *count_in_subnet >= max_per_subnet {
287                          continue; // Skip, subnet at capacity
288                      }
289                      *count_in_subnet += 1;
290                  }
291  
292              selected.push(node);
293              if selected.len() >= count {
294                  break;
295              }
296          }
297  
298          ClosestNodes {
299              nodes: selected,
300              target: *target,
301          }
302      }
303  
304      /// Get nodes that need to be pinged
305      pub fn nodes_needing_ping(&self, now: u64) -> Vec<NodeInfo> {
306          self.buckets
307              .iter()
308              .flat_map(|b| b.nodes_needing_ping(now, self.config.ping_interval_secs))
309              .cloned()
310              .collect()
311      }
312  
313      /// Get bucket indices that need refresh
314      pub fn buckets_needing_refresh(&self, now: u64) -> Vec<usize> {
315          self.buckets
316              .iter()
317              .enumerate()
318              .filter(|(_, b)| b.needs_refresh(now, self.config.refresh_interval_secs))
319              .map(|(i, _)| i)
320              .collect()
321      }
322  
323      /// Mark a bucket as refreshed
324      pub fn mark_bucket_refreshed(&mut self, bucket_idx: usize, now: u64) {
325          if bucket_idx < 256 {
326              self.buckets[bucket_idx].mark_refreshed(now);
327          }
328      }
329  
330      /// Generate a random key in a specific bucket range (for refresh)
331      ///
332      /// This is used to find new nodes in sparse buckets.
333      /// Returns a key that would fall into the specified bucket.
334      pub fn random_key_in_bucket(&self, bucket_idx: usize, random_bytes: &[u8; 32]) -> DhtKey {
335          let mut key = *random_bytes;
336  
337          // The bucket index is determined by the number of leading zeros in XOR.
338          // So we need to set bits such that XOR with our_id has exactly
339          // (255 - bucket_idx) leading zeros.
340  
341          // Start with our ID
342          for i in 0..32 {
343              key[i] = self.our_id[i] ^ random_bytes[i];
344          }
345  
346          // Ensure the first differing bit is at position (255 - bucket_idx)
347          let bit_pos = 255 - bucket_idx;
348          let byte_idx = bit_pos / 8;
349          let bit_in_byte = 7 - (bit_pos % 8);
350  
351          // Set all bytes before byte_idx to match our_id (XOR = 0)
352          key[..byte_idx].copy_from_slice(&self.our_id[..byte_idx]);
353  
354          // Set the target bit to 1 in the XOR (differ from our_id)
355          let mask = 1u8 << bit_in_byte;
356          key[byte_idx] = (self.our_id[byte_idx] & !mask) | ((!self.our_id[byte_idx]) & mask);
357  
358          key
359      }
360  
361      /// Total number of nodes in the routing table
362      pub fn len(&self) -> usize {
363          self.buckets.iter().map(|b| b.len()).sum()
364      }
365  
366      /// Check if routing table is empty
367      pub fn is_empty(&self) -> bool {
368          self.buckets.iter().all(|b| b.is_empty())
369      }
370  
371      /// Get all nodes (for iteration/debugging)
372      pub fn all_nodes(&self) -> impl Iterator<Item = &NodeInfo> {
373          self.buckets.iter().flat_map(|b| b.nodes())
374      }
375  
376      /// Count responsive nodes
377      pub fn responsive_count(&self, now: u64) -> usize {
378          self.buckets
379              .iter()
380              .flat_map(|b| b.nodes())
381              .filter(|n| n.is_responsive(now, self.config.stale_after_secs))
382              .count()
383      }
384  }
385  
#[cfg(test)]
mod tests {
    use super::super::node::NodeState;
    use super::*;
    use std::cmp::Ordering;

    /// All-zero key used as the local node's ID throughout the tests.
    fn our_id() -> DhtKey {
        [0u8; 32]
    }

    /// Build a node whose ID is `byte` repeated across all 32 bytes.
    fn make_node(byte: u8, now: u64) -> NodeInfo {
        NodeInfo::new([byte; 32], format!("addr-{byte}"), now)
    }

    #[test]
    fn test_add_and_get() {
        let mut table = RoutingTable::new(our_id());
        let node = make_node(0x42, 1000);
        let node_id = node.id;

        table.add(node, 1000);
        assert!(table.get(&node_id).is_some());
    }

    #[test]
    fn test_reject_self() {
        let mut table = RoutingTable::new(our_id());
        let mut imposter = make_node(0x01, 1000);
        imposter.id = our_id();

        assert!(matches!(table.add(imposter, 1000), AddResult::SelfRejected));
    }

    #[test]
    fn test_closest_basic() {
        let mut table = RoutingTable::new(our_id());

        // Populate with nodes at a range of distances.
        for byte in 1..20u8 {
            table.add(make_node(byte, 1000), 1000);
        }

        let target = [0x05u8; 32];
        let result = table.closest(&target, 5);
        assert_eq!(result.nodes.len(), 5);

        // Adjacent pairs must be in non-decreasing distance order.
        for pair in result.nodes.windows(2) {
            assert!(closer(&target, &pair[0].id, &pair[1].id) != Ordering::Greater);
        }
    }

    #[test]
    fn test_mark_good_and_failed() {
        let mut table = RoutingTable::new(our_id());
        let node = make_node(0x42, 1000);
        let node_id = node.id;
        table.add(node, 1000);

        // A successful contact records state and RTT.
        table.mark_good(&node_id, 1500, Some(25));
        let entry = table.get(&node_id).unwrap();
        assert_eq!(entry.state, NodeState::Good);
        assert_eq!(entry.rtt_ms, Some(25));

        // Enough consecutive failures should push the node out entirely.
        for _ in 0..5 {
            table.mark_failed(&node_id, 2000);
        }
        assert!(table.get(&node_id).is_none());
    }

    #[test]
    fn test_responsive_count() {
        let mut table = RoutingTable::new(our_id());

        let mut responsive = make_node(0x01, 1000);
        responsive.state = NodeState::Good;
        table.add(responsive, 1000);

        let mut unresponsive = make_node(0x02, 1000);
        unresponsive.state = NodeState::Bad;
        table.add(unresponsive, 1000);

        assert_eq!(table.responsive_count(1000), 1);
    }
}
482  }