switchboard.rs
//! Switchboard
//!
//! Handles incoming frames and routes them to their destination.
//! This is the core routing logic that makes us a mesh router.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use scalable_cuckoo_filter::ScalableCuckooFilterBuilder;
use rand_09::{rngs::StdRng, SeedableRng};

use tokio::sync::{RwLock, oneshot};
use tracing::{debug, info, trace, warn};

use abzu_router::PeerKey;
use abzu_transport::{AbzuFrame, AbzuInterfaceExt, DhtRequestKind, DhtResponseKind, DhtNodeInfo, TransportError};

// DHT imports for full integration
use abzu_dht::{
    RoutingTable, ValueStore,
    routing::{NodeInfo as DhtNodeInfoFull, NodeState},
    store::{StoredValue, ValueType},
    security::{RateLimiter, RateLimitResult},
    NodeLookup, ValueLookup,
    DhtKey, K,
};

use crate::node::{Node, NodeError, ServiceEvent};

/// Type alias for pending DHT operations map to reduce type complexity
type PendingDhtOps = HashMap<u64, (u64, oneshot::Sender<DhtResponseKind>)>;

/// Frame routing result
#[derive(Debug)]
pub enum RouteResult {
    /// Frame is for us, was processed locally
    Local,
    /// Frame was forwarded to a peer
    Forwarded(PeerKey),
    /// No route found
    NoRoute,
    /// Error occurred
    Error(NodeError),
}

/// Result of a DHT value lookup
#[derive(Debug)]
pub enum FindValueResult {
    /// Value(s) found
    Found(Vec<StoredValue>),
    /// Value not found, returning closest nodes
    NotFound(Vec<DhtNodeInfoFull>),
}

/// Internal result type for a single FindValue query
enum FindValueQueryResult {
    /// Remote returned a value
    Value(StoredValue),
    /// Remote returned closer nodes
    Nodes(Vec<DhtNodeInfoFull>),
}

/// Result of a DHT store operation
#[derive(Debug, Clone)]
pub struct StoreResult {
    /// Whether the value was stored locally
    pub stored_locally: bool,
    /// Number of remote nodes that accepted the store
    pub nodes_stored: usize,
    /// Number of remote nodes that failed to store
    pub nodes_failed: usize,
}

/// The Switchboard handles all incoming frames
pub struct Switchboard {
    node: Arc<Node>,
    /// Probabilistic seen-set for gossip deduplication using CuckooFilter
    /// Uses dual-filter rotation for TTL-based eviction
    seen_filter: Arc<RwLock<SeenFilter>>,
    /// DHT routing table for peer discovery
    dht_routing: Arc<RwLock<RoutingTable>>,
    /// DHT value store for content/provider records
    dht_values: Arc<RwLock<ValueStore>>,
    /// Pending DHT operations awaiting responses: request_id -> (created_at, response sender)
    /// Tuple stores creation timestamp for M4 expiration cleanup
    pending_dht_ops: Arc<RwLock<PendingDhtOps>>,
    /// Rate limiter for incoming DHT requests (H1 protection)
    dht_rate_limiter: Arc<RwLock<RateLimiter>>,
}

/// TTL for seen messages (5 minutes)
const SEEN_MESSAGE_TTL_SECS: u64 = 300;

/// Rotation interval: half TTL (2.5 minutes)
/// Entries live for 1.5x to 2x this interval
const SEEN_FILTER_ROTATION_SECS: u64 = SEEN_MESSAGE_TTL_SECS / 2;

/// Initial capacity for each CuckooFilter (auto-scales as needed)
const SEEN_FILTER_INITIAL_CAPACITY: usize = 1000;

/// Target false positive rate for deduplication
/// 0.001 = 0.1% chance of incorrectly rejecting a unique message
const SEEN_FILTER_FP_RATE: f64 = 0.001;

// ============================================================================
// SeenFilter: Probabilistic Deduplication with TTL
// ============================================================================

/// Type alias for our Send+Sync compatible CuckooFilter.
/// Uses StdRng instead of ThreadRng (which is not Send).
/// Uses the crate's default SipHasher13 hasher.
type SendCuckooFilter = scalable_cuckoo_filter::ScalableCuckooFilter<MessageFingerprint, siphasher::sip::SipHasher13, StdRng>;

/// Create a new Send-compatible CuckooFilter.
fn new_cuckoo_filter() -> SendCuckooFilter {
    ScalableCuckooFilterBuilder::new()
        .initial_capacity(SEEN_FILTER_INITIAL_CAPACITY)
        .false_positive_probability(SEEN_FILTER_FP_RATE)
        .rng(StdRng::from_os_rng())
        .finish()
}

/// A hash-based message fingerprint for the CuckooFilter.
/// Combines circle_id and msg_id into a 40-byte key.
#[derive(Clone, PartialEq, Eq, Hash)]
struct MessageFingerprint {
    circle_id: [u8; 32],
    msg_id: u64,
}

impl MessageFingerprint {
    fn new(circle_id: &[u8; 32], msg_id: u64) -> Self {
        Self {
            circle_id: *circle_id,
            msg_id,
        }
    }
}

/// Dual-filter probabilistic seen-set with time-based rotation.
///
/// ## Design
///
/// Uses two CuckooFilters:
/// - **current**: Accepts new inserts, checked first
/// - **previous**: Read-only, holds entries from previous rotation window
///
/// Every `SEEN_FILTER_ROTATION_SECS` seconds:
/// 1. Discard `previous`
/// 2. Move `current` → `previous`
/// 3. Create new empty `current`
///
/// This gives entries an effective lifetime of 1.5x to 2x the rotation interval,
/// which maps to the original TTL behavior.
///
/// ## Tradeoffs vs HashMap
///
/// | Aspect | HashMap | CuckooFilter |
/// |--------|---------|-------------|
/// | Memory | O(n) full entries | O(n) short fingerprints |
/// | Lookup | O(1) exact | O(1) probabilistic |
/// | False positives | None | ~0.1% |
/// | False negatives | None | None |
///
/// False positives mean we might drop a legitimate unique message that
/// hashes to the same fingerprint as a seen one. At 0.1% FP rate and
/// typical gossip volumes, this is acceptable.
pub struct SeenFilter {
    current: SendCuckooFilter,
    previous: Option<SendCuckooFilter>,
    last_rotation: Instant,
}

impl SeenFilter {
    /// Create a new dual-filter seen set.
    pub fn new() -> Self {
        Self {
            current: new_cuckoo_filter(),
            previous: None,
            last_rotation: Instant::now(),
        }
    }

    /// Check if a message has been seen in either filter.
    pub fn contains(&self, circle_id: &[u8; 32], msg_id: u64) -> bool {
        let fp = MessageFingerprint::new(circle_id, msg_id);

        // Check current filter first (most likely hit)
        if self.current.contains(&fp) {
            return true;
        }

        // Fall back to previous filter
        if let Some(ref prev) = self.previous {
            return prev.contains(&fp);
        }

        false
    }

    /// Insert a message fingerprint into the current filter.
    pub fn insert(&mut self, circle_id: &[u8; 32], msg_id: u64) {
        let fp = MessageFingerprint::new(circle_id, msg_id);
        self.current.insert(&fp);
    }

    /// Rotate filters if enough time has passed.
    /// Returns true if rotation occurred.
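    ///
    /// A minimal usage sketch (illustrative only; the zero `circle_id` and the
    /// `msg_id` of 42 are placeholder values, and rotation only triggers once
    /// `SEEN_FILTER_ROTATION_SECS` has elapsed):
    ///
    /// ```ignore
    /// let mut filter = SeenFilter::new();
    /// let circle_id = [0u8; 32];
    ///
    /// filter.insert(&circle_id, 42);
    /// assert!(filter.contains(&circle_id, 42));
    ///
    /// // Called from periodic maintenance; returns true only after the
    /// // rotation interval has elapsed.
    /// let _rotated = filter.maybe_rotate();
    /// ```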
    pub fn maybe_rotate(&mut self) -> bool {
        if self.last_rotation.elapsed().as_secs() >= SEEN_FILTER_ROTATION_SECS {
            // Discard previous, promote current, create new
            self.previous = Some(std::mem::replace(
                &mut self.current,
                new_cuckoo_filter(),
            ));
            self.last_rotation = Instant::now();
            true
        } else {
            false
        }
    }

    /// Get approximate capacity for diagnostics.
    pub fn capacity(&self) -> usize {
        let current_cap = self.current.capacity();
        let prev_cap = self.previous.as_ref().map(|p| p.capacity()).unwrap_or(0);
        current_cap + prev_cap
    }

    /// Force cleanup by discarding previous filter.
    /// Use when memory pressure is high.
    pub fn force_cleanup(&mut self) {
        self.previous = None;
    }
}

impl Default for SeenFilter {
    fn default() -> Self {
        Self::new()
    }
}

/// Maximum onion routing depth before rejecting frame (DoS protection)
/// Prevents stack overflow from deeply nested Route frames.
const MAX_ROUTE_DEPTH: usize = 8;

// ============================================================================
// DHT Type Conversion Helpers
// ============================================================================

/// Get current unix timestamp in seconds
fn unix_now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

/// Convert a wire-format DhtNodeInfo to a full domain NodeInfo
///
/// The wire format is minimal (just key + addr bytes), so we populate
/// defaults for fields that aren't transmitted.
fn wire_to_domain_node(wire: &DhtNodeInfo, now: u64) -> DhtNodeInfoFull {
    // Derive DHT key from the public key
    let id = abzu_dht::key::node_id(&wire.key);

    // Convert address bytes to string (assuming UTF-8 encoded address)
    // PRIVACY: addr may be None for Ghost Mode nodes
    let address = wire.addr
        .as_ref()
        .map(|a| String::from_utf8_lossy(a).to_string())
        .unwrap_or_default();

    DhtNodeInfoFull {
        id,
        pubkey: wire.key,
        address,
        tree_coords: None,       // Wire format doesn't include tree coords
        state: NodeState::Good,  // Assume good if they're responding
        failures: 0,
        last_seen: now,
        first_seen: now,         // Will be updated if node already exists
        rtt_ms: None,
    }
}

/// Convert a domain NodeInfo to wire-format DhtNodeInfo
fn domain_to_wire_node(node: &DhtNodeInfoFull) -> DhtNodeInfo {
    DhtNodeInfo {
        key: node.pubkey,
        // PRIVACY: Use None for empty addresses (Ghost Mode nodes)
        addr: if node.address.is_empty() {
            None
        } else {
            Some(node.address.as_bytes().to_vec())
        },
        relay_descriptor: None, // TODO: Populate for Ghost Mode relay routing
    }
}

/// Convert a PeerKey ([u8; 32]) to a hex string for address storage
fn peer_key_to_hex(key: &[u8; 32]) -> String {
    key.iter().map(|b| format!("{:02x}", b)).collect()
}

impl Switchboard {
    /// Create a new switchboard for a node
    pub fn new(node: Arc<Node>) -> Self {
        // Derive DHT key from our node's public key
        let our_key: DhtKey = abzu_dht::key::node_id(&node.peer_key());

        Self {
            node,
            seen_filter: Arc::new(RwLock::new(SeenFilter::new())),
            dht_routing: Arc::new(RwLock::new(RoutingTable::new(our_key))),
            dht_values: Arc::new(RwLock::new(ValueStore::new())),
            pending_dht_ops: Arc::new(RwLock::new(HashMap::new())),
            dht_rate_limiter: Arc::new(RwLock::new(RateLimiter::new())),
        }
    }

    /// Generate a unique request ID for DHT operations
    ///
    /// SECURITY: Uses random IDs to prevent response hijacking (H4 in audit).
    /// Sequential counters are predictable; attackers could pre-compute future IDs.
    fn next_request_id(&self) -> u64 {
        use rand::Rng;
        rand::thread_rng().r#gen()
    }

    /// Send a DHT request to a target peer and await the response.
    ///
    /// This is the primitive for active DHT operations. It:
    /// 1. Generates a unique request ID
    /// 2. Registers a pending operation with a oneshot channel
    /// 3. Returns the request_id and receiver for the caller to use
    ///
    /// The caller is responsible for:
    /// - Sending the frame via the transport layer
    /// - Awaiting the receiver with appropriate timeout
    /// - Handling timeout cleanup
    ///
    /// Returns (request_id, response_receiver), or None if too many pending ops (H3 protection)
    pub async fn register_dht_op(&self) -> Option<(u64, oneshot::Receiver<DhtResponseKind>)> {
        // SECURITY: Limit pending operations to prevent memory exhaustion (H3 in audit).
        const MAX_PENDING_OPS: usize = 1000;

        let request_id = self.next_request_id();
        let (tx, rx) = oneshot::channel();
        let now = unix_now();

        {
            let mut pending = self.pending_dht_ops.write().await;
            if pending.len() >= MAX_PENDING_OPS {
                warn!(
                    pending_count = pending.len(),
                    "Too many pending DHT operations, rejecting new operation"
                );
                return None;
            }
            pending.insert(request_id, (now, tx));
        }

        trace!(request_id, "Registered pending DHT operation");
        Some((request_id, rx))
    }

    /// Cancel a pending DHT operation (e.g., on timeout).
    ///
    /// Removes the pending operation from the registry without completing it.
    pub async fn cancel_dht_op(&self, request_id: u64) {
        let mut pending = self.pending_dht_ops.write().await;
        if pending.remove(&request_id).is_some() {
            debug!(request_id, "Cancelled pending DHT operation");
        }
    }

    /// SECURITY: Reap expired pending DHT operations (M4 in audit).
    ///
    /// Removes operations older than the specified timeout to prevent memory leaks
    /// from failed/orphaned operations that never received a response.
    /// Returns the count of reaped operations.
    pub async fn reap_expired_dht_ops(&self, max_age_secs: u64) -> usize {
        let now = unix_now();
        let mut pending = self.pending_dht_ops.write().await;
        let before_count = pending.len();

        pending.retain(|_request_id, (created_at, _sender)| {
            now.saturating_sub(*created_at) < max_age_secs
        });

        let reaped = before_count.saturating_sub(pending.len());
        if reaped > 0 {
            debug!(reaped, remaining = pending.len(), "Reaped expired DHT operations");
        }
        reaped
    }

    /// Get our DHT node key (derived from peer key).
    pub fn our_dht_key(&self) -> DhtKey {
        abzu_dht::key::node_id(&self.node.peer_key())
    }

    /// Get our peer key bytes for DHT messages.
    pub fn our_peer_key(&self) -> [u8; 32] {
        self.node.peer_key()
    }

    /// Add a connected peer as a DHT seed node.
    ///
    /// This seeds the routing table with the peer so DHT queries
    /// can use it as an initial contact point.
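    ///
    /// Illustrative sketch (assumes `switchboard` is an `Arc<Switchboard>` and
    /// `peer_key` belongs to an already-connected peer; the 3s timeout is an
    /// example value):
    ///
    /// ```ignore
    /// switchboard.add_dht_seed(peer_key).await;
    /// // The seeded peer can now serve as a first contact, e.g. for a ping:
    /// switchboard.dht_ping(&peer_key, 3_000).await?;
    /// ```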
    pub async fn add_dht_seed(&self, peer_key: PeerKey) {
        let id = abzu_dht::key::node_id(&peer_key);
        let now = unix_now();
        let node_info = DhtNodeInfoFull {
            id,
            pubkey: peer_key,
            address: String::new(), // Will be filled in when we have transport info
            tree_coords: None,
            state: NodeState::Good,
            failures: 0,
            last_seen: now,
            first_seen: now,
            rtt_ms: None,
        };

        let mut rt = self.dht_routing.write().await;
        rt.add(node_info, now);
        debug!(peer = ?&peer_key[..8], "Added DHT seed node");
    }

    /// Send a frame to a specific peer.
    ///
    /// Returns Ok(()) if the frame was sent, or an error if the peer is not connected
    /// or the send failed.
    pub async fn send_frame_to_peer(&self, peer_key: &PeerKey, frame: &AbzuFrame) -> Result<(), NodeError> {
        use abzu_transport::AbzuInterfaceExt;

        let peers_arc = self.node.peers();
        let mut peers = peers_arc.lock().await;

        if let Some(conn) = peers.get_mut(peer_key) {
            match conn.interface.send_frame(frame).await {
                Ok(()) => {
                    conn.touch();
                    Ok(())
                }
                Err(e) => Err(NodeError::Transport(e)),
            }
        } else {
            Err(NodeError::NoPeerConnection)
        }
    }

    /// Send a DHT ping to a peer and await the response.
    ///
    /// This is the simplest active DHT operation. It verifies the peer is reachable
    /// and updates the routing table on success.
    ///
    /// Returns Ok(()) if the peer responded with Pong, or an error on timeout/failure.
    pub async fn dht_ping(&self, peer_key: &PeerKey, timeout_ms: u64) -> Result<(), NodeError> {
        use std::time::Duration;

        // Register the pending operation
        let (request_id, rx) = self.register_dht_op().await
            .ok_or(NodeError::OperationRejected)?;

        // Construct and send the ping frame
        let frame = AbzuFrame::dht_ping(request_id, self.our_peer_key());
        if let Err(e) = self.send_frame_to_peer(peer_key, &frame).await {
            self.cancel_dht_op(request_id).await;
            return Err(e);
        }

        debug!(request_id, peer = ?&peer_key[..8], "DHT Ping sent");

        // Await response with timeout
        let timeout = Duration::from_millis(timeout_ms);
        match tokio::time::timeout(timeout, rx).await {
            Ok(Ok(DhtResponseKind::Pong)) => {
                trace!(request_id, "DHT Ping succeeded");
                Ok(())
            }
            Ok(Ok(DhtResponseKind::Error { code, message })) => {
                warn!(request_id, code, %message, "DHT Ping failed with error");
                Err(NodeError::DhtError(format!("DHT error {}: {}", code, message)))
            }
            Ok(Ok(_)) => {
                warn!(request_id, "DHT Ping received unexpected response type");
                Err(NodeError::DhtError("Unexpected response type".to_string()))
            }
            Ok(Err(_)) => {
                // Channel closed - response handler dropped the sender
                Err(NodeError::DhtError("Response channel closed".to_string()))
            }
            Err(_) => {
                // Timeout - clean up and return error
                self.cancel_dht_op(request_id).await;
                debug!(request_id, "DHT Ping timed out");
                Err(NodeError::Timeout)
            }
        }
    }

    /// Find the K closest nodes to a target key using iterative lookup.
    ///
    /// This drives the NodeLookup state machine, sending parallel FindNode
    /// requests and processing responses until the lookup converges.
    ///
    /// Returns the K closest nodes discovered (may be fewer if network is sparse).
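    ///
    /// A minimal usage sketch (assumes `switchboard: Arc<Switchboard>` with at
    /// least one seed in the routing table; a self-lookup and a 5s timeout are
    /// used purely as example values):
    ///
    /// ```ignore
    /// let target = switchboard.our_dht_key();
    /// let nodes = switchboard.dht_find_node(&target, 5_000).await?;
    /// println!("lookup converged with {} node(s)", nodes.len());
    /// ```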
    pub async fn dht_find_node(
        &self,
        target: &DhtKey,
        timeout_ms: u64,
    ) -> Result<Vec<DhtNodeInfoFull>, NodeError> {
        use futures::future::join_all;
        use std::time::Duration;

        let timeout = Duration::from_millis(timeout_ms);
        let now = unix_now();

        // Get our node ID and seed nodes from local routing table
        let our_id = self.our_dht_key();
        let seeds: Vec<DhtNodeInfoFull> = {
            let rt = self.dht_routing.read().await;
            rt.closest(target, K).nodes
        };

        if seeds.is_empty() {
            debug!("No seed nodes for lookup");
            return Ok(Vec::new());
        }

        debug!(target = ?&target[..8], seeds = seeds.len(), "Starting node lookup");

        // Create the lookup state machine
        let mut lookup = NodeLookup::new(*target, our_id, seeds.clone());

        // Iterative lookup loop
        loop {
            // Get next batch of nodes to query
            let to_query = lookup.next_queries();

            if to_query.is_empty() {
                // No more queries needed - lookup is complete
                break;
            }

            trace!(querying = to_query.len(), "Sending FindNode batch");

            // Send all queries in parallel, collect results
            let query_futures: Vec<_> = to_query.iter().map(|node| {
                self.send_find_node_query(node, target, timeout)
            }).collect();

            let results = join_all(query_futures).await;

            // Process each result
            for (node, result) in to_query.iter().zip(results) {
                match result {
                    Ok(nodes) => {
                        // Success - feed nodes back to lookup
                        lookup.on_response(&node.id, nodes.clone());

                        // Update routing table with responding node
                        let mut rt = self.dht_routing.write().await;
                        if let Some(entry) = rt.get_mut(&node.id) {
                            entry.mark_good(now, None);
                        }

                        // Also add newly discovered nodes
                        for discovered in nodes {
                            let _ = rt.add(discovered, now);
                        }
                    }
                    Err(e) => {
                        // Failure - mark node as failed
                        trace!(node = ?&node.id[..8], error = %e, "Query failed");
                        lookup.on_failure(&node.id);

                        // Update routing table failure count
                        let mut rt = self.dht_routing.write().await;
                        if let Some(entry) = rt.get_mut(&node.id) {
                            entry.mark_failed();
                        }
                    }
                }
            }
        }

        // Extract the K closest nodes that responded
        let closest: Vec<DhtNodeInfoFull> = lookup.closest_responding()
            .into_iter()
            .cloned()
            .collect();

        debug!(found = closest.len(), "Node lookup complete");
        Ok(closest)
    }

    /// Find a value by key, or the K closest nodes if not found.
    ///
    /// This drives the ValueLookup state machine, returning early if
    /// the value is discovered, or continuing until the lookup converges.
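    ///
    /// Illustrative sketch (assumes `switchboard: Arc<Switchboard>` and a
    /// `key: DhtKey` of interest; the 5s timeout is an example value):
    ///
    /// ```ignore
    /// match switchboard.dht_find_value(&key, 5_000).await? {
    ///     FindValueResult::Found(values) => {
    ///         println!("got {} value(s)", values.len());
    ///     }
    ///     FindValueResult::NotFound(closest) => {
    ///         println!("not found; {} closest node(s) returned", closest.len());
    ///     }
    /// }
    /// ```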
    pub async fn dht_find_value(
        &self,
        target: &DhtKey,
        timeout_ms: u64,
    ) -> Result<FindValueResult, NodeError> {
        use futures::future::join_all;
        use std::time::Duration;

        let timeout = Duration::from_millis(timeout_ms);
        let now = unix_now();

        // Check local store first
        {
            let store = self.dht_values.read().await;
            let values = store.get(target, now);
            if !values.is_empty() {
                debug!(target = ?&target[..8], "Value found locally");
                return Ok(FindValueResult::Found(values.into_iter().cloned().collect()));
            }
        }

        // ROLE CHECK: Edge nodes can only query local cache, not network
        if !self.node.node_role().participates_in_dht() {
            debug!("Edge node: value not in local cache, skipping network lookup");
            return Ok(FindValueResult::NotFound(Vec::new()));
        }

        // Get our node ID and seed nodes
        let our_id = self.our_dht_key();
        let seeds: Vec<DhtNodeInfoFull> = {
            let rt = self.dht_routing.read().await;
            rt.closest(target, K).nodes
        };

        if seeds.is_empty() {
            debug!("No seed nodes for value lookup");
            return Ok(FindValueResult::NotFound(Vec::new()));
        }

        debug!(target = ?&target[..8], seeds = seeds.len(), "Starting value lookup");

        // Create the value lookup state machine
        let mut lookup = ValueLookup::new(*target, our_id, seeds);

        // Iterative lookup loop
        loop {
            let to_query = lookup.next_queries();

            if to_query.is_empty() {
                break;
            }

            trace!(querying = to_query.len(), "Sending FindValue batch");

            let query_futures: Vec<_> = to_query.iter().map(|node| {
                self.send_find_value_query(node, target, timeout)
            }).collect();

            let results = join_all(query_futures).await;

            for (node, result) in to_query.iter().zip(results) {
                match result {
                    Ok(FindValueQueryResult::Value(value)) => {
                        // Got a value! Feed to lookup and cache locally
                        lookup.on_value(&node.id, vec![value.clone()], Vec::new());

                        // Cache the value
                        let mut store = self.dht_values.write().await;
                        let _ = store.store(value, now);

                        // Update routing table
                        let mut rt = self.dht_routing.write().await;
                        if let Some(entry) = rt.get_mut(&node.id) {
                            entry.mark_good(now, None);
                        }
                    }
                    Ok(FindValueQueryResult::Nodes(nodes)) => {
                        // Got nodes, continue lookup
                        lookup.on_nodes(&node.id, nodes.clone());

                        // Update routing table
                        let mut rt = self.dht_routing.write().await;
                        if let Some(entry) = rt.get_mut(&node.id) {
                            entry.mark_good(now, None);
                        }
                        for discovered in nodes {
                            let _ = rt.add(discovered, now);
                        }
                    }
                    Err(e) => {
                        trace!(node = ?&node.id[..8], error = %e, "Query failed");
                        lookup.on_failure(&node.id);

                        let mut rt = self.dht_routing.write().await;
                        if let Some(entry) = rt.get_mut(&node.id) {
                            entry.mark_failed();
                        }
                    }
                }
            }

            // Check if we found a value
            if !lookup.values().is_empty() {
                break;
            }
        }

        // Return results
        let values = lookup.values().to_vec();
        if !values.is_empty() {
            debug!(count = values.len(), "Value(s) found");
            Ok(FindValueResult::Found(values))
        } else {
            let closest: Vec<DhtNodeInfoFull> = lookup.closest_responding()
                .into_iter()
                .cloned()
                .collect();
            debug!(nodes = closest.len(), "Value not found, returning closest nodes");
            Ok(FindValueResult::NotFound(closest))
        }
    }

    /// Send a single FindNode query to a peer and await the response.
    async fn send_find_node_query(
        &self,
        node: &DhtNodeInfoFull,
        target: &DhtKey,
        timeout: std::time::Duration,
    ) -> Result<Vec<DhtNodeInfoFull>, NodeError> {
        // Register the pending operation
        let (request_id, rx) = self.register_dht_op().await
            .ok_or(NodeError::OperationRejected)?;

        // Construct the FindNode frame
        let frame = AbzuFrame::dht_find_node(request_id, self.our_peer_key(), *target);

        // Send to peer (by pubkey)
        if let Err(e) = self.send_frame_to_peer(&node.pubkey, &frame).await {
            self.cancel_dht_op(request_id).await;
            return Err(e);
        }

        // Await response
        match tokio::time::timeout(timeout, rx).await {
            Ok(Ok(DhtResponseKind::Nodes { nodes: wire_nodes })) => {
                let now = unix_now();
                let nodes: Vec<DhtNodeInfoFull> = wire_nodes
                    .into_iter()
                    .map(|n| wire_to_domain_node(&n, now))
                    .collect();
                Ok(nodes)
            }
            Ok(Ok(DhtResponseKind::Error { code, message })) => {
                Err(NodeError::DhtError(format!("DHT error {}: {}", code, message)))
            }
            Ok(Ok(_)) => {
                Err(NodeError::DhtError("Unexpected response type".to_string()))
            }
            Ok(Err(_)) => {
                Err(NodeError::DhtError("Response channel closed".to_string()))
            }
            Err(_) => {
                self.cancel_dht_op(request_id).await;
                Err(NodeError::Timeout)
            }
        }
    }

    /// Send a single FindValue query to a peer and await the response.
    async fn send_find_value_query(
        &self,
        node: &DhtNodeInfoFull,
        target: &DhtKey,
        timeout: std::time::Duration,
    ) -> Result<FindValueQueryResult, NodeError> {
        let (request_id, rx) = self.register_dht_op().await
            .ok_or(NodeError::OperationRejected)?;

        let frame = AbzuFrame::dht_find_value(request_id, self.our_peer_key(), *target);

        if let Err(e) = self.send_frame_to_peer(&node.pubkey, &frame).await {
            self.cancel_dht_op(request_id).await;
            return Err(e);
        }

        match tokio::time::timeout(timeout, rx).await {
            Ok(Ok(DhtResponseKind::Nodes { nodes: wire_nodes })) => {
                let now = unix_now();
                let nodes: Vec<DhtNodeInfoFull> = wire_nodes
                    .into_iter()
                    .map(|n| wire_to_domain_node(&n, now))
                    .collect();
                Ok(FindValueQueryResult::Nodes(nodes))
            }
            Ok(Ok(DhtResponseKind::Value { value: data, publisher, ttl_remaining_secs })) => {
                // Construct StoredValue from wire response
                // We use Application type for generic values; caller can interpret payload
                let value = StoredValue::new(
                    *target,
                    ValueType::Application,
                    publisher,
                    data,
                    unix_now(),
                    Some(ttl_remaining_secs as u64),
                );
                Ok(FindValueQueryResult::Value(value))
            }
            Ok(Ok(DhtResponseKind::Error { code, message })) => {
                Err(NodeError::DhtError(format!("DHT error {}: {}", code, message)))
            }
            Ok(Ok(_)) => {
                Err(NodeError::DhtError("Unexpected response type".to_string()))
            }
            Ok(Err(_)) => {
                Err(NodeError::DhtError("Response channel closed".to_string()))
            }
            Err(_) => {
                self.cancel_dht_op(request_id).await;
                Err(NodeError::Timeout)
            }
        }
    }

    /// Store a value in the DHT network.
    ///
    /// Stores locally first, then replicates to K closest nodes.
    /// Returns a StoreResult indicating success/failure counts.
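    ///
    /// Illustrative sketch (assumes `switchboard: Arc<Switchboard>`; the
    /// `StoredValue::new` argument order follows its use in
    /// `send_find_value_query` above, and the TTL/timeout are example values):
    ///
    /// ```ignore
    /// let value = StoredValue::new(
    ///     key,
    ///     ValueType::Application,
    ///     switchboard.our_peer_key(),
    ///     payload,
    ///     unix_now(),
    ///     Some(3_600), // 1 hour TTL
    /// );
    /// let result = switchboard.dht_store(value, 10_000).await?;
    /// println!("local: {}, replicas: {}", result.stored_locally, result.nodes_stored);
    /// ```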
    pub async fn dht_store(
        &self,
        value: StoredValue,
        timeout_ms: u64,
    ) -> Result<StoreResult, NodeError> {
        let target = value.key;
        let timeout = Duration::from_millis(timeout_ms);
        let now = unix_now();

        // Step 1: Store locally
        let stored_locally = {
            let mut store = self.dht_values.write().await;
            store.store(value.clone(), now).is_ok()
        };

        if stored_locally {
            debug!(key = ?&target[..8], "Value stored locally");
        }

        // ROLE CHECK: Edge nodes store locally only, no network replication
        if !self.node.node_role().participates_in_dht() {
            debug!("Edge node: stored locally, skipping network replication");
            return Ok(StoreResult {
                stored_locally,
                nodes_stored: 0,
                nodes_failed: 0,
            });
        }

        // Step 2: Find K closest nodes
        let closest_nodes = {
            let rt = self.dht_routing.read().await;
            rt.closest(&target, K)
        };

        if closest_nodes.nodes.is_empty() {
            // No nodes to replicate to - local only
            return Ok(StoreResult {
                stored_locally,
                nodes_stored: 0,
                nodes_failed: 0,
            });
        }

        debug!(
            key = ?&target[..8],
            nodes = closest_nodes.nodes.len(),
            "Replicating value to closest nodes"
        );

        // Step 3: Send STORE to all closest nodes in parallel
        let store_futures: Vec<_> = closest_nodes.nodes
            .iter()
            .map(|node| self.send_store_to_node(node, &value, timeout))
            .collect();

        let results = futures::future::join_all(store_futures).await;

        // Step 4: Count successes and failures
        let mut nodes_stored = 0usize;
        let mut nodes_failed = 0usize;

        for result in results {
            match result {
                Ok(true) => nodes_stored += 1,
                Ok(false) => nodes_failed += 1,
                Err(_) => nodes_failed += 1,
            }
        }

        debug!(
            key = ?&target[..8],
            stored = nodes_stored,
            failed = nodes_failed,
            "DHT store complete"
        );

        Ok(StoreResult {
            stored_locally,
            nodes_stored,
            nodes_failed,
        })
    }

    /// Send a STORE request to a single node and await acknowledgment.
    async fn send_store_to_node(
        &self,
        node: &DhtNodeInfoFull,
        value: &StoredValue,
        timeout: Duration,
    ) -> Result<bool, NodeError> {
        let (request_id, rx) = self.register_dht_op().await
            .ok_or(NodeError::OperationRejected)?;

        // Calculate TTL remaining
        let now = unix_now();
        let ttl_secs = value.metadata.expires_at.saturating_sub(now) as u32;

        let frame = AbzuFrame::dht_store(
            request_id,
            self.our_peer_key(),
            value.key,
            value.payload.clone(),
            ttl_secs,
            value.signature.to_vec(),
        );

        if let Err(e) = self.send_frame_to_peer(&node.pubkey, &frame).await {
            self.cancel_dht_op(request_id).await;
            return Err(e);
        }

        match tokio::time::timeout(timeout, rx).await {
            Ok(Ok(DhtResponseKind::StoreAck { success })) => {
                Ok(success)
            }
            Ok(Ok(DhtResponseKind::Error { code, message })) => {
                warn!(node = ?&node.id[..8], code, %message, "Store rejected");
                Ok(false)
            }
            Ok(Ok(_)) => {
                Ok(false) // Unexpected response
            }
            Ok(Err(_)) => {
                Err(NodeError::DhtError("Response channel closed".to_string()))
            }
            Err(_) => {
                self.cancel_dht_op(request_id).await;
                Err(NodeError::Timeout)
            }
        }
    }

    /// Refresh (republish) values that are approaching expiration.
    ///
    /// Finds values published by this node that are below the TTL threshold
    /// and re-replicates them to K closest nodes.
    ///
    /// Returns the number of values successfully refreshed.
    pub async fn dht_refresh_values(&self, threshold_secs: u64, timeout_ms: u64) -> usize {
        let our_key = self.our_peer_key();
        let now = unix_now();

        // Get values needing refresh
        let values_to_refresh = {
            let store = self.dht_values.read().await;
            store.get_values_needing_refresh(&our_key, threshold_secs, now)
        };

        if values_to_refresh.is_empty() {
            return 0;
        }

        debug!(
            count = values_to_refresh.len(),
            threshold_secs,
            "Refreshing DHT values"
        );

        let mut refreshed = 0;

        for value in values_to_refresh {
            match self.dht_store(value, timeout_ms).await {
                Ok(result) if result.nodes_stored > 0 => {
                    refreshed += 1;
                }
                Ok(_) => {
                    // Stored locally but no remote nodes - still counts
                    refreshed += 1;
                }
                Err(e) => {
                    warn!(error = %e, "Failed to refresh value");
                }
            }
        }

        info!(refreshed, "DHT value refresh complete");
        refreshed
    }

    /// Spawn a background task to periodically refresh values.
    ///
    /// The task runs until the shutdown signal is received or the JoinHandle is aborted.
    /// SECURITY: Also runs pending ops reaper (M4) and accepts graceful shutdown (M6 in audit).
    pub fn start_refresh_task(
        self: Arc<Self>,
        interval_secs: u64,
        threshold_secs: u64,
        shutdown_rx: oneshot::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        let timeout_ms = 10_000; // 10s timeout per store
        const PENDING_OP_MAX_AGE_SECS: u64 = 120; // 2x typical timeout (M4)

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
            interval.tick().await; // Skip immediate first tick

            tokio::pin!(shutdown_rx);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Run value refresh
                        let refreshed = self.dht_refresh_values(threshold_secs, timeout_ms).await;
                        debug!(refreshed, "Periodic refresh cycle complete");

                        // SECURITY: Reap expired pending operations (M4 in audit)
                        self.reap_expired_dht_ops(PENDING_OP_MAX_AGE_SECS).await;
                    }
                    _ = &mut shutdown_rx => {
                        info!("DHT refresh task received shutdown signal");
                        break;
                    }
                }
            }

            info!("DHT refresh task terminated gracefully");
        })
    }

    /// Handle an incoming frame from a peer
    ///
    /// Returns the routing result indicating what happened to the frame.
    pub async fn handle_frame(&self, frame: AbzuFrame, source: PeerKey) -> RouteResult {
        trace!(source = ?source, frame = ?std::mem::discriminant(&frame), "Handling frame");

        match frame {
            AbzuFrame::KeepAlive => {
                // KeepAlive is just to maintain the connection
                // Update last activity for the source peer
                self.touch_peer(&source).await;
                RouteResult::Local
            }

            AbzuFrame::Chunk { cid, data } => {
                // Content-addressed chunk - store it locally
                self.handle_chunk(cid, data, source).await
            }

            AbzuFrame::Route { target, next_hop, payload } => {
                // Routing frame - check if we're the target or need to forward
                // Start at depth 0 for external frames
                self.handle_route(target, next_hop, payload, source, 0).await
            }

            AbzuFrame::Hello { ephemeral_pub, timestamp, .. } => {
                // Handshake initiation (version already negotiated at transport layer)
                self.handle_hello(ephemeral_pub, timestamp, source).await
            }

            AbzuFrame::HelloAck { ephemeral_pub, confirmation, .. } => {
                // Handshake response (version already negotiated at transport layer)
                self.handle_hello_ack(ephemeral_pub, confirmation, source).await
            }

            AbzuFrame::Request { cid, requester } => {
                // Content request - check if we have it and send back
                self.handle_request(cid, requester, source).await
            }

            AbzuFrame::Chat { id, to, msg, timestamp } => {
                // Chat message - store and send ack
                self.handle_chat(id, to, msg, timestamp, source).await
            }

            AbzuFrame::ChatAck { id } => {
                // Delivery acknowledgment
                self.handle_chat_ack(id, source).await
            }

            AbzuFrame::ReadReceipt { id } => {
                // Read receipt - message was read by recipient
                self.handle_read_receipt(id, source).await
            }

            AbzuFrame::Cover { .. } => {
                // Cover traffic - silently discard after decryption
                // This is the post-decrypt discrimination point:
                // on the wire, Cover frames are indistinguishable from real traffic.
                // Only after decryption can we identify and discard them.
                trace!("Discarding cover frame");
                RouteResult::Local
            }

            AbzuFrame::Announce { peer_key, public_addr, nat_type, timestamp } => {
                // Peer address announcement for NAT traversal
                self.handle_announce(peer_key, public_addr, nat_type, timestamp, source).await
            }

            AbzuFrame::ServiceRequest { service_id, request_id, requester, payload } => {
                self.handle_service_request(service_id, request_id, requester, payload, source).await
            }

            AbzuFrame::ServiceResponse { service_id, request_id, payload, status } => {
                self.handle_service_response(service_id, request_id, payload, status, source).await
            }

            // ─────────────────────────────────────────────────────────────────
            // Circle (Group) Frames
            // ─────────────────────────────────────────────────────────────────

            AbzuFrame::CircleCreate { id, name, founder_key, timestamp } => {
                // Handle incoming circle creation (for sync/replication)
                self.handle_circle_create(id, name, founder_key, timestamp, source).await
            }

            AbzuFrame::CircleInvite { circle_id, invitee, signature } => {
                // Handle circle invitation
                self.handle_circle_invite(circle_id, invitee, signature, source).await
            }

            AbzuFrame::CircleJoin { circle_id, member_key, epoch } => {
                // Handle member joining
                self.handle_circle_join(circle_id, member_key, epoch, source).await
            }

            AbzuFrame::CircleLeave { circle_id, member_key } => {
                // Handle member leaving
                self.handle_circle_leave(circle_id, member_key, source).await
            }

            AbzuFrame::CircleMessage { circle_id, id, encrypted_envelope } => {
                // Handle incoming circle message with privacy-preserving envelope
                self.handle_circle_message(circle_id, id, encrypted_envelope, source).await
            }

            AbzuFrame::CircleAck { circle_id, msg_id, member_key } => {
                // Handle circle message acknowledgment
                self.handle_circle_ack(circle_id, msg_id, member_key, source).await
            }

            AbzuFrame::CircleVouch { circle_id, voucher, target, signature, timestamp } => {
                // Handle secondary endorsement (vouch) for a member
                self.handle_circle_vouch(circle_id, voucher, target, signature, timestamp, source).await
            }

            AbzuFrame::CirclePrune { circle_id, pruner, target, signature, reason_hash, timestamp } => {
                // Handle member pruning (moderation action)
                self.handle_circle_prune(circle_id, pruner, target, signature, reason_hash, timestamp, source).await
            }

            AbzuFrame::GossipHave { circle_id, msg_ids, epoch } => {
                // Handle gossip "I have" announcement for lazy repair
                self.handle_gossip_have(circle_id, msg_ids, epoch, source).await
            }

            // ─────────────────────────────────────────────────────────────────
            // DHT (Distributed Hash Table) Frames
            // ─────────────────────────────────────────────────────────────────

            AbzuFrame::DhtRequest { request_id, sender_key, kind } => {
                // Handle incoming DHT request
                self.handle_dht_request(request_id, sender_key, kind, source).await
            }

            AbzuFrame::DhtResponse { request_id, responder_key, kind } => {
                // Handle incoming DHT response
                self.handle_dht_response(request_id, responder_key, kind, source).await
            }
        }
    }

    /// Handle a content chunk
    async fn handle_chunk(&self, cid: [u8; 32], data: Vec<u8>, source: PeerKey) -> RouteResult {
        debug!(cid = ?cid, size = data.len(), "Received chunk");

        // Verify CID matches data hash
        let computed_cid = blake3::hash(&data);
        if computed_cid.as_bytes() != &cid {
            warn!("CID mismatch - rejecting chunk");
            return RouteResult::Error(NodeError::Crypto("CID mismatch".to_string()));
        }

        // Store the chunk
        if let Err(e) = self.node.store_chunk(&cid, &data) {
            return RouteResult::Error(e);
        }

        // Notify any pending fetches for this CID
        self.node.notify_content_arrived(&cid);

        self.touch_peer(&source).await;
        RouteResult::Local
    }

    /// Handle a content request
    async fn handle_request(
        &self,
        cid: [u8; 32],
        requester: [u8; 32],
        source: PeerKey,
    ) -> RouteResult {
        info!(cid = ?cid, requester = ?requester, "Received content request");

        // Check if we have the content
        match self.node.get_chunk(&cid) {
            Ok(Some(data)) => {
                // We have it - send Chunk back to requester
                let chunk_frame = AbzuFrame::chunk(cid, data);

                // Send to the source peer (who forwarded the request)
                let peers_arc = self.node.peers();
                let mut peers = peers_arc.lock().await;

                if let Some(conn) = peers.get_mut(&source) {
                    if let Err(e) = conn.interface.send_frame(&chunk_frame).await {
                        warn!(error = %e, "Failed to send chunk response");
                    } else {
                        conn.touch();
                        info!(cid = ?cid, "Sent chunk response");
                    }
                }
            }
            Ok(None) => {
                debug!(cid = ?cid, "Content not found locally");
                // TODO: Forward request to other peers (gossip)
            }
            Err(e) => {
                warn!(error = %e, "Error checking store");
            }
        }

        self.touch_peer(&source).await;
        RouteResult::Local
    }

    /// Handle a routing frame (multi-hop onion routing)
    ///
    /// Route frames can be nested. Each node:
    /// 1. Checks if it's the final target → process payload locally
    /// 2. Checks if next_hop is us → unwrap inner frame and forward
    /// 3. Otherwise → forward the whole frame to next_hop
    ///
    /// # Security
    /// The `depth` parameter prevents DoS from deeply nested onion frames.
    /// Frames exceeding MAX_ROUTE_DEPTH are rejected.
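    ///
    /// A hedged sketch of how a sender might build a two-hop onion (the names
    /// `relay`, `target`, and `data` are illustrative; the inner frame is what
    /// the relay forwards after peeling its layer):
    ///
    /// ```ignore
    /// // Innermost layer: delivered to `target`, who processes `data` locally.
    /// let inner = AbzuFrame::Route { target, next_hop: target, payload: data };
    /// // Outer layer: handed to `relay`, which unwraps and forwards the inner frame.
    /// let outer = AbzuFrame::Route {
    ///     target,
    ///     next_hop: relay,
    ///     payload: inner.encode()?,
    /// };
    /// ```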
    async fn handle_route(
        &self,
        target: [u8; 32],
        next_hop: [u8; 32],
        payload: Vec<u8>,
        source: PeerKey,
        depth: usize,
    ) -> RouteResult {
        // SECURITY: Reject frames that exceed max routing depth (DoS protection)
        if depth >= MAX_ROUTE_DEPTH {
            warn!(depth, max = MAX_ROUTE_DEPTH, "Route frame exceeded max depth, dropping");
            return RouteResult::Error(NodeError::Validation("Route depth exceeded".into()));
        }

        let our_key = self.node.peer_key();

        // Case 1: We are the final destination
        if target == our_key {
            debug!("Route frame arrived at destination");
            return self.process_local_payload(payload, source).await;
        }

        // ROLE CHECK: Edge nodes don't route for others
        if !self.node.node_role().can_route() {
            debug!("Edge node refusing to route frame (not for us)");
            return RouteResult::NoRoute;
        }

        // Case 2: next_hop is us - we need to unwrap and forward
        if next_hop == our_key {
            debug!(target = ?target, depth, "Peeling onion layer, forwarding inner frame");

            // Try to decode the payload as an inner Route frame
            match AbzuFrame::decode(&payload) {
                Ok(inner_frame) => {
                    // Forward the inner frame to its next_hop
                    if let AbzuFrame::Route { target: inner_target, next_hop: inner_next_hop, payload: inner_payload } = inner_frame {
                        // Recursively handle the inner route with incremented depth
                        return Box::pin(self.handle_route(
                            inner_target,
                            inner_next_hop,
                            inner_payload,
                            source,
                            depth + 1,
                        )).await;
                    } else {
                        // Inner frame is not a Route - process locally
                        // This is the terminal case, no further recursion risk
                        warn!("Inner frame is not a Route, processing as local");
                        return Box::pin(self.handle_frame(inner_frame, source)).await;
                    }
                }
                Err(e) => {
                    warn!(error = %e, "Failed to decode inner route frame");
                    return RouteResult::Error(NodeError::Crypto(format!("Invalid inner frame: {}", e)));
                }
            }
        }

        // Case 3: Forward the whole frame to next_hop (we're not the next hop)
        debug!(next_hop = ?next_hop, target = ?target, "Forwarding route frame to next hop");
        let frame = AbzuFrame::Route { target, next_hop, payload };
        let frame_bytes = frame.encode().expect("Failed to encode route frame");
        self.forward_frame_to_peer(&next_hop, &frame_bytes).await
    }

    /// Forward encoded frame bytes to a specific peer
    async fn forward_frame_to_peer(&self, peer_key: &PeerKey, frame_bytes: &[u8]) -> RouteResult {
        let peers_arc = self.node.peers();
        let mut peers = peers_arc.lock().await;

        if let Some(conn) = peers.get_mut(peer_key) {
            match conn.interface.send(frame_bytes).await {
                Ok(()) => {
                    conn.tx_bytes += frame_bytes.len() as u64;
                    conn.touch();
                    RouteResult::Forwarded(*peer_key)
                }
                Err(e) => RouteResult::Error(NodeError::Transport(e)),
            }
        } else {
            debug!(peer = ?peer_key, "No direct connection, trying router");
            // Fall back to routing if no direct connection
            drop(peers); // Release lock before routing
            self.forward_via_router(peer_key, frame_bytes).await
        }
    }

    /// Forward via router when no direct connection exists
    async fn forward_via_router(&self, target: &PeerKey, payload: &[u8]) -> RouteResult {
        let next_hop_result = {
            let router_arc = self.node.router();
            let router = router_arc.read().await;
            // Derive address from key bytes (same pattern as PeerInfo::from_key_bytes)
            let mut addr_bytes = [0u8; 16];
            addr_bytes[0] = 0x02;
            addr_bytes[1] = target[0] & 0x7F;
            addr_bytes[2..].copy_from_slice(&target[..14]);
            let target_addr = abzu_router::Address::from_bytes(addr_bytes);
            router.find_next_hop(&target_addr)
        };

        match next_hop_result {
            Ok(Some(next_peer)) => {
                self.send_to_peer(&next_peer, payload).await
            }
            Ok(None) => {
                RouteResult::NoRoute
            }
            Err(e) => RouteResult::Error(NodeError::Routing(e)),
        }
    }

    /// Send payload to a specific peer
    async fn send_to_peer(&self, peer_key: &PeerKey, payload: &[u8]) -> RouteResult {
        let peers_arc = self.node.peers();
        let mut peers = peers_arc.lock().await;

        if let Some(conn) = peers.get_mut(peer_key) {
            match conn.interface.send(payload).await {
                Ok(()) => {
                    conn.tx_bytes += payload.len() as u64;
                    conn.touch();
                    RouteResult::Forwarded(*peer_key)
                }
                Err(e) => RouteResult::Error(NodeError::Transport(e)),
            }
        } else {
            RouteResult::NoRoute
        }
    }

    /// Process a payload that has arrived at its destination
    async fn process_local_payload(&self, payload: Vec<u8>, source: PeerKey) -> RouteResult {
        // Try to decode as an inner AbzuFrame
        match AbzuFrame::decode(&payload) {
            Ok(inner_frame) => {
                // Recursively handle the inner frame
                Box::pin(self.handle_frame(inner_frame, source)).await
            }
            Err(_) => {
                // Raw payload - store as content
                let cid = blake3::hash(&payload);
                if let Err(e) = self.node.store_chunk(cid.as_bytes(), &payload) {
                    return RouteResult::Error(e);
                }
                RouteResult::Local
            }
        }
    }

    /// Handle handshake initiation
    async fn handle_hello(
        &self,
        _ephemeral_pub: [u8; 32],
        _timestamp: u64,
        source: PeerKey,
    ) -> RouteResult {
        info!(source = ?source, "Received Hello from peer");

        // TODO: Implement proper key exchange
        // For now, just acknowledge receipt
        self.touch_peer(&source).await;
        RouteResult::Local
    }

    /// Handle handshake acknowledgment
    async fn handle_hello_ack(
        &self,
        _ephemeral_pub: [u8; 32],
        _confirmation: Vec<u8>,
        source: PeerKey,
    ) -> RouteResult {
        info!(source = ?source, "Received HelloAck from peer");

        // TODO: Complete key exchange
        self.touch_peer(&source).await;
        RouteResult::Local
    }

    /// Handle an incoming chat message
    async fn handle_chat(
        &self,
        msg_id: u64,
        _to: [u8; 32],
        msg: Vec<u8>,
        timestamp: u64,
        source: PeerKey,
    ) -> RouteResult {
        info!(id = msg_id, from = ?source, "Received chat message");

        // Store the inbound message
        if let Err(e) = self.node.store_inbound_chat(source, msg_id, msg, timestamp) {
            warn!(error = %e, "Failed to store inbound chat");
            return RouteResult::Error(e);
        }

        // Send ChatAck back to sender
        let ack_frame = AbzuFrame::chat_ack(msg_id);
        let peers_arc = self.node.peers();
        let mut peers = peers_arc.lock().await;

        if let Some(conn) = peers.get_mut(&source) {
            if let Err(e) = conn.interface.send_frame(&ack_frame).await {
                warn!(error = %e, "Failed to send ChatAck");
            } else {
                conn.touch();
                debug!(id = msg_id, "Sent ChatAck");
            }
        }

        RouteResult::Local
    }
    /// Handle a chat delivery acknowledgment
    async fn handle_chat_ack(&self, msg_id: u64, source: PeerKey) -> RouteResult {
        debug!(id = msg_id, from = ?source, "Received ChatAck");

        // Mark the message as delivered
        if let Err(e) = self.node.mark_delivered(msg_id) {
            // Not finding the message is not fatal - might be a duplicate ack
            debug!(error = %e, "Could not mark message delivered (may be duplicate)");
        }

        self.touch_peer(&source).await;
        RouteResult::Local
    }

    /// Handle a read receipt - recipient read the message
    async fn handle_read_receipt(&self, msg_id: u64, source: PeerKey) -> RouteResult {
        debug!(id = msg_id, from = ?source, "Received ReadReceipt");

        // Mark the message as read
        if let Err(e) = self.node.mark_read(msg_id) {
            // Not finding the message is not fatal - might be a duplicate receipt
            debug!(error = %e, "Could not mark message read (may be duplicate)");
        }

        self.touch_peer(&source).await;
        RouteResult::Local
    }

    /// Handle a peer address announcement for NAT traversal
    async fn handle_announce(
        &self,
        peer_key: [u8; 32],
        public_addr: Vec<u8>,
        nat_type: u8,
        timestamp: u64,
        source: PeerKey,
    ) -> RouteResult {
        // Validate timestamp (reject old announcements > 5 min)
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        if now.saturating_sub(timestamp) > 300 {
            debug!(peer = ?peer_key, "Rejecting stale announcement");
            return RouteResult::Local;
        }

        // Log the announcement
        info!(
            peer = ?peer_key,
            addr_len = public_addr.len(),
            nat_type = nat_type,
            "Received peer address announcement"
        );

        // Store in peer table for future connection attempts
        // For now, just update the peer's last activity
        self.touch_peer(&source).await;

        // TODO: Store public_addr in a peer directory for later NAT-aware connections
        // TODO: Optionally forward to other peers (gossip)

        RouteResult::Local
    }

    /// Update last activity for a peer
    async fn touch_peer(&self, peer_key: &PeerKey) {
        let peers_arc = self.node.peers();
        let mut peers = peers_arc.lock().await;
        if let Some(conn) = peers.get_mut(peer_key) {
            conn.touch();
        }
    }

    /// Periodic maintenance of seen filter (call every ~60 seconds)
    ///
    /// Rotates filters if needed. Returns true if rotation occurred.
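    ///
    /// A sketch of the intended call pattern from a maintenance task (the 60s
    /// interval is illustrative; `switchboard` is assumed to be an
    /// `Arc<Switchboard>`):
    ///
    /// ```ignore
    /// let mut interval = tokio::time::interval(Duration::from_secs(60));
    /// loop {
    ///     interval.tick().await;
    ///     switchboard.cleanup_seen_filter().await;
    /// }
    /// ```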
    pub async fn cleanup_seen_filter(&self) -> bool {
        let mut filter = self.seen_filter.write().await;
        let rotated = filter.maybe_rotate();
        if rotated {
            debug!(capacity = filter.capacity(), "Rotated seen filter");
        }
        rotated
    }

    /// Send keepalive to all idle connections
    pub async fn send_keepalives(&self, idle_threshold_ms: u64) -> Vec<(PeerKey, Result<(), TransportError>)> {
        let mut results = Vec::new();
        let peers_arc = self.node.peers();
        let mut peers = peers_arc.lock().await;

        for (key, conn) in peers.iter_mut() {
            if conn.needs_keepalive(idle_threshold_ms) {
                let frame = AbzuFrame::KeepAlive;
                let result = conn.interface.send_frame(&frame).await;

                if result.is_ok() {
                    conn.touch();
                }

                results.push((*key, result));
            }
        }

        results
    }

    /// Broadcast a frame to all peers
    pub async fn broadcast(&self, frame: &AbzuFrame) -> Vec<(PeerKey, Result<(), TransportError>)> {
        let mut results = Vec::new();
        let peers_arc = self.node.peers();
        let mut peers = peers_arc.lock().await;

        for (key, conn) in peers.iter_mut() {
            let result = conn.interface.send_frame(frame).await;

            if result.is_ok() {
                conn.touch();
            }

            results.push((*key, result));
        }

        results
    }

    /// Broadcast raw bytes to all peers (for cover traffic)
    pub async fn broadcast_raw(&self, data: &[u8]) {
        let peers_arc = self.node.peers();
        let mut peers = peers_arc.lock().await;

        for (key, conn) in peers.iter_mut() {
            if let Err(e) = conn.interface.send(data).await {
                trace!(peer = ?key, error = %e, "Failed to send cover packet");
            }
        }
    }

    /// Retry pending messages that haven't been acknowledged
    ///
    /// This checks all pending outbound messages and resends them if they've
    /// been waiting longer than the retry_after_ms threshold.
    ///
    /// Returns (retried_count, failed_count)
    pub async fn retry_pending_messages(
        &self,
        retry_after_ms: u64,
        max_age_ms: u64,
    ) -> (usize, usize) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let pending = match self.node.get_pending_messages() {
            Ok(msgs) => msgs,
            Err(e) => {
                warn!(error = %e, "Failed to get pending messages");
                return (0, 0);
            }
        };

        let mut retried = 0;
        let mut failed = 0;

        for msg in pending {
            let age = now.saturating_sub(msg.timestamp);

            // Too old - mark as failed
            if age > max_age_ms {
                debug!(id = msg.id, age_ms = age, "Message too old, marking failed");
                if let Err(e) = self.node.mark_failed(msg.id) {
                    warn!(id = msg.id, error = %e, "Failed to mark message as failed");
                }
                failed += 1;
                continue;
            }

            // Not ready for retry yet
            if age < retry_after_ms {
                continue;
            }

            // Attempt to resend
            let chat_frame = AbzuFrame::Chat {
                id: msg.id,
                to: msg.peer,
                msg: msg.content.clone(),
                timestamp: msg.timestamp,
            };

            let peer_key: PeerKey = msg.peer;

            // Check if peer is connected
            let peers_arc = self.node.peers();
            let peers = peers_arc.lock().await;

            if let Some(conn) = peers.get(&peer_key) {
                let encoded = match chat_frame.encode() {
                    Ok(e) => e,
                    Err(e) => {
                        warn!(id = msg.id, error = %e, "Failed to encode retry frame");
                        continue;
                    }
                };

                match conn.interface.send(&encoded).await {
                    Ok(_) => {
                        debug!(id = msg.id, peer = ?peer_key, "Retried pending message");
                        retried += 1;
                    }
                    Err(e) => {
                        debug!(id = msg.id, error = %e, "Retry send failed (peer offline)");
                    }
                }
            } else {
                trace!(id = msg.id, peer = ?peer_key, "Peer not connected, skipping retry");
            }
        }

        if retried > 0 || failed > 0 {
            info!(retried = retried, failed = failed, "Message retry pass complete");
        }

        (retried, failed)
    }

    // ─────────────────────────────────────────────────────────────────────────
    // Circle (Group) Handlers
    // ─────────────────────────────────────────────────────────────────────────

    /// Handle incoming circle creation (for state replication)
    async fn handle_circle_create(
        &self,
        _id: [u8; 32],
        _name: Vec<u8>,
        _founder_key: [u8; 32],
        _timestamp: u64,
        _source: PeerKey,
    ) -> RouteResult {
        // TODO: Validate signature, store circle if we're invited
        debug!("Received CircleCreate - replication not yet implemented");
        RouteResult::Local
    }

    /// Handle circle invitation
    async fn handle_circle_invite(
        &self,
        circle_id: [u8; 32],
        invitee: [u8; 32],
        signature: Vec<u8>,
        source: PeerKey,
    ) -> RouteResult {
        use ed25519_dalek::{VerifyingKey, Signature, Verifier};

        // 1. Verify that the invitation is for us
        let our_key = self.node.machine_identity().public_bytes();
        if invitee != our_key {
            debug!("Received invitation for someone else, ignoring");
            return RouteResult::Local;
        }
        // 2. Verify signature: invitee signed by inviter (source)
        //    message = circle_id || invitee
        let mut message = Vec::with_capacity(64);
        message.extend_from_slice(&circle_id);
        message.extend_from_slice(&invitee);

        let inviter_key = match VerifyingKey::from_bytes(&source) {
            Ok(k) => k,
            Err(_) => {
                warn!("Invalid inviter public key");
                return RouteResult::Local; // Drop
            }
        };

        let sig_bytes: [u8; 64] = match signature.clone().try_into() {
            Ok(b) => b,
            Err(_) => {
                warn!("Invalid signature length");
                return RouteResult::Local;
            }
        };

        if inviter_key.verify(&message, &Signature::from_bytes(&sig_bytes)).is_err() {
            warn!("Invalid invitation signature");
            return RouteResult::Local;
        }

        info!(circle = ?&circle_id[..8], inviter = ?&source[..8], "Received valid Circle invitation");

        // 3. Pass to TrustEngine to handle acceptance logic
        // TODO: This auto-accepts for now. In reality, we should queue this for user approval.
        // For the readiness gap, we just ensure the plumbing works.
        match self.node.trust_engine().accept_invite(circle_id, source, signature) {
            Ok(_) => {
                info!("Accepted invitation to circle");
                // TODO: Send CircleJoin to announce presence?
            }
            Err(e) => {
                warn!(error = %e, "Failed to accept invitation");
            }
        }

        RouteResult::Local
    }

    /// Handle member joining a circle
    async fn handle_circle_join(
        &self,
        circle_id: [u8; 32],
        member_key: [u8; 32],
        epoch: u64,
        _source: PeerKey,
    ) -> RouteResult {
        debug!(
            circle = ?&circle_id[..8],
            member = ?&member_key[..8],
            epoch,
            "Received CircleJoin"
        );

        // Update TrustEngine with new member
        // This is usually a gossip/sync event
        if let Err(e) = self.node.trust_engine().member_joined(circle_id, member_key, epoch) {
            warn!(error = %e, "Failed to process member join");
        } else {
            info!(circle = ?&circle_id[..8], member = ?&member_key[..8], "Member join recorded");
        }

        RouteResult::Local
    }

    /// Handle member leaving a circle
    async fn handle_circle_leave(
        &self,
        circle_id: [u8; 32],
        member_key: [u8; 32],
        _source: PeerKey,
    ) -> RouteResult {
        debug!(
            circle = ?&circle_id[..8],
            member = ?&member_key[..8],
            "Received CircleLeave"
        );

        // Update TrustEngine
        if let Err(e) = self.node.trust_engine().member_left(circle_id, member_key) {
            warn!(error = %e, "Failed to process member leave");
        } else {
            info!(circle = ?&circle_id[..8], member = ?&member_key[..8], "Member leave recorded");
        }

        RouteResult::Local
    }

    /// Handle incoming circle message
    ///
    /// SECURITY: The encrypted_envelope contains sender and timestamp inside,
    /// preventing traffic analysis from revealing who is communicating when.
    async fn handle_circle_message(
        &self,
        circle_id: [u8; 32],
        id: u64,
        encrypted_envelope: Vec<u8>,
        source: PeerKey,
    ) -> RouteResult {
        debug!(circle = ?&circle_id[..8], id = id, "Received circle message (envelope encrypted)");

        // Verify we're a member of this circle
        match self.node.get_circle(&circle_id) {
            Ok(Some(circle)) => {
                let our_key = self.node.peer_key();
                if !circle.members.iter().any(|m| m.pubkey == our_key) {
                    debug!("Received message for circle we're not a member of");
                    return RouteResult::Local;
                }

                // SECURITY: Decrypt the envelope to get sender, timestamp, and content
                // Sender and timestamp are now protected from network observers
                let envelope = match self.node.decrypt_circle_envelope(&circle_id, &encrypted_envelope) {
                    Ok(env) => env,
                    Err(e) => {
                        debug!("Failed to decrypt circle envelope: {:?}", e);
                        return RouteResult::Error(e);
                    }
                };

                debug!(sender = ?&envelope.sender[..8], timestamp = envelope.timestamp, "Decrypted envelope");

                // Store decrypted message
                if let Err(e) = self.node.store_inbound_circle_message(
                    circle_id,
                    id,
                    envelope.sender,
                    envelope.content,
                    envelope.timestamp,
                ) {
                    return RouteResult::Error(e);
                }

                // Send ack back to sender
                let ack_frame = AbzuFrame::CircleAck {
                    circle_id,
                    msg_id: id,
                    member_key: our_key,
                };
                if let Ok(encoded) = ack_frame.encode() {
                    let _ = self.send_to_peer(&source, &encoded).await;
                }
            }
            Ok(None) => {
                debug!("Received message for unknown circle");
            }
            Err(e) => {
                return RouteResult::Error(e);
            }
        }

        RouteResult::Local
    }

    /// Broadcast a frame to all members of a circle (except ourselves)
    pub async fn broadcast_to_circle(&self, circle_id: &[u8; 32], frame: &AbzuFrame) -> Result<usize, NodeError> {
        let circle = self.node.get_circle(circle_id)?
            .ok_or_else(|| NodeError::Serialization("Circle not found".to_string()))?;

        let our_key = self.node.peer_key();
        let encoded = frame.encode().map_err(|e| NodeError::Serialization(e.to_string()))?;

        let mut sent_count = 0;
        for member in &circle.members {
            // Skip ourselves
            if member.pubkey == our_key {
                continue;
            }

            // Try to send to this member
            match self.send_to_peer(&member.pubkey, &encoded).await {
                RouteResult::Forwarded(_) => {
                    sent_count += 1;
                }
                RouteResult::NoRoute => {
                    debug!(member = ?&member.pubkey[..8], "No route to circle member");
                }
                RouteResult::Error(e) => {
                    debug!(member = ?&member.pubkey[..8], error = ?e, "Failed to send to circle member");
                }
                _ => {}
            }
        }

        debug!(circle = ?&circle_id[..8], sent = sent_count, total = circle.members.len() - 1, "Broadcast complete");
        Ok(sent_count)
    }

    /// Handle circle message acknowledgment
    async fn handle_circle_ack(
        &self,
        _circle_id: [u8; 32],
        _msg_id: u64,
        _member_key: [u8; 32],
        _source: PeerKey,
    ) -> RouteResult {
        // TODO: Track delivery status per member
        debug!("Received CircleAck - delivery tracking not yet implemented");
        RouteResult::Local
    }

    /// Handle secondary endorsement (vouch) for an existing Circle member.
    ///
    /// A vouch is a weaker form of trust than an invite. Multiple vouches
    /// can elevate a member to admin status.
    async fn handle_circle_vouch(
        &self,
        circle_id: [u8; 32],
        voucher: [u8; 32],
        target: [u8; 32],
        _signature: Vec<u8>,
        timestamp: u64,
        _source: PeerKey,
    ) -> RouteResult {
        debug!(
            circle = ?&circle_id[..8],
            voucher = ?&voucher[..8],
            target = ?&target[..8],
            timestamp,
            "Received CircleVouch"
        );

        // Verify signature: voucher signed (circle_id || target || timestamp)
        let message = {
            let mut msg = Vec::with_capacity(32 + 32 + 8);
            msg.extend_from_slice(&circle_id);
            msg.extend_from_slice(&target);
            msg.extend_from_slice(&timestamp.to_be_bytes());
            msg
        };

        let voucher_vk = match ed25519_dalek::VerifyingKey::from_bytes(&voucher) {
            Ok(vk) => vk,
            Err(e) => {
                warn!("Invalid voucher public key: {}", e);
                return RouteResult::Local;
            }
        };

        let sig_bytes: [u8; 64] = match _signature.clone().try_into() {
            Ok(bytes) => bytes,
            Err(_) => {
                warn!("Invalid signature length");
                return RouteResult::Local;
            }
        };

        let signature = ed25519_dalek::Signature::from_bytes(&sig_bytes);
        if voucher_vk.verify_strict(&message, &signature).is_err() {
            warn!("Vouch signature verification failed");
            return RouteResult::Local;
        }

        // Apply the vouch using our local node
        // Note: This only applies if we are also a member and can record it
        match self.node.vouch_member(voucher, &circle_id, target, _signature) {
            Ok(_circle) => {
                info!(
                    circle = ?&circle_id[..8],
                    voucher = ?&voucher[..8],
                    target = ?&target[..8],
                    "Vouch recorded successfully"
                );

                // Broadcast to other members
                let vouch_frame = AbzuFrame::CircleVouch {
                    circle_id,
                    voucher,
                    target,
                    signature: sig_bytes.to_vec(),
                    timestamp,
                };
                if let Err(e) = self.broadcast_to_circle(&circle_id, &vouch_frame).await {
                    warn!("Failed to broadcast vouch: {}", e);
                }
            }
            Err(e) => {
                warn!("Failed to apply vouch: {}", e);
            }
        }

        RouteResult::Local
    }

    /// Handle member pruning (moderation action).
    ///
    /// Removes a member AND all members they invited (recursively).
    /// This is the accountability mechanism - if you invite someone bad,
    /// you risk being pruned along with them.
    #[allow(clippy::too_many_arguments)]
    async fn handle_circle_prune(
        &self,
        circle_id: [u8; 32],
        pruner: [u8; 32],
        target: [u8; 32],
        _signature: Vec<u8>,
        _reason_hash: Option<[u8; 32]>,
        timestamp: u64,
        _source: PeerKey,
    ) -> RouteResult {
        debug!(
            circle = ?&circle_id[..8],
            pruner = ?&pruner[..8],
            target = ?&target[..8],
            timestamp,
            "Received CirclePrune"
        );

        // Build message to verify: circle_id || target || reason_hash || timestamp
        let message = {
            let mut msg = Vec::with_capacity(32 + 32 + 32 + 8);
            msg.extend_from_slice(&circle_id);
            msg.extend_from_slice(&target);
            if let Some(reason) = _reason_hash {
                msg.extend_from_slice(&reason);
            }
            msg.extend_from_slice(&timestamp.to_be_bytes());
            msg
        };

        // Verify pruner's signature
        let pruner_vk = match ed25519_dalek::VerifyingKey::from_bytes(&pruner) {
            Ok(vk) => vk,
            Err(e) => {
                warn!("Invalid pruner public key: {}", e);
                return RouteResult::Local;
            }
        };

        let sig_bytes: [u8; 64] = match _signature.clone().try_into() {
            Ok(bytes) => bytes,
            Err(_) => {
                warn!("Invalid prune signature length");
                return RouteResult::Local;
            }
        };

        let signature = ed25519_dalek::Signature::from_bytes(&sig_bytes);
        if pruner_vk.verify_strict(&message, &signature).is_err() {
            warn!("Prune signature verification failed");
            return RouteResult::Local;
        }

        // Apply pruning using our local node
        match self.node.prune_branch(pruner, &circle_id, target, _signature, _reason_hash) {
            Ok((_circle, removed)) => {
                info!(
                    circle = ?&circle_id[..8],
                    pruner = ?&pruner[..8],
                    target = ?&target[..8],
                    removed_count = removed.len(),
                    "Branch pruned successfully"
                );

                // Broadcast to remaining members
                let prune_frame = AbzuFrame::CirclePrune {
                    circle_id,
                    pruner,
                    target,
                    signature: sig_bytes.to_vec(),
                    reason_hash: _reason_hash,
                    timestamp,
                };
                if let Err(e) = self.broadcast_to_circle(&circle_id, &prune_frame).await {
                    warn!("Failed to broadcast prune: {}", e);
                }
            }
            Err(e) => {
                warn!("Failed to apply prune: {}", e);
            }
        }

        RouteResult::Local
    }

    /// Handle "I have" announcements for lazy-push repair.
    ///
    /// When we receive a GossipHave frame, we can identify messages we're missing
    /// and request them from the sender.
    async fn handle_gossip_have(
        &self,
        circle_id: [u8; 32],
        msg_ids: Vec<u64>,
        epoch: u64,
        source: PeerKey,
    ) -> RouteResult {
        debug!(
            circle = ?&circle_id[..8],
            msg_count = msg_ids.len(),
            epoch,
            source = ?&source[..8],
            "Received GossipHave announcement"
        );

        // Check if we're a member of this circle
        if self.node.get_circle(&circle_id).ok().flatten().is_none() {
            debug!(circle = ?&circle_id[..8], "Not a member of this circle, ignoring gossip");
            return RouteResult::Local;
        }

        // TODO: Implement pull mechanism for missing messages.
        // For now, just log the announcement - eager push handles most propagation.
        // Future: Compare msg_ids with our seen-set and request missing ones.
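        //
        // Hedged sketch of what that pull could look like. The `GossipPull`
        // frame named below is hypothetical (it does not exist in AbzuFrame yet);
        // only the seen-filter comparison uses APIs that exist in this file:
        //
        //   let missing: Vec<u64> = {
        //       let filter = self.seen_filter.read().await;
        //       msg_ids.iter().copied()
        //           .filter(|id| !filter.contains(&circle_id, *id))
        //           .collect()
        //   };
        //   if !missing.is_empty() {
        //       // let pull = AbzuFrame::GossipPull { circle_id, msg_ids: missing };
        //       // self.send_to_peer(&source, &pull.encode()?).await;
        //   }
        //
        // Note: the CuckooFilter's ~0.1% false-positive rate means a genuinely
        // missing message could occasionally be skipped by this comparison.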

        RouteResult::Local
    }

    // ─────────────────────────────────────────────────────────────
    // Gossip Protocol (Epidemic Broadcast)
    // ─────────────────────────────────────────────────────────────

    /// Calculate gossip fanout: √n peers, clamped to [2, 8]
    pub fn calculate_fanout(member_count: usize) -> usize {
        let n = member_count.saturating_sub(1); // Exclude self
        let fanout = (n as f64).sqrt().ceil() as usize;
        fanout.clamp(2, 8)
    }

    /// Check if we've seen this message recently (probabilistic)
    pub async fn already_seen(&self, circle_id: &[u8; 32], msg_id: u64) -> bool {
        let filter = self.seen_filter.read().await;
        filter.contains(circle_id, msg_id)
    }

    /// Mark a message as seen (insert into probabilistic filter)
    pub async fn mark_seen(&self, circle_id: &[u8; 32], msg_id: u64) {
        let mut filter = self.seen_filter.write().await;
        filter.insert(circle_id, msg_id);
        // Opportunistic rotation check
        filter.maybe_rotate();
    }

    /// Gossip a message to √n random circle members (epidemic broadcast)
    ///
    /// Returns the number of peers we gossiped to.
    pub async fn gossip_to_circle(
        &self,
        circle_id: &[u8; 32],
        msg_id: u64,
        frame: &AbzuFrame,
    ) -> Result<usize, NodeError> {
        // Check deduplication first
        if self.already_seen(circle_id, msg_id).await {
            trace!(circle = ?&circle_id[..8], msg_id, "Message already seen, skipping gossip");
            return Ok(0);
        }

        // Mark as seen
        self.mark_seen(circle_id, msg_id).await;

        // Get circle
        let circle = self.node.get_circle(circle_id)?
            .ok_or_else(|| NodeError::Serialization("Circle not found".to_string()))?;

        let our_key = self.node.peer_key();
        let encoded = frame.encode().map_err(|e| NodeError::Serialization(e.to_string()))?;

        // Calculate fanout
        let fanout = Self::calculate_fanout(circle.members.len());

        // Filter to eligible peers (not us)
        let eligible: Vec<_> = circle.members.iter()
            .filter(|m| m.pubkey != our_key)
            .collect();

        if eligible.is_empty() {
            return Ok(0);
        }

        // Select random subset
        use rand::seq::SliceRandom;
        let mut rng = rand::thread_rng();
        let targets: Vec<_> = eligible.choose_multiple(&mut rng, fanout.min(eligible.len())).collect();

        let mut sent_count = 0;
        for member in targets {
            match self.send_to_peer(&member.pubkey, &encoded).await {
                RouteResult::Forwarded(_) => {
                    sent_count += 1;
                }
                RouteResult::NoRoute => {
                    debug!(member = ?&member.pubkey[..8], "No route to gossip target");
                }
                RouteResult::Error(e) => {
                    debug!(member = ?&member.pubkey[..8], error = ?e, "Failed to gossip to member");
                }
                _ => {}
            }
        }

        debug!(
            circle = ?&circle_id[..8],
            msg_id,
            sent = sent_count,
            fanout,
            total_members = circle.members.len(),
            "Gossip broadcast complete"
        );
        Ok(sent_count)
    }

    // ─────────────────────────────────────────────────────────────────────────
    // DHT (Distributed Hash Table) Handlers
    // ─────────────────────────────────────────────────────────────────────────
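
    /// Hedged sketch: the caller-side half of the request/response correlation
    /// that `handle_dht_response` completes. A lookup registers a oneshot under
    /// its `request_id` in `pending_dht_ops`, sends the request frame to the
    /// target peer (elided here - the request-frame constructor lives outside
    /// this section), then waits for the matching `DhtResponseKind` with a
    /// timeout. Assumes tokio's `time` feature is enabled; nothing calls this
    /// helper yet, hence the dead_code allowance.
    #[allow(dead_code)]
    async fn await_dht_response(
        &self,
        request_id: u64,
        timeout: Duration,
    ) -> Option<DhtResponseKind> {
        // Register the pending operation so handle_dht_response can find it.
        let (tx, rx) = oneshot::channel();
        {
            let mut pending = self.pending_dht_ops.write().await;
            pending.insert(request_id, (unix_now(), tx));
        }

        // ... the corresponding DHT request frame would be sent to the peer here ...

        match tokio::time::timeout(timeout, rx).await {
            // Response arrived and was routed to us by handle_dht_response.
            Ok(Ok(response)) => Some(response),
            // Timed out or the sender was dropped: clean up our pending entry.
            _ => {
                self.pending_dht_ops.write().await.remove(&request_id);
                None
            }
        }
    }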

    /// Handle an incoming DHT request.
    ///
    /// Processes Ping, FindNode, FindValue, and Store requests from peers.
    /// Sends appropriate responses back to the requester.
    async fn handle_dht_request(
        &self,
        request_id: u64,
        sender_key: [u8; 32],
        kind: DhtRequestKind,
        source: PeerKey,
    ) -> RouteResult {
        debug!(
            request_id,
            sender = ?&sender_key[..8],
            kind = ?std::mem::discriminant(&kind),
            "Received DHT request"
        );

        let our_key = self.node.peer_key();
        let now = unix_now();

        // SECURITY: Rate limit per-peer DHT requests (H1 in audit).
        // Check before doing any expensive work like routing table updates.
        let sender_id = abzu_dht::key::node_id(&sender_key);
        let is_store = matches!(kind, DhtRequestKind::Store { .. });
        {
            let mut rate_limiter = self.dht_rate_limiter.write().await;
            match rate_limiter.check_request(&sender_id, now, is_store) {
                RateLimitResult::Allowed => {}
                RateLimitResult::Limited { remaining_secs } => {
                    warn!(
                        sender = ?&sender_key[..8],
                        remaining_secs,
                        "DHT request rate limited"
                    );
                    return RouteResult::Local; // Silently drop
                }
                RateLimitResult::StoreLimited { remaining_secs } => {
                    warn!(
                        sender = ?&sender_key[..8],
                        remaining_secs,
                        "DHT Store request rate limited"
                    );
                    return RouteResult::Local; // Silently drop
                }
            }
        }

        // SECURITY: Add sender as Questionable, not Good (C2 in audit).
        // They initiated contact, but we haven't verified bidirectional communication.
        // They'll be promoted to Good after we successfully send them a message.
        let sender_node = DhtNodeInfoFull {
            id: abzu_dht::key::node_id(&sender_key),
            pubkey: sender_key,
            address: peer_key_to_hex(&source),
            tree_coords: None,
            state: NodeState::Questionable,
            failures: 0,
            last_seen: now,
            first_seen: now,
            rtt_ms: None,
        };

        {
            let mut routing = self.dht_routing.write().await;
            let _ = routing.add(sender_node.clone(), now);
        }

        // Build and send response based on request type
        let response_frame = match kind {
            DhtRequestKind::Ping => {
                // Simple liveness check - respond with Pong
                AbzuFrame::dht_pong(request_id, our_key)
            }

            DhtRequestKind::FindNode { target } => {
                // Find K closest nodes to target in our routing table
                let routing = self.dht_routing.read().await;
                let closest = routing.closest(&target, K);

                // Convert domain nodes to wire format
                let nodes: Vec<DhtNodeInfo> = closest
                    .nodes
                    .iter()
                    .map(domain_to_wire_node)
                    .collect();

                debug!(
                    target = ?&target[..8],
                    found_count = nodes.len(),
                    "DHT FindNode - returning closest nodes"
                );

                AbzuFrame::dht_nodes(request_id, our_key, nodes)
            }

            DhtRequestKind::FindValue { key } => {
                // Try to find the value in our local store
                let values_store = self.dht_values.read().await;
                let stored_values = values_store.get(&key, now);

                if let Some(first_value) = stored_values.first() {
                    // Found a value - return it
                    let ttl_remaining = first_value.metadata.expires_at.saturating_sub(now);

                    debug!(
                        key = ?&key[..8],
                        value_len = first_value.payload.len(),
                        ttl_remaining,
                        "DHT FindValue - found value"
                    );

                    // Convert u64 TTL to u32 for wire format (cap at u32::MAX)
                    let ttl_u32 = ttl_remaining.min(u32::MAX as u64) as u32;

                    AbzuFrame::dht_value(
                        request_id,
                        our_key,
                        first_value.payload.clone(),
                        first_value.metadata.publisher,
                        ttl_u32,
                    )
                } else {
                    // Value not found, return closest nodes
                    drop(values_store); // Release read lock
                    let routing = self.dht_routing.read().await;
                    let closest = routing.closest(&key, K);
                    let nodes: Vec<DhtNodeInfo> = closest.nodes.iter().map(domain_to_wire_node).collect();

                    debug!(
                        key = ?&key[..8],
                        node_count = nodes.len(),
                        "DHT FindValue - value not found, returning closest nodes"
                    );
                    AbzuFrame::dht_nodes(request_id, our_key, nodes)
                }
            }

            DhtRequestKind::Store { key, value, ttl_secs, signature } => {
                // SECURITY: Reject malformed signatures immediately (H2 in audit).
                // Ed25519 signatures are exactly 64 bytes. Don't waste cycles on invalid ones.
                if signature.len() != 64 {
                    warn!(
                        sender = ?&sender_key[..8],
                        sig_len = signature.len(),
                        "DHT Store rejected - invalid signature length"
                    );
                    return RouteResult::Local; // Silently drop malformed request
                }

                // Create a StoredValue for the DHT store using the constructor
                let mut stored = StoredValue::new(
                    key,
                    abzu_dht::store::ValueType::Application, // Generic application data
                    sender_key,
                    value.clone(),
                    now,
                    Some(ttl_secs as u64),
                );

                // Signature is exactly 64 bytes, copy directly
                stored.signature.copy_from_slice(&signature);

                // Store in our value store
                let mut store = self.dht_values.write().await;
                match store.store(stored, now) {
                    Ok(_) => {
                        debug!(
                            key = ?&key[..8],
                            value_len = value.len(),
                            ttl_secs,
                            "DHT Store - value stored successfully"
                        );
                        AbzuFrame::dht_store_ack(request_id, our_key, true)
                    }
                    Err(e) => {
                        warn!(
                            key = ?&key[..8],
                            error = ?e,
                            "DHT Store - failed to store value"
                        );
                        AbzuFrame::dht_store_ack(request_id, our_key, false)
                    }
                }
            }
        };

        // Send the response back to the requester
        let peers_arc = self.node.peers();
        let mut peers = peers_arc.lock().await;

        if let Some(conn) = peers.get_mut(&source) {
            if let Err(e) = conn.interface.send_frame(&response_frame).await {
                warn!(error = %e, "Failed to send DHT response");
            } else {
                conn.touch();
                trace!(request_id, "Sent DHT response");
            }
        }

        // Release the peers lock before touch_peer, which may lock it again.
        drop(peers);
        self.touch_peer(&source).await;
        RouteResult::Local
    }

    /// Handle an incoming DHT response.
    ///
    /// Processes Pong, Nodes, Value, and StoreAck responses from peers.
    /// Routes responses to pending DHT operations (lookups, stores).
    async fn handle_dht_response(
        &self,
        request_id: u64,
        responder_key: [u8; 32],
        kind: DhtResponseKind,
        source: PeerKey,
    ) -> RouteResult {
        debug!(
            request_id,
            responder = ?&responder_key[..8],
            kind = ?std::mem::discriminant(&kind),
            "Received DHT response"
        );

        let now = unix_now();

        // Update routing table with responder info (they responded, so they're alive)
        let responder_node = DhtNodeInfoFull {
            id: abzu_dht::key::node_id(&responder_key),
            pubkey: responder_key,
            address: peer_key_to_hex(&source),
            tree_coords: None,
            state: NodeState::Good,
            failures: 0,
            last_seen: now,
            first_seen: now,
            rtt_ms: None,
        };

        {
            let mut routing = self.dht_routing.write().await;
            let _ = routing.add(responder_node, now);
        }

        // Complete pending operation if one exists for this request_id
        {
            let mut pending = self.pending_dht_ops.write().await;
            if let Some((_created_at, sender)) = pending.remove(&request_id) {
                // Clone the response to send to the waiting operation
                let response_clone = kind.clone();
                if sender.send(response_clone).is_err() {
                    debug!(request_id, "Pending DHT operation receiver dropped");
                } else {
                    trace!(request_id, "Completed pending DHT operation");
                }
            }
        }

        // Process the response locally (routing table updates, caching, etc.)
        match &kind {
            DhtResponseKind::Pong => {
                // Node is alive - already updated routing table above
                trace!(responder = ?&responder_key[..8], "DHT Pong received - node alive");
            }

            DhtResponseKind::Nodes { nodes } => {
                // Received list of nodes closer to target
                debug!(
                    responder = ?&responder_key[..8],
                    node_count = nodes.len(),
                    "DHT Nodes response received"
                );

                // SECURITY: Add discovered nodes as Questionable, not Good (C2/C4 in audit).
                // These nodes are reported by the responder but haven't been verified yet.
                // They will be promoted to Good after successful communication.
                // Also cap the number of nodes we accept per response to prevent flooding.
                const MAX_NODES_PER_RESPONSE: usize = 20; // K = 20
                {
                    let mut routing = self.dht_routing.write().await;
                    for wire_node in nodes.iter().take(MAX_NODES_PER_RESPONSE) {
                        let mut domain_node = wire_to_domain_node(wire_node, now);
                        domain_node.state = NodeState::Questionable; // Don't trust until verified
                        let _ = routing.add(domain_node, now);
                        trace!(node_key = ?&wire_node.key[..8], "Added discovered node as Questionable");
                    }
                }
            }

            DhtResponseKind::Value { value, publisher, ttl_remaining_secs } => {
                // Received a stored value
                info!(
                    responder = ?&responder_key[..8],
                    value_len = value.len(),
                    publisher = ?&publisher[..8],
                    ttl_remaining = ttl_remaining_secs,
                    "DHT Value found"
                );

                // SECURITY: We intentionally do NOT cache values from FindValue responses.
                // The wire format doesn't include signatures, so we can't verify authenticity.
                // Caching unverified values would enable cache poisoning attacks (C3 in audit).
                // The value is still returned to the caller via the pending operation.
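                //
                // One possible path to safe caching later (an assumption, not a plan
                // committed to anywhere in this file): extend the Value wire format to
                // carry the publisher's signature, rebuild a StoredValue from it, and
                // verify against the publisher key before inserting into dht_values.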
            }

            DhtResponseKind::StoreAck { success } => {
                // Store operation acknowledgment
                debug!(
                    responder = ?&responder_key[..8],
                    success,
                    "DHT StoreAck received"
                );
            }

            DhtResponseKind::Error { code, message } => {
                // DHT operation failed - mark responder as having an issue
                warn!(
                    responder = ?&responder_key[..8],
                    error_code = code,
                    message = %message,
                    "DHT Error received"
                );

                // We could increment failure count, but for now just log
                // The node still responded, so it's reachable
            }
        }

        self.touch_peer(&source).await;
        RouteResult::Local
    }

    /// Handle generic service request
    async fn handle_service_request(
        &self,
        service_id: u64,
        request_id: u64,
        requester: [u8; 32],
        payload: Vec<u8>,
        source: PeerKey,
    ) -> RouteResult {
        if let Some(tx) = self.node.get_service_handler(service_id) {
            // Forward to local service handler
            let event = ServiceEvent::Request {
                request_id,
                requester: PeerKey::from(requester),
                payload,
            };

            if let Err(e) = tx.send(event).await {
                warn!(service_id, error = ?e, "Failed to dispatch service request (handler dropped)");
                // Handler dropped - maybe send unavailable response?
            }
        } else {
            // No handler for this service
            debug!(service_id, "Received request for unknown service");

            // Send back generic "Service Unavailable" response (status 1)
            // This allows the requester to fail fast
            let response = AbzuFrame::ServiceResponse {
                service_id,
                request_id,
                payload: vec![],
                status: 1, // Generic unavailable
            };

            if let Err(e) = self.node.send(source, response).await {
                warn!(target = ?source, error = ?e, "Failed to send ServiceUnavailable response");
            }
        }

        RouteResult::Local
    }

    /// Handle generic service response
    async fn handle_service_response(
        &self,
        service_id: u64,
        request_id: u64,
        payload: Vec<u8>,
        status: u8,
        _source: PeerKey,
    ) -> RouteResult {
        if let Some(tx) = self.node.get_service_handler(service_id) {
            // Forward to local service handler
            let event = ServiceEvent::Response {
                request_id,
                status,
                payload,
            };

            if let Err(e) = tx.send(event).await {
                warn!(service_id, error = ?e, "Failed to dispatch service response (handler dropped)");
            }
        } else {
            debug!(service_id, "Received response for unknown service (maybe just closed?)");
        }

        RouteResult::Local
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::NodeConfig;
    use tempfile::tempdir;

    fn create_test_node() -> Arc<Node> {
        let tmp = tempdir().unwrap();
        let config = NodeConfig {
            storage_path: tmp.path().to_str().unwrap().to_string(),
            ..Default::default()
        };
        Arc::new(Node::new(config).unwrap())
    }

    #[tokio::test]
    async fn test_handle_keepalive() {
        let node = create_test_node();
        let switchboard = Switchboard::new(node);

        let source = [0xAB; 32];
        let result = switchboard.handle_frame(AbzuFrame::KeepAlive, source).await;

        assert!(matches!(result, RouteResult::Local));
    }

    #[tokio::test]
    async fn test_handle_chunk() {
        let node = create_test_node();
        let switchboard = Switchboard::new(Arc::clone(&node));

        let data = b"test data";
        let cid = *blake3::hash(data).as_bytes();
        let source = [0xAB; 32];

        let result = switchboard
            .handle_frame(AbzuFrame::Chunk { cid, data: data.to_vec() }, source)
            .await;

        assert!(matches!(result, RouteResult::Local));

        // Verify chunk was stored
        let retrieved = node.get_chunk(&cid).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_handle_chunk_invalid_cid() {
        let node = create_test_node();
        let switchboard = Switchboard::new(node);

        let data = b"test data";
        let bad_cid = [0xFF; 32]; // Wrong CID
        let source = [0xAB; 32];

        let result = switchboard
            .handle_frame(AbzuFrame::Chunk { cid: bad_cid, data: data.to_vec() }, source)
            .await;

        assert!(matches!(result, RouteResult::Error(_)));
    }

    #[tokio::test]
    async fn test_handle_route_local_target() {
        let node = create_test_node();
        let our_key = node.peer_key();
        let switchboard = Switchboard::new(Arc::clone(&node));

        // Route frame where we are the target
        let payload = b"secret message".to_vec();
        let route_frame = AbzuFrame::Route {
            target: our_key,
            next_hop: our_key,
            payload: payload.clone(),
        };

        let source = [0xAB; 32];
        let result = switchboard.handle_frame(route_frame, source).await;

        // Should process locally
        assert!(matches!(result, RouteResult::Local));
    }

    #[tokio::test]
    async fn test_onion_wrap_unwrap() {
        // Test that wrap_onion creates proper nested structure
        let target = [0xCC; 32];
        let hop1 = [0xAA; 32];
        let hop2 = [0xBB; 32];
        let payload = b"inner payload".to_vec();

        // Create onion: us -> hop1 -> hop2 -> target
        let onion = AbzuFrame::wrap_onion(payload.clone(), target, &[hop1, hop2]);

        // Outer layer should have next_hop = hop1
        if let AbzuFrame::Route { next_hop, target: t, payload: p } = &onion {
            assert_eq!(*next_hop, hop1);
            assert_eq!(*t, target);

            // Unwrap first layer
            let inner = AbzuFrame::decode(p).unwrap();
            if let AbzuFrame::Route { next_hop: nh2, target: t2, payload: p2 } = &inner {
                assert_eq!(*nh2, hop2);
                assert_eq!(*t2, target);

                // Unwrap second layer
                let innermost = AbzuFrame::decode(p2).unwrap();
                if let AbzuFrame::Route { next_hop: nh3, target: t3, payload: p3 } = &innermost {
                    assert_eq!(*nh3, target);
                    assert_eq!(*t3, target);
                    assert_eq!(p3, &payload);
                } else {
                    panic!("Innermost should be Route");
                }
            } else {
                panic!("Inner should be Route");
            }
        } else {
            panic!("Outer should be Route");
        }
    }

    // ─────────────────────────────────────────────────────────────
    // Gossip Protocol Tests
    // ─────────────────────────────────────────────────────────────

    #[test]
    fn test_fanout_calculation() {
        // √n fanout, clamped to [2, 8]

        // Small groups: clamp to minimum of 2
        assert_eq!(Switchboard::calculate_fanout(1), 2); // solo, effectively 0 peers
        assert_eq!(Switchboard::calculate_fanout(2), 2); // 1 peer -> √1 = 1, clamped to 2
        assert_eq!(Switchboard::calculate_fanout(3), 2); // 2 peers -> √2 ≈ 1.4 -> 2

        // Medium groups: √n applies
        assert_eq!(Switchboard::calculate_fanout(5), 2); // 4 peers -> √4 = 2
        assert_eq!(Switchboard::calculate_fanout(10), 3); // 9 peers -> √9 = 3
        assert_eq!(Switchboard::calculate_fanout(17), 4); // 16 peers -> √16 = 4
        assert_eq!(Switchboard::calculate_fanout(26), 5); // 25 peers -> √25 = 5

        // Large groups: clamp to maximum of 8
        assert_eq!(Switchboard::calculate_fanout(65), 8); // 64 peers -> √64 = 8
        assert_eq!(Switchboard::calculate_fanout(100), 8); // 99 peers -> √99 ≈ 10, clamped to 8
        assert_eq!(Switchboard::calculate_fanout(1000), 8); // 999 peers -> √999 ≈ 32, clamped to 8
    }

    #[tokio::test]
    async fn test_seen_set_deduplication() {
        let node = create_test_node();
        let switchboard = Switchboard::new(node);

        let circle_id = [0x42; 32];
        let msg_id = 12345u64;

        // First time: not seen
        assert!(!switchboard.already_seen(&circle_id, msg_id).await);

        // Mark as seen
        switchboard.mark_seen(&circle_id, msg_id).await;

        // Second time: should be seen
        assert!(switchboard.already_seen(&circle_id, msg_id).await);

        // Different message: not seen
        assert!(!switchboard.already_seen(&circle_id, 99999).await);

        // Different circle: not seen
        assert!(!switchboard.already_seen(&[0x99; 32], msg_id).await);
    }
}
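
// A hedged supplement to the tests above: these exercise the byte layouts that
// the Circle signature checks in this file expect (circle_id || invitee for
// invites, circle_id || target || timestamp for vouches). The ed25519-dalek v2
// API (SigningKey/VerifyingKey, verify_strict) is assumed; the keys and values
// below are arbitrary test fixtures, not protocol constants.
#[cfg(test)]
mod signature_format_tests {
    use ed25519_dalek::{Signer, SigningKey};

    #[test]
    fn test_invite_message_roundtrip() {
        // Inviter signs circle_id || invitee, which is what handle_circle_invite verifies.
        let signing_key = SigningKey::from_bytes(&[7u8; 32]);
        let circle_id = [0x42u8; 32];
        let invitee = [0x55u8; 32];

        let mut message = Vec::with_capacity(64);
        message.extend_from_slice(&circle_id);
        message.extend_from_slice(&invitee);

        let signature = signing_key.sign(&message);
        assert!(signing_key
            .verifying_key()
            .verify_strict(&message, &signature)
            .is_ok());
    }

    #[test]
    fn test_vouch_message_roundtrip() {
        // Voucher signs circle_id || target || timestamp (big-endian), as checked
        // by handle_circle_vouch before the vouch is applied.
        let signing_key = SigningKey::from_bytes(&[9u8; 32]);
        let circle_id = [0x42u8; 32];
        let target = [0x99u8; 32];
        let timestamp: u64 = 1_700_000_000;

        let mut message = Vec::with_capacity(32 + 32 + 8);
        message.extend_from_slice(&circle_id);
        message.extend_from_slice(&target);
        message.extend_from_slice(&timestamp.to_be_bytes());

        let signature = signing_key.sign(&message);
        assert!(signing_key
            .verifying_key()
            .verify_strict(&message, &signature)
            .is_ok());

        // Tampering with any field must break verification.
        let mut tampered = message.clone();
        tampered[0] ^= 0xFF;
        assert!(signing_key
            .verifying_key()
            .verify_strict(&tampered, &signature)
            .is_err());
    }
}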