table.rs
1 //! Routing Table — the full set of K-buckets 2 //! 3 //! The routing table maintains 256 buckets, each holding nodes at a specific 4 //! XOR distance range from our own ID. This enables O(log n) lookups. 5 6 use super::bucket::{BucketInsertResult, KBucket}; 7 use super::node::NodeInfo; 8 use crate::key::{bucket_index, closer, DhtKey}; 9 10 /// Result of adding a node to the routing table 11 #[derive(Debug, Clone)] 12 pub enum AddResult { 13 /// Node was added to the routing table 14 Added, 15 /// Existing node was updated 16 Updated, 17 /// A bad node was evicted to make room 18 Evicted(NodeInfo), 19 /// Bucket was full, node was cached for later 20 Cached, 21 /// Attempted to add our own node (rejected) 22 SelfRejected, 23 } 24 25 /// Closest nodes result with iteration support 26 #[derive(Debug, Clone)] 27 pub struct ClosestNodes { 28 /// Nodes sorted by distance to target 29 pub nodes: Vec<NodeInfo>, 30 /// The target key we searched for 31 pub target: DhtKey, 32 } 33 34 impl ClosestNodes { 35 /// Get up to `count` nodes 36 pub fn take(&self, count: usize) -> Vec<&NodeInfo> { 37 self.nodes.iter().take(count).collect() 38 } 39 40 /// Filter to only responsive nodes 41 pub fn responsive(&self, now: u64, stale_after_secs: u64) -> Vec<&NodeInfo> { 42 self.nodes 43 .iter() 44 .filter(|n| n.is_responsive(now, stale_after_secs)) 45 .collect() 46 } 47 } 48 49 /// The full routing table 50 #[derive(Debug)] 51 pub struct RoutingTable { 52 /// Our node's DHT ID 53 our_id: DhtKey, 54 55 /// 256 k-buckets (bucket 0 = most distant, bucket 255 = closest) 56 buckets: Vec<KBucket>, 57 58 /// Configuration 59 config: RoutingTableConfig, 60 } 61 62 /// Configuration for routing table behavior 63 #[derive(Debug, Clone)] 64 pub struct RoutingTableConfig { 65 /// Seconds before a node is considered stale 66 pub stale_after_secs: u64, 67 /// Seconds between pinging unresponsive nodes 68 pub ping_interval_secs: u64, 69 /// Seconds between bucket refresh lookups 70 pub refresh_interval_secs: u64, 71 } 72 73 impl Default for RoutingTableConfig { 74 fn default() -> Self { 75 Self { 76 stale_after_secs: 900, // 15 minutes 77 ping_interval_secs: 300, // 5 minutes 78 refresh_interval_secs: 3600, // 1 hour 79 } 80 } 81 } 82 83 impl RoutingTable { 84 /// Create a new routing table for the given node ID 85 pub fn new(our_id: DhtKey) -> Self { 86 Self::with_config(our_id, RoutingTableConfig::default()) 87 } 88 89 /// Create with custom configuration 90 pub fn with_config(our_id: DhtKey, config: RoutingTableConfig) -> Self { 91 let mut buckets = Vec::with_capacity(256); 92 for _ in 0..256 { 93 buckets.push(KBucket::new()); 94 } 95 96 Self { 97 our_id, 98 buckets, 99 config, 100 } 101 } 102 103 /// Get our node ID 104 pub fn our_id(&self) -> &DhtKey { 105 &self.our_id 106 } 107 108 /// Add a node to the routing table 109 pub fn add(&mut self, node: NodeInfo, now: u64) -> AddResult { 110 // Never add ourselves 111 if node.id == self.our_id { 112 return AddResult::SelfRejected; 113 } 114 115 let bucket_idx = bucket_index(&self.our_id, &node.id); 116 117 // Handle edge case: if bucket_idx is 256 (same key), reject 118 if bucket_idx >= 256 { 119 return AddResult::SelfRejected; 120 } 121 122 match self.buckets[bucket_idx].insert(node, now) { 123 BucketInsertResult::Inserted => AddResult::Added, 124 BucketInsertResult::Updated => AddResult::Updated, 125 BucketInsertResult::Evicted(evicted) => AddResult::Evicted(evicted), 126 BucketInsertResult::Cached => AddResult::Cached, 127 } 128 } 129 130 /// Get a node by ID 131 pub fn get(&self, id: &DhtKey) -> Option<&NodeInfo> { 132 if *id == self.our_id { 133 return None; 134 } 135 136 let bucket_idx = bucket_index(&self.our_id, id); 137 if bucket_idx >= 256 { 138 return None; 139 } 140 141 self.buckets[bucket_idx].get(id) 142 } 143 144 /// Get mutable reference to a node 145 pub fn get_mut(&mut self, id: &DhtKey) -> Option<&mut NodeInfo> { 146 if *id == self.our_id { 147 return None; 148 } 149 150 let bucket_idx = bucket_index(&self.our_id, id); 151 if bucket_idx >= 256 { 152 return None; 153 } 154 155 self.buckets[bucket_idx].get_mut(id) 156 } 157 158 /// Mark a node as successfully contacted 159 pub fn mark_good(&mut self, id: &DhtKey, now: u64, rtt_ms: Option<u16>) { 160 if let Some(node) = self.get_mut(id) { 161 node.mark_good(now, rtt_ms); 162 } 163 } 164 165 /// Mark a node as failed, potentially evicting it 166 /// 167 /// Returns the promoted replacement node if one was promoted 168 pub fn mark_failed(&mut self, id: &DhtKey, now: u64) -> Option<NodeInfo> { 169 if *id == self.our_id { 170 return None; 171 } 172 173 let bucket_idx = bucket_index(&self.our_id, id); 174 if bucket_idx >= 256 { 175 return None; 176 } 177 178 self.buckets[bucket_idx].mark_failed(id, now) 179 } 180 181 /// Remove a node from the routing table 182 pub fn remove(&mut self, id: &DhtKey) -> Option<NodeInfo> { 183 if *id == self.our_id { 184 return None; 185 } 186 187 let bucket_idx = bucket_index(&self.our_id, id); 188 if bucket_idx >= 256 { 189 return None; 190 } 191 192 self.buckets[bucket_idx].remove(id) 193 } 194 195 /// Find the K closest nodes to a target 196 /// 197 /// This is the core operation for DHT lookups. Returns nodes 198 /// sorted by XOR distance to target. 199 pub fn closest(&self, target: &DhtKey, count: usize) -> ClosestNodes { 200 let mut candidates: Vec<NodeInfo> = Vec::new(); 201 202 // Start with the bucket containing the target 203 let target_bucket = bucket_index(&self.our_id, target).min(255); 204 205 // Collect from target bucket first 206 candidates.extend(self.buckets[target_bucket].nodes().iter().cloned()); 207 208 // Expand outward from target bucket 209 let mut offset = 1; 210 while candidates.len() < count && offset < 256 { 211 // Check bucket closer to us (higher index) 212 let closer_idx = target_bucket.checked_add(offset); 213 if let Some(idx) = closer_idx 214 && idx < 256 { 215 candidates.extend(self.buckets[idx].nodes().iter().cloned()); 216 } 217 218 // Check bucket farther from us (lower index) 219 let farther_idx = target_bucket.checked_sub(offset); 220 if let Some(idx) = farther_idx { 221 candidates.extend(self.buckets[idx].nodes().iter().cloned()); 222 } 223 224 offset += 1; 225 } 226 227 // Sort by distance to target 228 candidates.sort_by(|a, b| closer(target, &a.id, &b.id)); 229 230 // Truncate to requested count 231 candidates.truncate(count); 232 233 ClosestNodes { 234 nodes: candidates, 235 target: *target, 236 } 237 } 238 239 /// Find K closest routing-eligible nodes with Sybil defense 240 /// 241 /// Unlike `closest()`, this applies Sybil defense filters: 242 /// - Age threshold: only nodes older than `min_age_ms` 243 /// - Subnet diversity: max `max_per_subnet` nodes per /24 (0 = disabled) 244 /// - Trust-based sorting: higher routing_score = preferred 245 /// 246 /// Use this for outbound queries where security matters. 247 pub fn closest_eligible( 248 &self, 249 target: &DhtKey, 250 count: usize, 251 now: u64, 252 min_age_ms: u64, 253 max_per_subnet: usize, 254 ) -> ClosestNodes { 255 // Start with all candidates 256 let all_candidates = self.closest(target, count * 4); // Get extra to filter 257 258 // Filter by age threshold 259 let mut eligible: Vec<NodeInfo> = all_candidates 260 .nodes 261 .into_iter() 262 .filter(|n| n.is_routing_eligible(now, min_age_ms)) 263 .collect(); 264 265 // Sort by routing score (trust-based) then by distance 266 eligible.sort_by(|a, b| { 267 // Primary: higher routing score = better 268 let score_cmp = b.routing_score(now).cmp(&a.routing_score(now)); 269 if score_cmp != std::cmp::Ordering::Equal { 270 return score_cmp; 271 } 272 // Secondary: closer in XOR = better 273 crate::key::closer(target, &a.id, &b.id) 274 }); 275 276 // Apply subnet diversity limit 277 let mut selected = Vec::with_capacity(count); 278 let mut subnet_counts: std::collections::HashMap<[u8; 3], usize> = 279 std::collections::HashMap::new(); 280 281 for node in eligible { 282 // Check subnet limit (if enabled) 283 if max_per_subnet > 0 284 && let Some(subnet) = node.subnet_key() { 285 let count_in_subnet = subnet_counts.entry(subnet).or_insert(0); 286 if *count_in_subnet >= max_per_subnet { 287 continue; // Skip, subnet at capacity 288 } 289 *count_in_subnet += 1; 290 } 291 292 selected.push(node); 293 if selected.len() >= count { 294 break; 295 } 296 } 297 298 ClosestNodes { 299 nodes: selected, 300 target: *target, 301 } 302 } 303 304 /// Get nodes that need to be pinged 305 pub fn nodes_needing_ping(&self, now: u64) -> Vec<NodeInfo> { 306 self.buckets 307 .iter() 308 .flat_map(|b| b.nodes_needing_ping(now, self.config.ping_interval_secs)) 309 .cloned() 310 .collect() 311 } 312 313 /// Get bucket indices that need refresh 314 pub fn buckets_needing_refresh(&self, now: u64) -> Vec<usize> { 315 self.buckets 316 .iter() 317 .enumerate() 318 .filter(|(_, b)| b.needs_refresh(now, self.config.refresh_interval_secs)) 319 .map(|(i, _)| i) 320 .collect() 321 } 322 323 /// Mark a bucket as refreshed 324 pub fn mark_bucket_refreshed(&mut self, bucket_idx: usize, now: u64) { 325 if bucket_idx < 256 { 326 self.buckets[bucket_idx].mark_refreshed(now); 327 } 328 } 329 330 /// Generate a random key in a specific bucket range (for refresh) 331 /// 332 /// This is used to find new nodes in sparse buckets. 333 /// Returns a key that would fall into the specified bucket. 334 pub fn random_key_in_bucket(&self, bucket_idx: usize, random_bytes: &[u8; 32]) -> DhtKey { 335 let mut key = *random_bytes; 336 337 // The bucket index is determined by the number of leading zeros in XOR. 338 // So we need to set bits such that XOR with our_id has exactly 339 // (255 - bucket_idx) leading zeros. 340 341 // Start with our ID 342 for i in 0..32 { 343 key[i] = self.our_id[i] ^ random_bytes[i]; 344 } 345 346 // Ensure the first differing bit is at position (255 - bucket_idx) 347 let bit_pos = 255 - bucket_idx; 348 let byte_idx = bit_pos / 8; 349 let bit_in_byte = 7 - (bit_pos % 8); 350 351 // Set all bytes before byte_idx to match our_id (XOR = 0) 352 key[..byte_idx].copy_from_slice(&self.our_id[..byte_idx]); 353 354 // Set the target bit to 1 in the XOR (differ from our_id) 355 let mask = 1u8 << bit_in_byte; 356 key[byte_idx] = (self.our_id[byte_idx] & !mask) | ((!self.our_id[byte_idx]) & mask); 357 358 key 359 } 360 361 /// Total number of nodes in the routing table 362 pub fn len(&self) -> usize { 363 self.buckets.iter().map(|b| b.len()).sum() 364 } 365 366 /// Check if routing table is empty 367 pub fn is_empty(&self) -> bool { 368 self.buckets.iter().all(|b| b.is_empty()) 369 } 370 371 /// Get all nodes (for iteration/debugging) 372 pub fn all_nodes(&self) -> impl Iterator<Item = &NodeInfo> { 373 self.buckets.iter().flat_map(|b| b.nodes()) 374 } 375 376 /// Count responsive nodes 377 pub fn responsive_count(&self, now: u64) -> usize { 378 self.buckets 379 .iter() 380 .flat_map(|b| b.nodes()) 381 .filter(|n| n.is_responsive(now, self.config.stale_after_secs)) 382 .count() 383 } 384 } 385 386 #[cfg(test)] 387 mod tests { 388 use super::*; 389 use super::super::node::NodeState; 390 use std::cmp::Ordering; 391 392 fn our_id() -> DhtKey { 393 [0u8; 32] 394 } 395 396 fn make_node(byte: u8, now: u64) -> NodeInfo { 397 let pubkey = [byte; 32]; 398 NodeInfo::new(pubkey, format!("addr-{byte}"), now) 399 } 400 401 #[test] 402 fn test_add_and_get() { 403 let mut table = RoutingTable::new(our_id()); 404 let node = make_node(0x42, 1000); 405 let id = node.id; 406 407 table.add(node, 1000); 408 assert!(table.get(&id).is_some()); 409 } 410 411 #[test] 412 fn test_reject_self() { 413 let mut table = RoutingTable::new(our_id()); 414 let mut node = make_node(0x01, 1000); 415 node.id = our_id(); 416 417 match table.add(node, 1000) { 418 AddResult::SelfRejected => {} 419 _ => panic!("Should reject self"), 420 } 421 } 422 423 #[test] 424 fn test_closest_basic() { 425 let mut table = RoutingTable::new(our_id()); 426 427 // Add nodes with different distances 428 for byte in 1..20u8 { 429 table.add(make_node(byte, 1000), 1000); 430 } 431 432 let target = [0x05u8; 32]; 433 let closest = table.closest(&target, 5); 434 435 assert_eq!(closest.nodes.len(), 5); 436 437 // Verify they're sorted by distance 438 for i in 1..closest.nodes.len() { 439 assert!( 440 closer(&target, &closest.nodes[i - 1].id, &closest.nodes[i].id) 441 != Ordering::Greater 442 ); 443 } 444 } 445 446 #[test] 447 fn test_mark_good_and_failed() { 448 let mut table = RoutingTable::new(our_id()); 449 let node = make_node(0x42, 1000); 450 let id = node.id; 451 452 table.add(node, 1000); 453 454 // Mark as good 455 table.mark_good(&id, 1500, Some(25)); 456 assert_eq!(table.get(&id).unwrap().state, NodeState::Good); 457 assert_eq!(table.get(&id).unwrap().rtt_ms, Some(25)); 458 459 // Mark as failed repeatedly 460 for _ in 0..5 { 461 table.mark_failed(&id, 2000); 462 } 463 464 // Node should be evicted 465 assert!(table.get(&id).is_none()); 466 } 467 468 #[test] 469 fn test_responsive_count() { 470 let mut table = RoutingTable::new(our_id()); 471 472 let mut good = make_node(0x01, 1000); 473 good.state = NodeState::Good; 474 table.add(good, 1000); 475 476 let mut bad = make_node(0x02, 1000); 477 bad.state = NodeState::Bad; 478 table.add(bad, 1000); 479 480 assert_eq!(table.responsive_count(1000), 1); 481 } 482 }