//! Switchboard
//!
//! Handles incoming frames and routes them to their destination.
//! This is the core routing logic that makes us a mesh router.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use scalable_cuckoo_filter::ScalableCuckooFilterBuilder;
use rand_09::{rngs::StdRng, SeedableRng};

use tokio::sync::{RwLock, oneshot};
use tracing::{debug, info, trace, warn};

use abzu_router::PeerKey;
use abzu_transport::{AbzuFrame, AbzuInterfaceExt, DhtRequestKind, DhtResponseKind, DhtNodeInfo, TransportError};

// DHT imports for full integration
use abzu_dht::{
    RoutingTable, ValueStore,
    routing::{NodeInfo as DhtNodeInfoFull, NodeState},
    store::{StoredValue, ValueType},
    security::{RateLimiter, RateLimitResult},
    NodeLookup, ValueLookup,
    DhtKey, K,
};

use crate::node::{Node, NodeError, ServiceEvent};

/// Type alias for the pending DHT operations map, to reduce type complexity
type PendingDhtOps = HashMap<u64, (u64, oneshot::Sender<DhtResponseKind>)>;

/// Frame routing result
#[derive(Debug)]
pub enum RouteResult {
    /// Frame is for us and was processed locally
    Local,
    /// Frame was forwarded to a peer
    Forwarded(PeerKey),
    /// No route found
    NoRoute,
    /// Error occurred
    Error(NodeError),
}

/// Result of a DHT value lookup
#[derive(Debug)]
pub enum FindValueResult {
    /// Value(s) found
    Found(Vec<StoredValue>),
    /// Value not found, returning closest nodes
    NotFound(Vec<DhtNodeInfoFull>),
}

/// Internal result type for a single FindValue query
enum FindValueQueryResult {
    /// Remote returned a value
    Value(StoredValue),
    /// Remote returned closer nodes
    Nodes(Vec<DhtNodeInfoFull>),
}

/// Result of a DHT store operation
#[derive(Debug, Clone)]
pub struct StoreResult {
    /// Whether the value was stored locally
    pub stored_locally: bool,
    /// Number of remote nodes that accepted the store
    pub nodes_stored: usize,
    /// Number of remote nodes that failed to store
    pub nodes_failed: usize,
}

/// The Switchboard handles all incoming frames
pub struct Switchboard {
    node: Arc<Node>,
    /// Probabilistic seen-set for gossip deduplication using a CuckooFilter.
    /// Uses dual-filter rotation for TTL-based eviction.
    seen_filter: Arc<RwLock<SeenFilter>>,
    /// DHT routing table for peer discovery
    dht_routing: Arc<RwLock<RoutingTable>>,
    /// DHT value store for content/provider records
    dht_values: Arc<RwLock<ValueStore>>,
    /// Pending DHT operations awaiting responses: request_id -> (created_at, response sender).
    /// The tuple stores the creation timestamp for M4 expiration cleanup.
    pending_dht_ops: Arc<RwLock<PendingDhtOps>>,
    /// Rate limiter for incoming DHT requests (H1 protection)
    dht_rate_limiter: Arc<RwLock<RateLimiter>>,
}

/// TTL for seen messages (5 minutes)
const SEEN_MESSAGE_TTL_SECS: u64 = 300;

/// Rotation interval: half the TTL (2.5 minutes).
/// Entries live for 1x to 2x this interval (see `SeenFilter` docs).
const SEEN_FILTER_ROTATION_SECS: u64 = SEEN_MESSAGE_TTL_SECS / 2;

/// Initial capacity for each CuckooFilter (auto-scales as needed)
const SEEN_FILTER_INITIAL_CAPACITY: usize = 1000;

/// Target false positive rate for deduplication
/// 0.001 = 0.1% chance of incorrectly rejecting a unique message
const SEEN_FILTER_FP_RATE: f64 = 0.001;

// ============================================================================
// SeenFilter: Probabilistic Deduplication with TTL
// ============================================================================

/// Type alias for our Send+Sync compatible CuckooFilter.
/// Uses StdRng instead of ThreadRng (which is not Send).
/// Uses the crate's default SipHasher13 hasher.
type SendCuckooFilter =
    scalable_cuckoo_filter::ScalableCuckooFilter<MessageFingerprint, siphasher::sip::SipHasher13, StdRng>;

/// Create a new Send-compatible CuckooFilter.
fn new_cuckoo_filter() -> SendCuckooFilter {
    ScalableCuckooFilterBuilder::new()
        .initial_capacity(SEEN_FILTER_INITIAL_CAPACITY)
        .false_positive_probability(SEEN_FILTER_FP_RATE)
        .rng(StdRng::from_os_rng())
        .finish()
}

/// A hash-based message fingerprint for the CuckooFilter.
/// Combines circle_id and msg_id into a 40-byte key.
#[derive(Clone, PartialEq, Eq, Hash)]
struct MessageFingerprint {
    circle_id: [u8; 32],
    msg_id: u64,
}

impl MessageFingerprint {
    fn new(circle_id: &[u8; 32], msg_id: u64) -> Self {
        Self {
            circle_id: *circle_id,
            msg_id,
        }
    }
}

/// Dual-filter probabilistic seen-set with time-based rotation.
///
/// ## Design
///
/// Uses two CuckooFilters:
/// - **current**: Accepts new inserts, checked first
/// - **previous**: Read-only, holds entries from previous rotation window
///
/// Every `SEEN_FILTER_ROTATION_SECS` seconds:
/// 1. Discard `previous`
/// 2. Move `current` → `previous`
/// 3. Create new empty `current`
///
/// This gives entries an effective lifetime of 1x to 2x the rotation interval
/// (1.5x on average), which approximates the original TTL behavior.
///
/// ## Tradeoffs vs HashMap
///
/// | Aspect | HashMap | CuckooFilter |
/// |--------|---------|--------------|
/// | Memory | O(n), full 40-byte keys | O(n), short fingerprints (a few bytes each) |
/// | Lookup | O(1) exact | O(1) probabilistic |
/// | False positives | None | ~0.1% |
/// | False negatives | None | None |
///
/// False positives mean we might drop a legitimate unique message that
/// hashes to the same fingerprint as a seen one. At a 0.1% FP rate and
/// typical gossip volumes, this is acceptable.
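///
/// ## Example
///
/// A minimal usage sketch (marked `ignore` since `SeenFilter` is crate-internal):
///
/// ```ignore
/// let mut filter = SeenFilter::new();
/// let circle_id = [0u8; 32];
///
/// assert!(!filter.contains(&circle_id, 42));
/// filter.insert(&circle_id, 42);
/// assert!(filter.contains(&circle_id, 42));
///
/// // Call periodically (e.g., from a maintenance task); an entry survives
/// // one to two rotation windows before it is discarded.
/// filter.maybe_rotate();
/// ```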
pub struct SeenFilter {
    current: SendCuckooFilter,
    previous: Option<SendCuckooFilter>,
    last_rotation: Instant,
}

impl SeenFilter {
    /// Create a new dual-filter seen set.
    pub fn new() -> Self {
        Self {
            current: new_cuckoo_filter(),
            previous: None,
            last_rotation: Instant::now(),
        }
    }

    /// Check if a message has been seen in either filter.
    pub fn contains(&self, circle_id: &[u8; 32], msg_id: u64) -> bool {
        let fp = MessageFingerprint::new(circle_id, msg_id);

        // Check current filter first (most likely hit)
        if self.current.contains(&fp) {
            return true;
        }

        // Fall back to previous filter
        if let Some(ref prev) = self.previous {
            return prev.contains(&fp);
        }

        false
    }

    /// Insert a message fingerprint into the current filter.
    pub fn insert(&mut self, circle_id: &[u8; 32], msg_id: u64) {
        let fp = MessageFingerprint::new(circle_id, msg_id);
        self.current.insert(&fp);
    }

    /// Rotate filters if enough time has passed.
    /// Returns true if rotation occurred.
    pub fn maybe_rotate(&mut self) -> bool {
        if self.last_rotation.elapsed().as_secs() >= SEEN_FILTER_ROTATION_SECS {
            // Discard previous, promote current, create new
            self.previous = Some(std::mem::replace(
                &mut self.current,
                new_cuckoo_filter(),
            ));
            self.last_rotation = Instant::now();
            true
        } else {
            false
        }
    }

    /// Get approximate capacity for diagnostics.
    pub fn capacity(&self) -> usize {
        let current_cap = self.current.capacity();
        let prev_cap = self.previous.as_ref().map(|p| p.capacity()).unwrap_or(0);
        current_cap + prev_cap
    }

    /// Force cleanup by discarding the previous filter.
    /// Use when memory pressure is high.
    pub fn force_cleanup(&mut self) {
        self.previous = None;
    }
}

impl Default for SeenFilter {
    fn default() -> Self {
        Self::new()
    }
}

/// Maximum onion routing depth before rejecting a frame (DoS protection).
/// Prevents unbounded recursion from deeply nested Route frames.
const MAX_ROUTE_DEPTH: usize = 8;

// ============================================================================
// DHT Type Conversion Helpers
// ============================================================================

/// Get current unix timestamp in seconds
fn unix_now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

/// Convert a wire-format DhtNodeInfo to a full domain NodeInfo
///
/// The wire format is minimal (just key + addr bytes), so we populate
/// defaults for fields that aren't transmitted.
fn wire_to_domain_node(wire: &DhtNodeInfo, now: u64) -> DhtNodeInfoFull {
    // Derive DHT key from the public key
    let id = abzu_dht::key::node_id(&wire.key);

    // Convert address bytes to string (assuming UTF-8 encoded address)
    // PRIVACY: addr may be None for Ghost Mode nodes
    let address = wire.addr
        .as_ref()
        .map(|a| String::from_utf8_lossy(a).to_string())
        .unwrap_or_default();

    DhtNodeInfoFull {
        id,
        pubkey: wire.key,
        address,
        tree_coords: None, // Wire format doesn't include tree coords
        state: NodeState::Good, // Assume good if they're responding
        failures: 0,
        last_seen: now,
        first_seen: now, // Will be updated if node already exists
        rtt_ms: None,
    }
}

/// Convert a domain NodeInfo to wire-format DhtNodeInfo
fn domain_to_wire_node(node: &DhtNodeInfoFull) -> DhtNodeInfo {
    DhtNodeInfo {
        key: node.pubkey,
        // PRIVACY: Use None for empty addresses (Ghost Mode nodes)
        addr: if node.address.is_empty() {
            None
        } else {
            Some(node.address.as_bytes().to_vec())
        },
        relay_descriptor: None, // TODO: Populate for Ghost Mode relay routing
    }
}

/// Convert a PeerKey ([u8; 32]) to a hex string for address storage
fn peer_key_to_hex(key: &[u8; 32]) -> String {
    key.iter().map(|b| format!("{:02x}", b)).collect()
}

impl Switchboard {
    /// Create a new switchboard for a node
    pub fn new(node: Arc<Node>) -> Self {
        // Derive DHT key from our node's public key
        let our_key: DhtKey = abzu_dht::key::node_id(&node.peer_key());

        Self {
            node,
            seen_filter: Arc::new(RwLock::new(SeenFilter::new())),
            dht_routing: Arc::new(RwLock::new(RoutingTable::new(our_key))),
            dht_values: Arc::new(RwLock::new(ValueStore::new())),
            pending_dht_ops: Arc::new(RwLock::new(HashMap::new())),
            dht_rate_limiter: Arc::new(RwLock::new(RateLimiter::new())),
        }
    }

    /// Generate a unique request ID for DHT operations
    ///
    /// SECURITY: Uses random IDs to prevent response hijacking (H4 in audit).
    /// Sequential counters are predictable; attackers could pre-compute future IDs.
    fn next_request_id(&self) -> u64 {
        use rand::Rng;
        rand::thread_rng().r#gen()
    }

    /// Send a DHT request to a target peer and await the response.
    ///
    /// This is the primitive for active DHT operations. It:
    /// 1. Generates a unique request ID
    /// 2. Registers a pending operation with a oneshot channel
    /// 3. Returns the request_id and receiver for the caller to use
    ///
    /// The caller is responsible for:
    /// - Sending the frame via the transport layer
    /// - Awaiting the receiver with appropriate timeout
    /// - Handling timeout cleanup
    ///
    /// Returns (request_id, response_receiver), or None if too many pending ops (H3 protection)
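    ///
    /// A sketch of the caller-side pattern (`ignore`d; see `dht_ping` below for
    /// the real thing):
    ///
    /// ```ignore
    /// let (request_id, rx) = switchboard.register_dht_op().await
    ///     .ok_or(NodeError::OperationRejected)?;
    /// switchboard.send_frame_to_peer(&peer_key, &frame).await?;
    /// match tokio::time::timeout(timeout, rx).await {
    ///     Ok(Ok(response)) => { /* handle the DhtResponseKind */ }
    ///     Ok(Err(_)) => { /* sender dropped */ }
    ///     Err(_) => switchboard.cancel_dht_op(request_id).await, // timeout cleanup
    /// }
    /// ```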
    pub async fn register_dht_op(&self) -> Option<(u64, oneshot::Receiver<DhtResponseKind>)> {
        // SECURITY: Limit pending operations to prevent memory exhaustion (H3 in audit).
        const MAX_PENDING_OPS: usize = 1000;

        let request_id = self.next_request_id();
        let (tx, rx) = oneshot::channel();
        let now = unix_now();

        {
            let mut pending = self.pending_dht_ops.write().await;
            if pending.len() >= MAX_PENDING_OPS {
                warn!(
                    pending_count = pending.len(),
                    "Too many pending DHT operations, rejecting new operation"
                );
                return None;
            }
            pending.insert(request_id, (now, tx));
        }

        trace!(request_id, "Registered pending DHT operation");
        Some((request_id, rx))
    }

    /// Cancel a pending DHT operation (e.g., on timeout).
    ///
    /// Removes the pending operation from the registry without completing it.
    pub async fn cancel_dht_op(&self, request_id: u64) {
        let mut pending = self.pending_dht_ops.write().await;
        if pending.remove(&request_id).is_some() {
            debug!(request_id, "Cancelled pending DHT operation");
        }
    }

    /// SECURITY: Reap expired pending DHT operations (M4 in audit).
    ///
    /// Removes operations older than the specified timeout to prevent memory leaks
    /// from failed/orphaned operations that never received a response.
    /// Returns the count of reaped operations.
    pub async fn reap_expired_dht_ops(&self, max_age_secs: u64) -> usize {
        let now = unix_now();
        let mut pending = self.pending_dht_ops.write().await;
        let before_count = pending.len();

        pending.retain(|_request_id, (created_at, _sender)| {
            now.saturating_sub(*created_at) < max_age_secs
        });

        let reaped = before_count.saturating_sub(pending.len());
        if reaped > 0 {
            debug!(reaped, remaining = pending.len(), "Reaped expired DHT operations");
        }
        reaped
    }

    /// Get our DHT node key (derived from peer key).
    pub fn our_dht_key(&self) -> DhtKey {
        abzu_dht::key::node_id(&self.node.peer_key())
    }

    /// Get our peer key bytes for DHT messages.
    pub fn our_peer_key(&self) -> [u8; 32] {
        self.node.peer_key()
    }

    /// Add a connected peer as a DHT seed node.
    ///
    /// This seeds the routing table with the peer so DHT queries
    /// can use it as an initial contact point.
    pub async fn add_dht_seed(&self, peer_key: PeerKey) {
        let id = abzu_dht::key::node_id(&peer_key);
        let now = unix_now();
        let node_info = DhtNodeInfoFull {
            id,
            pubkey: peer_key,
            address: String::new(), // Will be filled in when we have transport info
            tree_coords: None,
            state: NodeState::Good,
            failures: 0,
            last_seen: now,
            first_seen: now,
            rtt_ms: None,
        };

        let mut rt = self.dht_routing.write().await;
        rt.add(node_info, now);
        debug!(peer = ?&peer_key[..8], "Added DHT seed node");
    }

    /// Send a frame to a specific peer.
    ///
    /// Returns Ok(()) if the frame was sent, or an error if the peer is not connected
    /// or the send failed.
    pub async fn send_frame_to_peer(&self, peer_key: &PeerKey, frame: &AbzuFrame) -> Result<(), NodeError> {
        let peers_arc = self.node.peers();
        let mut peers = peers_arc.lock().await;

        if let Some(conn) = peers.get_mut(peer_key) {
            match conn.interface.send_frame(frame).await {
                Ok(()) => {
                    conn.touch();
                    Ok(())
                }
                Err(e) => Err(NodeError::Transport(e)),
            }
        } else {
            Err(NodeError::NoPeerConnection)
        }
    }

    /// Send a DHT ping to a peer and await the response.
    ///
    /// This is the simplest active DHT operation. It verifies the peer is reachable
    /// and updates the routing table on success.
    ///
    /// Returns Ok(()) if the peer responded with Pong, or an error on timeout/failure.
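    ///
    /// ```ignore
    /// // Sketch: the 5s timeout is illustrative.
    /// switchboard.dht_ping(&peer_key, 5_000).await?;
    /// ```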
    pub async fn dht_ping(&self, peer_key: &PeerKey, timeout_ms: u64) -> Result<(), NodeError> {
        // Register the pending operation
        let (request_id, rx) = self.register_dht_op().await
            .ok_or(NodeError::OperationRejected)?;

        // Construct and send the ping frame
        let frame = AbzuFrame::dht_ping(request_id, self.our_peer_key());
        if let Err(e) = self.send_frame_to_peer(peer_key, &frame).await {
            self.cancel_dht_op(request_id).await;
            return Err(e);
        }

        debug!(request_id, peer = ?&peer_key[..8], "DHT Ping sent");

        // Await response with timeout
        let timeout = Duration::from_millis(timeout_ms);
        match tokio::time::timeout(timeout, rx).await {
            Ok(Ok(DhtResponseKind::Pong)) => {
                trace!(request_id, "DHT Ping succeeded");
                Ok(())
            }
            Ok(Ok(DhtResponseKind::Error { code, message })) => {
                warn!(request_id, code, %message, "DHT Ping failed with error");
                Err(NodeError::DhtError(format!("DHT error {}: {}", code, message)))
            }
            Ok(Ok(_)) => {
                warn!(request_id, "DHT Ping received unexpected response type");
                Err(NodeError::DhtError("Unexpected response type".to_string()))
            }
            Ok(Err(_)) => {
                // Channel closed - response handler dropped the sender
                Err(NodeError::DhtError("Response channel closed".to_string()))
            }
            Err(_) => {
                // Timeout - clean up and return error
                self.cancel_dht_op(request_id).await;
                debug!(request_id, "DHT Ping timed out");
                Err(NodeError::Timeout)
            }
        }
    }

    /// Find the K closest nodes to a target key using iterative lookup.
    ///
    /// This drives the NodeLookup state machine, sending parallel FindNode
    /// requests and processing responses until the lookup converges.
    ///
    /// Returns the K closest nodes discovered (may be fewer if network is sparse).
    pub async fn dht_find_node(
        &self,
        target: &DhtKey,
        timeout_ms: u64,
    ) -> Result<Vec<DhtNodeInfoFull>, NodeError> {
        use futures::future::join_all;

        let timeout = Duration::from_millis(timeout_ms);
        let now = unix_now();

        // Get our node ID and seed nodes from local routing table
        let our_id = self.our_dht_key();
        let seeds: Vec<DhtNodeInfoFull> = {
            let rt = self.dht_routing.read().await;
            rt.closest(target, K).nodes
        };

        if seeds.is_empty() {
            debug!("No seed nodes for lookup");
            return Ok(Vec::new());
        }

        debug!(target = ?&target[..8], seeds = seeds.len(), "Starting node lookup");

        // Create the lookup state machine
        let mut lookup = NodeLookup::new(*target, our_id, seeds.clone());

        // Iterative lookup loop
        loop {
            // Get next batch of nodes to query
            let to_query = lookup.next_queries();

            if to_query.is_empty() {
                // No more queries needed - lookup is complete
                break;
            }

            trace!(querying = to_query.len(), "Sending FindNode batch");

            // Send all queries in parallel, collect results
            let query_futures: Vec<_> = to_query.iter().map(|node| {
                self.send_find_node_query(node, target, timeout)
            }).collect();

            let results = join_all(query_futures).await;

            // Process each result
            for (node, result) in to_query.iter().zip(results) {
                match result {
                    Ok(nodes) => {
                        // Success - feed nodes back to lookup
                        lookup.on_response(&node.id, nodes.clone());

                        // Update routing table with responding node
                        let mut rt = self.dht_routing.write().await;
                        if let Some(entry) = rt.get_mut(&node.id) {
                            entry.mark_good(now, None);
                        }

                        // Also add newly discovered nodes
                        for discovered in nodes {
                            let _ = rt.add(discovered, now);
                        }
                    }
                    Err(e) => {
                        // Failure - mark node as failed
                        trace!(node = ?&node.id[..8], error = %e, "Query failed");
                        lookup.on_failure(&node.id);

                        // Update routing table failure count
                        let mut rt = self.dht_routing.write().await;
                        if let Some(entry) = rt.get_mut(&node.id) {
                            entry.mark_failed();
                        }
                    }
                }
            }
        }

        // Extract the K closest nodes that responded
        let closest: Vec<DhtNodeInfoFull> = lookup.closest_responding()
            .into_iter()
            .cloned()
            .collect();

        debug!(found = closest.len(), "Node lookup complete");
        Ok(closest)
    }

    /// Find a value by key, or the K closest nodes if not found.
    ///
    /// This drives the ValueLookup state machine, returning early if
    /// the value is discovered, or continuing until the lookup converges.
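    ///
    /// ```ignore
    /// // Sketch of handling both outcomes (the 10s timeout is illustrative):
    /// match switchboard.dht_find_value(&key, 10_000).await? {
    ///     FindValueResult::Found(values) => { /* consume the stored values */ }
    ///     FindValueResult::NotFound(closest) => { /* optionally store at these nodes */ }
    /// }
    /// ```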
    pub async fn dht_find_value(
        &self,
        target: &DhtKey,
        timeout_ms: u64,
    ) -> Result<FindValueResult, NodeError> {
        use futures::future::join_all;

        let timeout = Duration::from_millis(timeout_ms);
        let now = unix_now();

        // Check local store first
        {
            let store = self.dht_values.read().await;
            let values = store.get(target, now);
            if !values.is_empty() {
                debug!(target = ?&target[..8], "Value found locally");
                return Ok(FindValueResult::Found(values.into_iter().cloned().collect()));
            }
        }

        // ROLE CHECK: Edge nodes can only query local cache, not network
        if !self.node.node_role().participates_in_dht() {
            debug!("Edge node: value not in local cache, skipping network lookup");
            return Ok(FindValueResult::NotFound(Vec::new()));
        }

        // Get our node ID and seed nodes
        let our_id = self.our_dht_key();
        let seeds: Vec<DhtNodeInfoFull> = {
            let rt = self.dht_routing.read().await;
            rt.closest(target, K).nodes
        };

        if seeds.is_empty() {
            debug!("No seed nodes for value lookup");
            return Ok(FindValueResult::NotFound(Vec::new()));
        }

        debug!(target = ?&target[..8], seeds = seeds.len(), "Starting value lookup");

        // Create the value lookup state machine
        let mut lookup = ValueLookup::new(*target, our_id, seeds);

        // Iterative lookup loop
        loop {
            let to_query = lookup.next_queries();

            if to_query.is_empty() {
                break;
            }

            trace!(querying = to_query.len(), "Sending FindValue batch");

            let query_futures: Vec<_> = to_query.iter().map(|node| {
                self.send_find_value_query(node, target, timeout)
            }).collect();

            let results = join_all(query_futures).await;

            for (node, result) in to_query.iter().zip(results) {
                match result {
                    Ok(FindValueQueryResult::Value(value)) => {
                        // Got a value! Feed to lookup and cache locally
                        lookup.on_value(&node.id, vec![value.clone()], Vec::new());

                        // Cache the value
                        let mut store = self.dht_values.write().await;
                        let _ = store.store(value, now);

                        // Update routing table
                        let mut rt = self.dht_routing.write().await;
                        if let Some(entry) = rt.get_mut(&node.id) {
                            entry.mark_good(now, None);
                        }
                    }
                    Ok(FindValueQueryResult::Nodes(nodes)) => {
                        // Got nodes, continue lookup
                        lookup.on_nodes(&node.id, nodes.clone());

                        // Update routing table
                        let mut rt = self.dht_routing.write().await;
                        if let Some(entry) = rt.get_mut(&node.id) {
                            entry.mark_good(now, None);
                        }
                        for discovered in nodes {
                            let _ = rt.add(discovered, now);
                        }
                    }
                    Err(e) => {
                        trace!(node = ?&node.id[..8], error = %e, "Query failed");
                        lookup.on_failure(&node.id);

                        let mut rt = self.dht_routing.write().await;
                        if let Some(entry) = rt.get_mut(&node.id) {
                            entry.mark_failed();
                        }
                    }
                }
            }

            // Check if we found a value
            if !lookup.values().is_empty() {
                break;
            }
        }

        // Return results
        let values = lookup.values().to_vec();
        if !values.is_empty() {
            debug!(count = values.len(), "Value(s) found");
            Ok(FindValueResult::Found(values))
        } else {
            let closest: Vec<DhtNodeInfoFull> = lookup.closest_responding()
                .into_iter()
                .cloned()
                .collect();
            debug!(nodes = closest.len(), "Value not found, returning closest nodes");
            Ok(FindValueResult::NotFound(closest))
        }
    }

    /// Send a single FindNode query to a peer and await the response.
    async fn send_find_node_query(
        &self,
        node: &DhtNodeInfoFull,
        target: &DhtKey,
        timeout: std::time::Duration,
    ) -> Result<Vec<DhtNodeInfoFull>, NodeError> {
        // Register the pending operation
        let (request_id, rx) = self.register_dht_op().await
            .ok_or(NodeError::OperationRejected)?;

        // Construct the FindNode frame
        let frame = AbzuFrame::dht_find_node(request_id, self.our_peer_key(), *target);

        // Send to peer (by pubkey)
        if let Err(e) = self.send_frame_to_peer(&node.pubkey, &frame).await {
            self.cancel_dht_op(request_id).await;
            return Err(e);
        }

        // Await response
        match tokio::time::timeout(timeout, rx).await {
            Ok(Ok(DhtResponseKind::Nodes { nodes: wire_nodes })) => {
                let now = unix_now();
                let nodes: Vec<DhtNodeInfoFull> = wire_nodes
                    .into_iter()
                    .map(|n| wire_to_domain_node(&n, now))
                    .collect();
                Ok(nodes)
            }
            Ok(Ok(DhtResponseKind::Error { code, message })) => {
                Err(NodeError::DhtError(format!("DHT error {}: {}", code, message)))
            }
            Ok(Ok(_)) => {
                Err(NodeError::DhtError("Unexpected response type".to_string()))
            }
            Ok(Err(_)) => {
                Err(NodeError::DhtError("Response channel closed".to_string()))
            }
            Err(_) => {
                self.cancel_dht_op(request_id).await;
                Err(NodeError::Timeout)
            }
        }
    }

    /// Send a single FindValue query to a peer and await the response.
    async fn send_find_value_query(
        &self,
        node: &DhtNodeInfoFull,
        target: &DhtKey,
        timeout: std::time::Duration,
    ) -> Result<FindValueQueryResult, NodeError> {
        let (request_id, rx) = self.register_dht_op().await
            .ok_or(NodeError::OperationRejected)?;

        let frame = AbzuFrame::dht_find_value(request_id, self.our_peer_key(), *target);

        if let Err(e) = self.send_frame_to_peer(&node.pubkey, &frame).await {
            self.cancel_dht_op(request_id).await;
            return Err(e);
        }

        match tokio::time::timeout(timeout, rx).await {
            Ok(Ok(DhtResponseKind::Nodes { nodes: wire_nodes })) => {
                let now = unix_now();
                let nodes: Vec<DhtNodeInfoFull> = wire_nodes
                    .into_iter()
                    .map(|n| wire_to_domain_node(&n, now))
                    .collect();
                Ok(FindValueQueryResult::Nodes(nodes))
            }
            Ok(Ok(DhtResponseKind::Value { value: data, publisher, ttl_remaining_secs })) => {
                // Construct StoredValue from wire response
                // We use Application type for generic values; caller can interpret payload
                let value = StoredValue::new(
                    *target,
                    ValueType::Application,
                    publisher,
                    data,
                    unix_now(),
                    Some(ttl_remaining_secs as u64),
                );
                Ok(FindValueQueryResult::Value(value))
            }
            Ok(Ok(DhtResponseKind::Error { code, message })) => {
                Err(NodeError::DhtError(format!("DHT error {}: {}", code, message)))
            }
            Ok(Ok(_)) => {
                Err(NodeError::DhtError("Unexpected response type".to_string()))
            }
            Ok(Err(_)) => {
                Err(NodeError::DhtError("Response channel closed".to_string()))
            }
            Err(_) => {
                self.cancel_dht_op(request_id).await;
                Err(NodeError::Timeout)
            }
        }
    }

    /// Store a value in the DHT network.
    ///
    /// Stores locally first, then replicates to K closest nodes.
    /// Returns a StoreResult indicating success/failure counts.
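    ///
    /// ```ignore
    /// // Sketch: the StoredValue::new argument order matches its use in
    /// // send_find_value_query above; TTL and timeout values are illustrative.
    /// let value = StoredValue::new(
    ///     key, ValueType::Application, our_key, payload, unix_now(), Some(3600),
    /// );
    /// let result = switchboard.dht_store(value, 10_000).await?;
    /// debug!(stored = result.nodes_stored, failed = result.nodes_failed, "store fan-out");
    /// ```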
    pub async fn dht_store(
        &self,
        value: StoredValue,
        timeout_ms: u64,
    ) -> Result<StoreResult, NodeError> {
        let target = value.key;
        let timeout = Duration::from_millis(timeout_ms);
        let now = unix_now();

        // Step 1: Store locally
        let stored_locally = {
            let mut store = self.dht_values.write().await;
            store.store(value.clone(), now).is_ok()
        };

        if stored_locally {
            debug!(key = ?&target[..8], "Value stored locally");
        }

        // ROLE CHECK: Edge nodes store locally only, no network replication
        if !self.node.node_role().participates_in_dht() {
            debug!("Edge node: stored locally, skipping network replication");
            return Ok(StoreResult {
                stored_locally,
                nodes_stored: 0,
                nodes_failed: 0,
            });
        }

        // Step 2: Find K closest nodes
        let closest_nodes = {
            let rt = self.dht_routing.read().await;
            rt.closest(&target, K)
        };

        if closest_nodes.nodes.is_empty() {
            // No nodes to replicate to - local only
            return Ok(StoreResult {
                stored_locally,
                nodes_stored: 0,
                nodes_failed: 0,
            });
        }

        debug!(
            key = ?&target[..8],
            nodes = closest_nodes.nodes.len(),
            "Replicating value to closest nodes"
        );

        // Step 3: Send STORE to all closest nodes in parallel
        let store_futures: Vec<_> = closest_nodes.nodes
            .iter()
            .map(|node| self.send_store_to_node(node, &value, timeout))
            .collect();

        let results = futures::future::join_all(store_futures).await;

        // Step 4: Count successes and failures
        let mut nodes_stored = 0usize;
        let mut nodes_failed = 0usize;

        for result in results {
            match result {
                Ok(true) => nodes_stored += 1,
                Ok(false) => nodes_failed += 1,
                Err(_) => nodes_failed += 1,
            }
        }

        debug!(
            key = ?&target[..8],
            stored = nodes_stored,
            failed = nodes_failed,
            "DHT store complete"
        );

        Ok(StoreResult {
            stored_locally,
            nodes_stored,
            nodes_failed,
        })
    }

    /// Send a STORE request to a single node and await acknowledgment.
    async fn send_store_to_node(
        &self,
        node: &DhtNodeInfoFull,
        value: &StoredValue,
        timeout: Duration,
    ) -> Result<bool, NodeError> {
        let (request_id, rx) = self.register_dht_op().await
            .ok_or(NodeError::OperationRejected)?;

        // Calculate TTL remaining
        let now = unix_now();
        let ttl_secs = value.metadata.expires_at.saturating_sub(now) as u32;

        let frame = AbzuFrame::dht_store(
            request_id,
            self.our_peer_key(),
            value.key,
            value.payload.clone(),
            ttl_secs,
            value.signature.to_vec(),
        );

        if let Err(e) = self.send_frame_to_peer(&node.pubkey, &frame).await {
            self.cancel_dht_op(request_id).await;
            return Err(e);
        }

        match tokio::time::timeout(timeout, rx).await {
            Ok(Ok(DhtResponseKind::StoreAck { success })) => {
                Ok(success)
            }
            Ok(Ok(DhtResponseKind::Error { code, message })) => {
                warn!(node = ?&node.id[..8], code, %message, "Store rejected");
                Ok(false)
            }
            Ok(Ok(_)) => {
                Ok(false) // Unexpected response
            }
            Ok(Err(_)) => {
                Err(NodeError::DhtError("Response channel closed".to_string()))
            }
            Err(_) => {
                self.cancel_dht_op(request_id).await;
                Err(NodeError::Timeout)
            }
        }
    }

    /// Refresh (republish) values that are approaching expiration.
    ///
    /// Finds values published by this node that are below the TTL threshold
    /// and re-replicates them to K closest nodes.
    ///
    /// Returns the number of values successfully refreshed.
    pub async fn dht_refresh_values(&self, threshold_secs: u64, timeout_ms: u64) -> usize {
        let our_key = self.our_peer_key();
        let now = unix_now();

        // Get values needing refresh
        let values_to_refresh = {
            let store = self.dht_values.read().await;
            store.get_values_needing_refresh(&our_key, threshold_secs, now)
        };

        if values_to_refresh.is_empty() {
            return 0;
        }

        debug!(
            count = values_to_refresh.len(),
            threshold_secs,
            "Refreshing DHT values"
        );

        let mut refreshed = 0;

        for value in values_to_refresh {
            match self.dht_store(value, timeout_ms).await {
                Ok(result) if result.nodes_stored > 0 => {
                    refreshed += 1;
                }
                Ok(_) => {
                    // Stored locally but no remote nodes - still counts
                    refreshed += 1;
                }
                Err(e) => {
                    warn!(error = %e, "Failed to refresh value");
                }
            }
        }

        info!(refreshed, "DHT value refresh complete");
        refreshed
    }

    /// Spawn a background task to periodically refresh values.
    ///
    /// The task runs until the shutdown signal is received or the JoinHandle is aborted.
    /// SECURITY: Also runs pending ops reaper (M4) and accepts graceful shutdown (M6 in audit).
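    ///
    /// ```ignore
    /// // Sketch: interval/threshold values are illustrative.
    /// let (shutdown_tx, shutdown_rx) = oneshot::channel();
    /// let handle = switchboard.clone().start_refresh_task(900, 3600, shutdown_rx);
    /// // ... later, during node shutdown:
    /// let _ = shutdown_tx.send(());
    /// let _ = handle.await;
    /// ```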
    pub fn start_refresh_task(
        self: Arc<Self>,
        interval_secs: u64,
        threshold_secs: u64,
        shutdown_rx: oneshot::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        let timeout_ms = 10_000; // 10s timeout per store
        const PENDING_OP_MAX_AGE_SECS: u64 = 120; // 2x typical timeout (M4)

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
            interval.tick().await; // Skip immediate first tick

            tokio::pin!(shutdown_rx);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Run value refresh
                        let refreshed = self.dht_refresh_values(threshold_secs, timeout_ms).await;
                        debug!(refreshed, "Periodic refresh cycle complete");

                        // SECURITY: Reap expired pending operations (M4 in audit)
                        self.reap_expired_dht_ops(PENDING_OP_MAX_AGE_SECS).await;
                    }
                    _ = &mut shutdown_rx => {
                        info!("DHT refresh task received shutdown signal");
                        break;
                    }
                }
            }

            info!("DHT refresh task terminated gracefully");
        })
    }

    /// Handle an incoming frame from a peer
    ///
    /// Returns the routing result indicating what happened to the frame.
    pub async fn handle_frame(&self, frame: AbzuFrame, source: PeerKey) -> RouteResult {
        trace!(source = ?source, frame = ?std::mem::discriminant(&frame), "Handling frame");

        match frame {
            AbzuFrame::KeepAlive => {
                // KeepAlive is just to maintain the connection
                // Update last activity for the source peer
                self.touch_peer(&source).await;
                RouteResult::Local
            }

            AbzuFrame::Chunk { cid, data } => {
                // Content-addressed chunk - store it locally
                self.handle_chunk(cid, data, source).await
            }

            AbzuFrame::Route { target, next_hop, payload } => {
                // Routing frame - check if we're the target or need to forward
                // Start at depth 0 for external frames
                self.handle_route(target, next_hop, payload, source, 0).await
            }

            AbzuFrame::Hello { ephemeral_pub, timestamp, .. } => {
                // Handshake initiation (version already negotiated at transport layer)
                self.handle_hello(ephemeral_pub, timestamp, source).await
            }

            AbzuFrame::HelloAck { ephemeral_pub, confirmation, .. } => {
                // Handshake response (version already negotiated at transport layer)
                self.handle_hello_ack(ephemeral_pub, confirmation, source).await
            }

            AbzuFrame::Request { cid, requester } => {
                // Content request - check if we have it and send back
                self.handle_request(cid, requester, source).await
            }

            AbzuFrame::Chat { id, to, msg, timestamp } => {
                // Chat message - store and send ack
                self.handle_chat(id, to, msg, timestamp, source).await
            }

            AbzuFrame::ChatAck { id } => {
                // Delivery acknowledgment
                self.handle_chat_ack(id, source).await
            }

            AbzuFrame::ReadReceipt { id } => {
                // Read receipt - message was read by recipient
                self.handle_read_receipt(id, source).await
            }

            AbzuFrame::Cover { .. } => {
                // Cover traffic - silently discard after decryption
                // This is the post-decrypt discrimination point:
                // on the wire, Cover frames are indistinguishable from real traffic.
                // Only after decryption can we identify and discard them.
                trace!("Discarding cover frame");
                RouteResult::Local
            }

            AbzuFrame::Announce { peer_key, public_addr, nat_type, timestamp } => {
                // Peer address announcement for NAT traversal
                self.handle_announce(peer_key, public_addr, nat_type, timestamp, source).await
            }

            AbzuFrame::ServiceRequest { service_id, request_id, requester, payload } => {
                self.handle_service_request(service_id, request_id, requester, payload, source).await
            }

            AbzuFrame::ServiceResponse { service_id, request_id, payload, status } => {
                self.handle_service_response(service_id, request_id, payload, status, source).await
            }

            // ─────────────────────────────────────────────────────────────────
            // Circle (Group) Frames
            // ─────────────────────────────────────────────────────────────────

            AbzuFrame::CircleCreate { id, name, founder_key, timestamp } => {
                // Handle incoming circle creation (for sync/replication)
                self.handle_circle_create(id, name, founder_key, timestamp, source).await
            }

            AbzuFrame::CircleInvite { circle_id, invitee, signature } => {
                // Handle circle invitation
                self.handle_circle_invite(circle_id, invitee, signature, source).await
            }

            AbzuFrame::CircleJoin { circle_id, member_key, epoch } => {
                // Handle member joining
                self.handle_circle_join(circle_id, member_key, epoch, source).await
            }

            AbzuFrame::CircleLeave { circle_id, member_key } => {
                // Handle member leaving
                self.handle_circle_leave(circle_id, member_key, source).await
            }

            AbzuFrame::CircleMessage { circle_id, id, encrypted_envelope } => {
                // Handle incoming circle message with privacy-preserving envelope
                self.handle_circle_message(circle_id, id, encrypted_envelope, source).await
            }

            AbzuFrame::CircleAck { circle_id, msg_id, member_key } => {
                // Handle circle message acknowledgment
                self.handle_circle_ack(circle_id, msg_id, member_key, source).await
            }

            AbzuFrame::CircleVouch { circle_id, voucher, target, signature, timestamp } => {
                // Handle secondary endorsement (vouch) for a member
                self.handle_circle_vouch(circle_id, voucher, target, signature, timestamp, source).await
            }

            AbzuFrame::CirclePrune { circle_id, pruner, target, signature, reason_hash, timestamp } => {
                // Handle member pruning (moderation action)
                self.handle_circle_prune(circle_id, pruner, target, signature, reason_hash, timestamp, source).await
            }

            AbzuFrame::GossipHave { circle_id, msg_ids, epoch } => {
                // Handle gossip "I have" announcement for lazy repair
                self.handle_gossip_have(circle_id, msg_ids, epoch, source).await
            }

            // ─────────────────────────────────────────────────────────────────
            // DHT (Distributed Hash Table) Frames
            // ─────────────────────────────────────────────────────────────────

            AbzuFrame::DhtRequest { request_id, sender_key, kind } => {
                // Handle incoming DHT request
                self.handle_dht_request(request_id, sender_key, kind, source).await
            }

            AbzuFrame::DhtResponse { request_id, responder_key, kind } => {
                // Handle incoming DHT response
                self.handle_dht_response(request_id, responder_key, kind, source).await
            }
        }
    }

    /// Handle a content chunk
    async fn handle_chunk(&self, cid: [u8; 32], data: Vec<u8>, source: PeerKey) -> RouteResult {
        debug!(cid = ?cid, size = data.len(), "Received chunk");

        // Verify CID matches data hash
        let computed_cid = blake3::hash(&data);
        if computed_cid.as_bytes() != &cid {
            warn!("CID mismatch - rejecting chunk");
            return RouteResult::Error(NodeError::Crypto("CID mismatch".to_string()));
        }

        // Store the chunk
        if let Err(e) = self.node.store_chunk(&cid, &data) {
            return RouteResult::Error(e);
        }

        // Notify any pending fetches for this CID
        self.node.notify_content_arrived(&cid);

        self.touch_peer(&source).await;
        RouteResult::Local
    }

    /// Handle a content request
    async fn handle_request(
        &self,
        cid: [u8; 32],
        requester: [u8; 32],
        source: PeerKey,
    ) -> RouteResult {
        info!(cid = ?cid, requester = ?requester, "Received content request");

        // Check if we have the content
        match self.node.get_chunk(&cid) {
            Ok(Some(data)) => {
                // We have it - send Chunk back to requester
                let chunk_frame = AbzuFrame::chunk(cid, data);

                // Send to the source peer (who forwarded the request)
                let peers_arc = self.node.peers();
                let mut peers = peers_arc.lock().await;

                if let Some(conn) = peers.get_mut(&source) {
                    if let Err(e) = conn.interface.send_frame(&chunk_frame).await {
                        warn!(error = %e, "Failed to send chunk response");
                    } else {
                        conn.touch();
                        info!(cid = ?cid, "Sent chunk response");
                    }
                }
            }
            Ok(None) => {
                debug!(cid = ?cid, "Content not found locally");
                // TODO: Forward request to other peers (gossip)
            }
            Err(e) => {
                warn!(error = %e, "Error checking store");
            }
        }

        self.touch_peer(&source).await;
        RouteResult::Local
    }

    /// Handle a routing frame (multi-hop onion routing)
    ///
    /// Route frames can be nested. Each node:
    /// 1. Checks if it's the final target → process payload locally
    /// 2. Checks if next_hop is us → unwrap inner frame and forward
    /// 3. Otherwise → forward the whole frame to next_hop
    ///
    /// # Security
    /// The `depth` parameter prevents DoS from deeply nested onion frames.
    /// Frames exceeding MAX_ROUTE_DEPTH are rejected.
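    ///
    /// ```ignore
    /// // Sketch: how a two-hop onion looks from the sender's side. The inner
    /// // Route frame travels opaquely inside `payload` until hop1 peels it.
    /// let inner = AbzuFrame::Route { target, next_hop: hop2_key, payload: final_payload };
    /// let outer = AbzuFrame::Route { target, next_hop: hop1_key, payload: inner.encode()? };
    /// ```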
1272      async fn handle_route(
1273          &self,
1274          target: [u8; 32],
1275          next_hop: [u8; 32],
1276          payload: Vec<u8>,
1277          source: PeerKey,
1278          depth: usize,
1279      ) -> RouteResult {
1280          // SECURITY: Reject frames that exceed max routing depth (DoS protection)
1281          if depth >= MAX_ROUTE_DEPTH {
1282              warn!(depth, max = MAX_ROUTE_DEPTH, "Route frame exceeded max depth, dropping");
1283              return RouteResult::Error(NodeError::Validation("Route depth exceeded".into()));
1284          }
1285  
1286          let our_key = self.node.peer_key();
1287  
1288          // Case 1: We are the final destination
1289          if target == our_key {
1290              debug!("Route frame arrived at destination");
1291              return self.process_local_payload(payload, source).await;
1292          }
1293  
1294          // ROLE CHECK: Edge nodes don't route for others
1295          if !self.node.node_role().can_route() {
1296              debug!("Edge node refusing to route frame (not for us)");
1297              return RouteResult::NoRoute;
1298          }
1299  
1300          // Case 2: next_hop is us - we need to unwrap and forward
1301          if next_hop == our_key {
1302              debug!(target = ?target, depth, "Peeling onion layer, forwarding inner frame");
1303              
1304              // Try to decode the payload as an inner Route frame
1305              match AbzuFrame::decode(&payload) {
1306                  Ok(inner_frame) => {
1307                      // Forward the inner frame to its next_hop
1308                      if let AbzuFrame::Route { target: inner_target, next_hop: inner_next_hop, payload: inner_payload } = inner_frame {
1309                          // Recursively handle the inner route with incremented depth
1310                          return Box::pin(self.handle_route(
1311                              inner_target,
1312                              inner_next_hop,
1313                              inner_payload,
1314                              source,
1315                              depth + 1,
1316                          )).await;
1317                      } else {
1318                          // Inner frame is not a Route - process locally
1319                          // This is the terminal case, no further recursion risk
1320                          warn!("Inner frame is not a Route, processing as local");
1321                          return Box::pin(self.handle_frame(inner_frame, source)).await;
1322                      }
1323                  }
1324                  Err(e) => {
1325                      warn!(error = %e, "Failed to decode inner route frame");
1326                      return RouteResult::Error(NodeError::Crypto(format!("Invalid inner frame: {}", e)));
1327                  }
1328              }
1329          }
1330  
1331          // Case 3: Forward the whole frame to next_hop (we're not the next hop)
1332          debug!(next_hop = ?next_hop, target = ?target, "Forwarding route frame to next hop");
1333          let frame = AbzuFrame::Route { target, next_hop, payload };
1334          let frame_bytes = match frame.encode() {
                  Ok(bytes) => bytes,
                  // Don't panic in the forwarding path; surface the error like other handlers do
                  Err(e) => return RouteResult::Error(NodeError::Serialization(e.to_string())),
              };
1335          self.forward_frame_to_peer(&next_hop, &frame_bytes).await
1336      }
1337  
1338      /// Forward encoded frame bytes to a specific peer
1339      async fn forward_frame_to_peer(&self, peer_key: &PeerKey, frame_bytes: &[u8]) -> RouteResult {
1340          let peers_arc = self.node.peers();
1341          let mut peers = peers_arc.lock().await;
1342  
1343          if let Some(conn) = peers.get_mut(peer_key) {
1344              match conn.interface.send(frame_bytes).await {
1345                  Ok(()) => {
1346                      conn.tx_bytes += frame_bytes.len() as u64;
1347                      conn.touch();
1348                      RouteResult::Forwarded(*peer_key)
1349                  }
1350                  Err(e) => RouteResult::Error(NodeError::Transport(e)),
1351              }
1352          } else {
1353              debug!(peer = ?peer_key, "No direct connection, trying router");
1354              // Fall back to routing if no direct connection
1355              drop(peers); // Release lock before routing
1356              self.forward_via_router(peer_key, frame_bytes).await
1357          }
1358      }
1359  
1360      /// Forward via router when no direct connection exists
1361      async fn forward_via_router(&self, target: &PeerKey, payload: &[u8]) -> RouteResult {
1362          let next_hop_result = {
1363              let router_arc = self.node.router();
1364              let router = router_arc.read().await;
1365              // Derive address from key bytes (same pattern as PeerInfo::from_key_bytes)
1366              let mut addr_bytes = [0u8; 16];
1367              addr_bytes[0] = 0x02;                            // fixed address prefix byte
1368              addr_bytes[1] = target[0] & 0x7F;                // first key byte, high bit cleared
1369              addr_bytes[2..].copy_from_slice(&target[..14]);  // first 14 key bytes fill the rest
1370              let target_addr = abzu_router::Address::from_bytes(addr_bytes);
1371              router.find_next_hop(&target_addr)
1372          };
1373  
1374          match next_hop_result {
1375              Ok(Some(next_peer)) => {
1376                  self.send_to_peer(&next_peer, payload).await
1377              }
1378              Ok(None) => {
1379                  RouteResult::NoRoute
1380              }
1381              Err(e) => RouteResult::Error(NodeError::Routing(e)),
1382          }
1383      }
1384  
1386      /// Send payload to a specific peer
1387      async fn send_to_peer(&self, peer_key: &PeerKey, payload: &[u8]) -> RouteResult {
1388          let peers_arc = self.node.peers();
1389          let mut peers = peers_arc.lock().await;
1390  
1391          if let Some(conn) = peers.get_mut(peer_key) {
1392              match conn.interface.send(payload).await {
1393                  Ok(()) => {
1394                      conn.tx_bytes += payload.len() as u64;
1395                      conn.touch();
1396                      RouteResult::Forwarded(*peer_key)
1397                  }
1398                  Err(e) => RouteResult::Error(NodeError::Transport(e)),
1399              }
1400          } else {
1401              RouteResult::NoRoute
1402          }
1403      }
1404  
1405      /// Process a payload that has arrived at its destination
1406      async fn process_local_payload(&self, payload: Vec<u8>, source: PeerKey) -> RouteResult {
1407          // Try to decode as an inner AbzuFrame
1408          match AbzuFrame::decode(&payload) {
1409              Ok(inner_frame) => {
1410                  // Recursively handle the inner frame
1411                  Box::pin(self.handle_frame(inner_frame, source)).await
1412              }
1413              Err(_) => {
1414                  // Raw payload - store as content
1415                  let cid = blake3::hash(&payload);
1416                  if let Err(e) = self.node.store_chunk(cid.as_bytes(), &payload) {
1417                      return RouteResult::Error(e);
1418                  }
1419                  RouteResult::Local
1420              }
1421          }
1422      }
1423  
1424      /// Handle handshake initiation
1425      async fn handle_hello(
1426          &self,
1427          _ephemeral_pub: [u8; 32],
1428          _timestamp: u64,
1429          source: PeerKey,
1430      ) -> RouteResult {
1431          info!(source = ?source, "Received Hello from peer");
1432  
1433          // TODO: Implement proper key exchange
1434          // For now, just acknowledge receipt
1435          self.touch_peer(&source).await;
1436          RouteResult::Local
1437      }
1438  
1439      /// Handle handshake acknowledgment
1440      async fn handle_hello_ack(
1441          &self,
1442          _ephemeral_pub: [u8; 32],
1443          _confirmation: Vec<u8>,
1444          source: PeerKey,
1445      ) -> RouteResult {
1446          info!(source = ?source, "Received HelloAck from peer");
1447  
1448          // TODO: Complete key exchange
1449          self.touch_peer(&source).await;
1450          RouteResult::Local
1451      }
1452  
1453      /// Handle an incoming chat message
1454      async fn handle_chat(
1455          &self,
1456          msg_id: u64,
1457          _to: [u8; 32],
1458          msg: Vec<u8>,
1459          timestamp: u64,
1460          source: PeerKey,
1461      ) -> RouteResult {
1462          info!(id = msg_id, from = ?source, "Received chat message");
1463  
1464          // Store the inbound message
1465          if let Err(e) = self.node.store_inbound_chat(source, msg_id, msg, timestamp) {
1466              warn!(error = %e, "Failed to store inbound chat");
1467              return RouteResult::Error(e);
1468          }
1469  
1470          // Send ChatAck back to sender
1471          let ack_frame = AbzuFrame::chat_ack(msg_id);
1472          let peers_arc = self.node.peers();
1473          let mut peers = peers_arc.lock().await;
1474  
1475          if let Some(conn) = peers.get_mut(&source) {
1476              if let Err(e) = conn.interface.send_frame(&ack_frame).await {
1477                  warn!(error = %e, "Failed to send ChatAck");
1478              } else {
1479                  conn.touch();
1480                  debug!(id = msg_id, "Sent ChatAck");
1481              }
1482          }
1483  
1484          RouteResult::Local
1485      }
1486  
1487      /// Handle a chat delivery acknowledgment
1488      async fn handle_chat_ack(&self, msg_id: u64, source: PeerKey) -> RouteResult {
1489          debug!(id = msg_id, from = ?source, "Received ChatAck");
1490  
1491          // Mark the message as delivered
1492          if let Err(e) = self.node.mark_delivered(msg_id) {
1493              // Not finding the message is not fatal - might be a duplicate ack
1494              debug!(error = %e, "Could not mark message delivered (may be duplicate)");
1495          }
1496  
1497          self.touch_peer(&source).await;
1498          RouteResult::Local
1499      }
1500  
1501      /// Handle a read receipt - recipient read the message
1502      async fn handle_read_receipt(&self, msg_id: u64, source: PeerKey) -> RouteResult {
1503          debug!(id = msg_id, from = ?source, "Received ReadReceipt");
1504  
1505          // Mark the message as read
1506          if let Err(e) = self.node.mark_read(msg_id) {
1507              // Not finding the message is not fatal - might be a duplicate receipt
1508              debug!(error = %e, "Could not mark message read (may be duplicate)");
1509          }
1510  
1511          self.touch_peer(&source).await;
1512          RouteResult::Local
1513      }
1514  
1515      /// Handle a peer address announcement for NAT traversal
1516      async fn handle_announce(
1517          &self,
1518          peer_key: [u8; 32],
1519          public_addr: Vec<u8>,
1520          nat_type: u8,
1521          timestamp: u64,
1522          source: PeerKey,
1523      ) -> RouteResult {
1524          // Validate timestamp (reject old announcements > 5 min)
1525          let now = unix_now();
1529          
1530          if now.saturating_sub(timestamp) > 300 {
1531              debug!(peer = ?peer_key, "Rejecting stale announcement");
1532              return RouteResult::Local;
1533          }
1534  
1535          // Log the announcement
1536          info!(
1537              peer = ?peer_key,
1538              addr_len = public_addr.len(),
1539              nat_type = nat_type,
1540              "Received peer address announcement"
1541          );
1542  
1543          // Store in peer table for future connection attempts
1544          // For now, just update the peer's last activity
1545          self.touch_peer(&source).await;
1546          
1547          // TODO: Store public_addr in a peer directory for later NAT-aware connections
1548          // TODO: Optionally forward to other peers (gossip)
1549          
1550          RouteResult::Local
1551      }
1552  
1553      /// Update last activity for a peer
1554      async fn touch_peer(&self, peer_key: &PeerKey) {
1555          let peers_arc = self.node.peers();
1556          let mut peers = peers_arc.lock().await;
1557          if let Some(conn) = peers.get_mut(peer_key) {
1558              conn.touch();
1559          }
1560      }
1561  
1562      /// Periodic maintenance of seen filter (call every ~60 seconds)
1563      /// 
1564      /// Rotates filters if needed. Returns true if rotation occurred.
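          ///
          /// A typical driver is a periodic maintenance task (illustrative
          /// sketch; the interval and the `switchboard` handle are assumptions):
          ///
          /// ```ignore
          /// let mut tick = tokio::time::interval(Duration::from_secs(60));
          /// loop {
          ///     tick.tick().await;
          ///     switchboard.cleanup_seen_filter().await;
          ///     switchboard.send_keepalives(30_000).await; // idle threshold in ms
          /// }
          /// ```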
1565      pub async fn cleanup_seen_filter(&self) -> bool {
1566          let mut filter = self.seen_filter.write().await;
1567          let rotated = filter.maybe_rotate();
1568          if rotated {
1569              debug!(capacity = filter.capacity(), "Rotated seen filter");
1570          }
1571          rotated
1572      }
1573  
1574      /// Send keepalive to all idle connections
1575      pub async fn send_keepalives(&self, idle_threshold_ms: u64) -> Vec<(PeerKey, Result<(), TransportError>)> {
1576          let mut results = Vec::new();
1577          let peers_arc = self.node.peers();
1578          let mut peers = peers_arc.lock().await;
1579  
1580          for (key, conn) in peers.iter_mut() {
1581              if conn.needs_keepalive(idle_threshold_ms) {
1582                  let frame = AbzuFrame::KeepAlive;
1583                  let result = conn.interface.send_frame(&frame).await;
1584                  
1585                  if result.is_ok() {
1586                      conn.touch();
1587                  }
1588                  
1589                  results.push((*key, result));
1590              }
1591          }
1592  
1593          results
1594      }
1595  
1596      /// Broadcast a frame to all peers
1597      pub async fn broadcast(&self, frame: &AbzuFrame) -> Vec<(PeerKey, Result<(), TransportError>)> {
1598          let mut results = Vec::new();
1599          let peers_arc = self.node.peers();
1600          let mut peers = peers_arc.lock().await;
1601  
1602          for (key, conn) in peers.iter_mut() {
1603              let result = conn.interface.send_frame(frame).await;
1604              
1605              if result.is_ok() {
1606                  conn.touch();
1607              }
1608              
1609              results.push((*key, result));
1610          }
1611  
1612          results
1613      }
1614  
1615      /// Broadcast raw bytes to all peers (for cover traffic)
1616      pub async fn broadcast_raw(&self, data: &[u8]) {
1617          let peers_arc = self.node.peers();
1618          let mut peers = peers_arc.lock().await;
1619  
1620          for (key, conn) in peers.iter_mut() {
1621              if let Err(e) = conn.interface.send(data).await {
1622                  trace!(peer = ?key, error = %e, "Failed to send cover packet");
1623              }
1624          }
1625      }
1626  
1627      /// Retry pending messages that haven't been acknowledged
1628      /// 
1629      /// This checks all pending outbound messages and resends them if they've
1630      /// been waiting longer than the retry_after_ms threshold.
1631      /// 
1632      /// Returns (retried_count, failed_count)
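          /// 
          /// Illustrative call from the same periodic task (threshold values
          /// are arbitrary examples, not defaults):
          /// 
          /// ```ignore
          /// // Resend after 30 s without an ack; give up after 24 h.
          /// let (retried, failed) = switchboard
          ///     .retry_pending_messages(30_000, 86_400_000)
          ///     .await;
          /// ```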
1633      pub async fn retry_pending_messages(
1634          &self,
1635          retry_after_ms: u64,
1636          max_age_ms: u64,
1637      ) -> (usize, usize) {
1638          let now = std::time::SystemTime::now()
1639              .duration_since(std::time::UNIX_EPOCH)
1640              .unwrap()
1641              .as_millis() as u64;
1642  
1643          let pending = match self.node.get_pending_messages() {
1644              Ok(msgs) => msgs,
1645              Err(e) => {
1646                  warn!(error = %e, "Failed to get pending messages");
1647                  return (0, 0);
1648              }
1649          };
1650  
1651          let mut retried = 0;
1652          let mut failed = 0;
1653  
1654          for msg in pending {
1655              let age = now.saturating_sub(msg.timestamp);
1656  
1657              // Too old - mark as failed
1658              if age > max_age_ms {
1659                  debug!(id = msg.id, age_ms = age, "Message too old, marking failed");
1660                  if let Err(e) = self.node.mark_failed(msg.id) {
1661                      warn!(id = msg.id, error = %e, "Failed to mark message as failed");
1662                  }
1663                  failed += 1;
1664                  continue;
1665              }
1666  
1667              // Not ready for retry yet
1668              if age < retry_after_ms {
1669                  continue;
1670              }
1671  
1672              // Attempt to resend
1673              let chat_frame = AbzuFrame::Chat {
1674                  id: msg.id,
1675                  to: msg.peer,
1676                  msg: msg.content.clone(),
1677                  timestamp: msg.timestamp,
1678              };
1679  
1680              let peer_key: PeerKey = msg.peer;
1681              
1682              // Check if peer is connected
1683              let peers_arc = self.node.peers();
1684              let peers = peers_arc.lock().await;
1685              
1686              if let Some(conn) = peers.get(&peer_key) {
1687                  let encoded = match chat_frame.encode() {
1688                      Ok(e) => e,
1689                      Err(e) => {
1690                          warn!(id = msg.id, error = %e, "Failed to encode retry frame");
1691                          continue;
1692                      }
1693                  };
1694  
1695                  match conn.interface.send(&encoded).await {
1696                      Ok(_) => {
1697                          debug!(id = msg.id, peer = ?peer_key, "Retried pending message");
1698                          retried += 1;
1699                      }
1700                      Err(e) => {
1701                          debug!(id = msg.id, error = %e, "Retry send failed (peer offline)");
1702                      }
1703                  }
1704              } else {
1705                  trace!(id = msg.id, peer = ?peer_key, "Peer not connected, skipping retry");
1706              }
1707          }
1708  
1709          if retried > 0 || failed > 0 {
1710              info!(retried = retried, failed = failed, "Message retry pass complete");
1711          }
1712  
1713          (retried, failed)
1714      }
1715  
1716      // ─────────────────────────────────────────────────────────────────────────
1717      // Circle (Group) Handlers
1718      // ─────────────────────────────────────────────────────────────────────────
1719  
1720      /// Handle incoming circle creation (for state replication)
1721      async fn handle_circle_create(
1722          &self,
1723          _id: [u8; 32],
1724          _name: Vec<u8>,
1725          _founder_key: [u8; 32],
1726          _timestamp: u64,
1727          _source: PeerKey,
1728      ) -> RouteResult {
1729          // TODO: Validate signature, store circle if we're invited
1730          debug!("Received CircleCreate - replication not yet implemented");
1731          RouteResult::Local
1732      }
1733  
1734      /// Handle circle invitation
1736      async fn handle_circle_invite(
1737          &self,
1738          circle_id: [u8; 32],
1739          invitee: [u8; 32],
1740          signature: Vec<u8>,
1741          source: PeerKey,
1742      ) -> RouteResult {
1743          use ed25519_dalek::{VerifyingKey, Signature, Verifier};
1744          
1745          // 1. Verify that the invitation is for us
1746          let our_key = self.node.machine_identity().public_bytes();
1747          if invitee != our_key {
1748              debug!("Received invitation for someone else, ignoring");
1749              return RouteResult::Local;
1750          }
1751  
1752          // 2. Verify the inviter's (source's) signature over the invitation
1753          // message = circle_id || invitee
1754          let mut message = Vec::with_capacity(64);
1755          message.extend_from_slice(&circle_id);
1756          message.extend_from_slice(&invitee);
1757  
1758          let inviter_key = match VerifyingKey::from_bytes(&source) {
1759              Ok(k) => k,
1760              Err(_) => {
1761                  warn!("Invalid inviter public key");
1762                  return RouteResult::Local; // Drop
1763              }
1764          };
1765          
1766          let sig_bytes: [u8; 64] = match signature.as_slice().try_into() {
1767              Ok(b) => b,
1768              Err(_) => {
1769                  warn!("Invalid signature length");
1770                  return RouteResult::Local;
1771              }
1772          };
1773  
1774          if inviter_key.verify(&message, &Signature::from_bytes(&sig_bytes)).is_err() {
1775              warn!("Invalid invitation signature");
1776              return RouteResult::Local;
1777          }
1778  
1779          info!(circle = ?&circle_id[..8], inviter = ?&source[..8], "Received valid Circle invitation");
1780  
1781          // 3. Pass to TrustEngine to handle acceptance logic
1782          // TODO: This auto-accepts for now; acceptance should be queued for user approval.
1783          // For the moment we only confirm that the plumbing works end to end.
1784          match self.node.trust_engine().accept_invite(circle_id, source, signature) {
1785              Ok(_) => {
1786                  info!("Accepted invitation to circle");
1787                  // TODO: Send CircleJoin to announce presence?
1788              }
1789              Err(e) => {
1790                  warn!(error = %e, "Failed to accept invitation");
1791              }
1792          }
1793  
1794          RouteResult::Local
1795      }
1796  
1797      /// Handle member joining a circle
1799      async fn handle_circle_join(
1800          &self,
1801          circle_id: [u8; 32],
1802          member_key: [u8; 32],
1803          epoch: u64,
1804          _source: PeerKey,
1805      ) -> RouteResult {
1806          debug!(
1807              circle = ?&circle_id[..8], 
1808              member = ?&member_key[..8], 
1809              epoch, 
1810              "Received CircleJoin"
1811          );
1812  
1813          // Update TrustEngine with new member
1814          // This is usually a gossip/sync event
1815          if let Err(e) = self.node.trust_engine().member_joined(circle_id, member_key, epoch) {
1816              warn!(error = %e, "Failed to process member join");
1817          } else {
1818              info!(circle = ?&circle_id[..8], member = ?&member_key[..8], "Member join recorded");
1819          }
1820  
1821          RouteResult::Local
1822      }
1823  
1824      /// Handle member leaving a circle
1826      async fn handle_circle_leave(
1827          &self,
1828          circle_id: [u8; 32],
1829          member_key: [u8; 32],
1830          _source: PeerKey,
1831      ) -> RouteResult {
1832          debug!(
1833              circle = ?&circle_id[..8],
1834              member = ?&member_key[..8],
1835              "Received CircleLeave"
1836          );
1837  
1838          // Update TrustEngine
1839          if let Err(e) = self.node.trust_engine().member_left(circle_id, member_key) {
1840              warn!(error = %e, "Failed to process member leave");
1841          } else {
1842              info!(circle = ?&circle_id[..8], member = ?&member_key[..8], "Member leave recorded");
1843          }
1844  
1845          RouteResult::Local
1846      }
1847  
1848      /// Handle incoming circle message
1849      /// 
1850      /// SECURITY: The encrypted_envelope contains sender and timestamp inside,
1851      /// preventing traffic analysis from revealing who is communicating when.
1852      async fn handle_circle_message(
1853          &self,
1854          circle_id: [u8; 32],
1855          id: u64,
1856          encrypted_envelope: Vec<u8>,
1857          source: PeerKey,
1858      ) -> RouteResult {
1859          debug!(circle = ?&circle_id[..8], id = id, "Received circle message (envelope encrypted)");
1860  
1861          // Verify we're a member of this circle
1862          match self.node.get_circle(&circle_id) {
1863              Ok(Some(circle)) => {
1864                  let our_key = self.node.peer_key();
1865                  if !circle.members.iter().any(|m| m.pubkey == our_key) {
1866                      debug!("Received message for circle we're not a member of");
1867                      return RouteResult::Local;
1868                  }
1869  
1870                  // SECURITY: Decrypt the envelope to get sender, timestamp, and content
1871                  // Sender and timestamp are now protected from network observers
1872                  let envelope = match self.node.decrypt_circle_envelope(&circle_id, &encrypted_envelope) {
1873                      Ok(env) => env,
1874                      Err(e) => {
1875                          debug!(error = ?e, "Failed to decrypt circle envelope");
1876                          return RouteResult::Error(e);
1877                      }
1878                  };
1879  
1880                  debug!(sender = ?&envelope.sender[..8], timestamp = envelope.timestamp, "Decrypted envelope");
1881  
1882                  // Store decrypted message
1883                  if let Err(e) = self.node.store_inbound_circle_message(
1884                      circle_id, 
1885                      id, 
1886                      envelope.sender, 
1887                      envelope.content, 
1888                      envelope.timestamp
1889                  ) {
1890                      return RouteResult::Error(e);
1891                  }
1892  
1893                  // Send ack back to sender
1894                  let ack_frame = AbzuFrame::CircleAck {
1895                      circle_id,
1896                      msg_id: id,
1897                      member_key: our_key,
1898                  };
1899                  if let Ok(encoded) = ack_frame.encode() {
1900                      let _ = self.send_to_peer(&source, &encoded).await;
1901                  }
1902              }
1903              Ok(None) => {
1904                  debug!("Received message for unknown circle");
1905              }
1906              Err(e) => {
1907                  return RouteResult::Error(e);
1908              }
1909          }
1910  
1911          RouteResult::Local
1912      }
1913  
1914      /// Broadcast a frame to all members of a circle (except ourselves)
1915      pub async fn broadcast_to_circle(&self, circle_id: &[u8; 32], frame: &AbzuFrame) -> Result<usize, NodeError> {
1916          let circle = self.node.get_circle(circle_id)?
1917              .ok_or_else(|| NodeError::Serialization("Circle not found".to_string()))?;
1918          
1919          let our_key = self.node.peer_key();
1920          let encoded = frame.encode().map_err(|e| NodeError::Serialization(e.to_string()))?;
1921          
1922          let mut sent_count = 0;
1923          for member in &circle.members {
1924              // Skip ourselves
1925              if member.pubkey == our_key {
1926                  continue;
1927              }
1928              
1929              // Try to send to this member
1930              match self.send_to_peer(&member.pubkey, &encoded).await {
1931                  RouteResult::Forwarded(_) => {
1932                      sent_count += 1;
1933                  }
1934                  RouteResult::NoRoute => {
1935                      debug!(member = ?&member.pubkey[..8], "No route to circle member");
1936                  }
1937                  RouteResult::Error(e) => {
1938                      debug!(member = ?&member.pubkey[..8], error = ?e, "Failed to send to circle member");
1939                  }
1940                  _ => {}
1941              }
1942          }
1943          
1944          debug!(circle = ?&circle_id[..8], sent = sent_count, total = circle.members.len().saturating_sub(1), "Broadcast complete");
1945          Ok(sent_count)
1946      }
1947  
1948      /// Handle circle message acknowledgment
1949      async fn handle_circle_ack(
1950          &self,
1951          _circle_id: [u8; 32],
1952          _msg_id: u64,
1953          _member_key: [u8; 32],
1954          _source: PeerKey,
1955      ) -> RouteResult {
1956          // TODO: Track delivery status per member
1957          debug!("Received CircleAck - delivery tracking not yet implemented");
1958          RouteResult::Local
1959      }
1960  
1961      /// Handle secondary endorsement (vouch) for an existing Circle member.
1962      /// 
1963      /// A vouch is a weaker form of trust than an invite. Multiple vouches
1964      /// can elevate a member to admin status.
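      /// 
      /// The handler verifies an Ed25519 signature over
      /// `circle_id || target || timestamp.to_be_bytes()`. A voucher would
      /// produce it roughly like this (illustrative sketch; `signing_key` is
      /// the voucher's key and is not shown here):
      /// 
      /// ```ignore
      /// use ed25519_dalek::Signer;
      /// let mut msg = Vec::with_capacity(32 + 32 + 8);
      /// msg.extend_from_slice(&circle_id);
      /// msg.extend_from_slice(&target);
      /// msg.extend_from_slice(&timestamp.to_be_bytes());
      /// let signature = signing_key.sign(&msg).to_bytes().to_vec();
      /// ```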
1965      async fn handle_circle_vouch(
1966          &self,
1967          circle_id: [u8; 32],
1968          voucher: [u8; 32],
1969          target: [u8; 32],
1970          signature: Vec<u8>,
1971          timestamp: u64,
1972          _source: PeerKey,
1973      ) -> RouteResult {
1974          debug!(
1975              circle = ?&circle_id[..8],
1976              voucher = ?&voucher[..8],
1977              target = ?&target[..8],
1978              timestamp,
1979              "Received CircleVouch"
1980          );
1981          
1982          // Verify signature: voucher signed (circle_id || target || timestamp)
1983          let message = {
1984              let mut msg = Vec::with_capacity(32 + 32 + 8);
1985              msg.extend_from_slice(&circle_id);
1986              msg.extend_from_slice(&target);
1987              msg.extend_from_slice(&timestamp.to_be_bytes());
1988              msg
1989          };
1990          
1991          let voucher_vk = match ed25519_dalek::VerifyingKey::from_bytes(&voucher) {
1992              Ok(vk) => vk,
1993              Err(e) => {
1994                  warn!("Invalid voucher public key: {}", e);
1995                  return RouteResult::Local;
1996              }
1997          };
1998          
1999          let sig_bytes: [u8; 64] = match signature.as_slice().try_into() {
2000              Ok(bytes) => bytes,
2001              Err(_) => {
2002                  warn!("Invalid signature length");
2003                  return RouteResult::Local;
2004              }
2005          };
2006          
2007          let sig = ed25519_dalek::Signature::from_bytes(&sig_bytes);
2008          if voucher_vk.verify_strict(&message, &sig).is_err() {
2009              warn!("Vouch signature verification failed");
2010              return RouteResult::Local;
2011          }
2012          
2013          // Apply the vouch using our local node
2014          // Note: This only applies if we are also a member and can record it
2015          match self.node.vouch_member(voucher, &circle_id, target, signature) {
2016              Ok(_circle) => {
2017                  info!(
2018                      circle = ?&circle_id[..8],
2019                      voucher = ?&voucher[..8],
2020                      target = ?&target[..8],
2021                      "Vouch recorded successfully"
2022                  );
2023                  
2024                  // Broadcast to other members
2025                  let vouch_frame = AbzuFrame::CircleVouch {
2026                      circle_id,
2027                      voucher,
2028                      target,
2029                      signature: sig_bytes.to_vec(),
2030                      timestamp,
2031                  };
2032                  if let Err(e) = self.broadcast_to_circle(&circle_id, &vouch_frame).await {
2033                      warn!("Failed to broadcast vouch: {}", e);
2034                  }
2035              }
2036              Err(e) => {
2037                  warn!("Failed to apply vouch: {}", e);
2038              }
2039          }
2040          
2041          RouteResult::Local
2042      }
2043  
2044      /// Handle member pruning (moderation action).
2045      /// 
2046      /// Removes a member AND all members they invited (recursively).
2047      /// This is the accountability mechanism - if you invite someone bad,
2048      /// you risk being pruned along with them.
2049      #[allow(clippy::too_many_arguments)]
2050      async fn handle_circle_prune(
2051          &self,
2052          circle_id: [u8; 32],
2053          pruner: [u8; 32],
2054          target: [u8; 32],
2055          signature: Vec<u8>,
2056          reason_hash: Option<[u8; 32]>,
2057          timestamp: u64,
2058          _source: PeerKey,
2059      ) -> RouteResult {
2060          debug!(
2061              circle = ?&circle_id[..8],
2062              pruner = ?&pruner[..8],
2063              target = ?&target[..8],
2064              timestamp,
2065              "Received CirclePrune"
2066          );
2067          
2068          // Build message to verify: circle_id || target || reason_hash || timestamp
2069          let message = {
2070              let mut msg = Vec::with_capacity(32 + 32 + 32 + 8);
2071              msg.extend_from_slice(&circle_id);
2072              msg.extend_from_slice(&target);
2073              if let Some(reason) = reason_hash {
2074                  msg.extend_from_slice(&reason);
2075              }
2076              msg.extend_from_slice(&timestamp.to_be_bytes());
2077              msg
2078          };
2079          
2080          // Verify pruner's signature
2081          let pruner_vk = match ed25519_dalek::VerifyingKey::from_bytes(&pruner) {
2082              Ok(vk) => vk,
2083              Err(e) => {
2084                  warn!("Invalid pruner public key: {}", e);
2085                  return RouteResult::Local;
2086              }
2087          };
2088          
2089          let sig_bytes: [u8; 64] = match signature.as_slice().try_into() {
2090              Ok(bytes) => bytes,
2091              Err(_) => {
2092                  warn!("Invalid prune signature length");
2093                  return RouteResult::Local;
2094              }
2095          };
2096          
2097          let sig = ed25519_dalek::Signature::from_bytes(&sig_bytes);
2098          if pruner_vk.verify_strict(&message, &sig).is_err() {
2099              warn!("Prune signature verification failed");
2100              return RouteResult::Local;
2101          }
2102          
2103          // Apply pruning using our local node
2104          match self.node.prune_branch(pruner, &circle_id, target, signature, reason_hash) {
2105              Ok((_circle, removed)) => {
2106                  info!(
2107                      circle = ?&circle_id[..8],
2108                      pruner = ?&pruner[..8],
2109                      target = ?&target[..8],
2110                      removed_count = removed.len(),
2111                      "Branch pruned successfully"
2112                  );
2113                  
2114                  // Broadcast to remaining members
2115                  let prune_frame = AbzuFrame::CirclePrune {
2116                      circle_id,
2117                      pruner,
2118                      target,
2119                      signature: sig_bytes.to_vec(),
2120                      reason_hash,
2121                      timestamp,
2122                  };
2123                  if let Err(e) = self.broadcast_to_circle(&circle_id, &prune_frame).await {
2124                      warn!("Failed to broadcast prune: {}", e);
2125                  }
2126              }
2127              Err(e) => {
2128                  warn!("Failed to apply prune: {}", e);
2129              }
2130          }
2131          
2132          RouteResult::Local
2133      }
2134  
2136      /// Handle "I have" announcements for lazy-push repair.
2137      /// 
2138      /// When we receive a GossipHave frame, we can identify messages we're missing
2139      /// and request them from the sender.
2140      async fn handle_gossip_have(
2141          &self,
2142          circle_id: [u8; 32],
2143          msg_ids: Vec<u64>,
2144          epoch: u64,
2145          source: PeerKey,
2146      ) -> RouteResult {
2147          debug!(
2148              circle = ?&circle_id[..8], 
2149              msg_count = msg_ids.len(), 
2150              epoch, 
2151              source = ?&source[..8],
2152              "Received GossipHave announcement"
2153          );
2154          
2155          // Check if we're a member of this circle
2156          if self.node.get_circle(&circle_id).ok().flatten().is_none() {
2157              debug!(circle = ?&circle_id[..8], "Not a member of this circle, ignoring gossip");
2158              return RouteResult::Local;
2159          }
2160          
2161          // TODO: Implement pull mechanism for missing messages.
2162          // For now, just log the announcement - eager push handles most propagation.
2163          // Future: Compare msg_ids with our seen-set and request missing ones.
2164          
2165          RouteResult::Local
2166      }
2167  
2168      // ─────────────────────────────────────────────────────────────
2169      // Gossip Protocol (Epidemic Broadcast)
2170      // ─────────────────────────────────────────────────────────────
2171  
2172      /// Calculate gossip fanout: √n peers, clamped to [2, 8]
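          ///
          /// Illustrative values (the doctest is marked `ignore` because the
          /// module path is assumed):
          ///
          /// ```ignore
          /// assert_eq!(Switchboard::calculate_fanout(2), 2);    // √1 = 1, floored to 2
          /// assert_eq!(Switchboard::calculate_fanout(50), 7);   // √49 = 7
          /// assert_eq!(Switchboard::calculate_fanout(1000), 8); // √999 ≈ 32, capped at 8
          /// ```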
2173      pub fn calculate_fanout(member_count: usize) -> usize {
2174          let n = member_count.saturating_sub(1); // Exclude self
2175          let fanout = (n as f64).sqrt().ceil() as usize;
2176          fanout.clamp(2, 8)
2177      }
2178  
2179      /// Check if we've seen this message recently (probabilistic)
2180      pub async fn already_seen(&self, circle_id: &[u8; 32], msg_id: u64) -> bool {
2181          let filter = self.seen_filter.read().await;
2182          filter.contains(circle_id, msg_id)
2183      }
2184  
2185      /// Mark a message as seen (insert into probabilistic filter)
2186      pub async fn mark_seen(&self, circle_id: &[u8; 32], msg_id: u64) {
2187          let mut filter = self.seen_filter.write().await;
2188          filter.insert(circle_id, msg_id);
2189          // Opportunistic rotation check
2190          filter.maybe_rotate();
2191      }
2192  
2193      /// Gossip a message to √n random circle members (epidemic broadcast)
2194      /// 
2195      /// Returns the number of peers we gossiped to.
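      /// 
      /// Typical use when relaying a circle message (illustrative sketch; the
      /// `CircleMessage` variant layout is assumed from the handler above):
      /// 
      /// ```ignore
      /// let frame = AbzuFrame::CircleMessage { circle_id, id: msg_id, encrypted_envelope };
      /// let sent = switchboard.gossip_to_circle(&circle_id, msg_id, &frame).await?;
      /// debug!(sent, "relayed circle message via gossip");
      /// ```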
2196      pub async fn gossip_to_circle(
2197          &self,
2198          circle_id: &[u8; 32],
2199          msg_id: u64,
2200          frame: &AbzuFrame,
2201      ) -> Result<usize, NodeError> {
2202          // Check deduplication first
2203          if self.already_seen(circle_id, msg_id).await {
2204              trace!(circle = ?&circle_id[..8], msg_id, "Message already seen, skipping gossip");
2205              return Ok(0);
2206          }
2207          
2208          // Mark as seen
2209          self.mark_seen(circle_id, msg_id).await;
2210          
2211          // Get circle
2212          let circle = self.node.get_circle(circle_id)?
2213              .ok_or_else(|| NodeError::Serialization("Circle not found".to_string()))?;
2214          
2215          let our_key = self.node.peer_key();
2216          let encoded = frame.encode().map_err(|e| NodeError::Serialization(e.to_string()))?;
2217          
2218          // Calculate fanout
2219          let fanout = Self::calculate_fanout(circle.members.len());
2220          
2221          // Filter to eligible peers (not us)
2222          let eligible: Vec<_> = circle.members.iter()
2223              .filter(|m| m.pubkey != our_key)
2224              .collect();
2225          
2226          if eligible.is_empty() {
2227              return Ok(0);
2228          }
2229          
2230          // Select random subset
2231          use rand::seq::SliceRandom;
2232          let mut rng = rand::thread_rng();
2233          let targets: Vec<_> = eligible.choose_multiple(&mut rng, fanout.min(eligible.len())).collect();
2234          
2235          let mut sent_count = 0;
2236          for member in targets {
2237              match self.send_to_peer(&member.pubkey, &encoded).await {
2238                  RouteResult::Forwarded(_) => {
2239                      sent_count += 1;
2240                  }
2241                  RouteResult::NoRoute => {
2242                      debug!(member = ?&member.pubkey[..8], "No route to gossip target");
2243                  }
2244                  RouteResult::Error(e) => {
2245                      debug!(member = ?&member.pubkey[..8], error = ?e, "Failed to gossip to member");
2246                  }
2247                  _ => {}
2248              }
2249          }
2250          
2251          debug!(
2252              circle = ?&circle_id[..8], 
2253              msg_id, 
2254              sent = sent_count, 
2255              fanout,
2256              total_members = circle.members.len(),
2257              "Gossip broadcast complete"
2258          );
2259          Ok(sent_count)
2260      }
2261  
2262      // ─────────────────────────────────────────────────────────────────────────
2263      // DHT (Distributed Hash Table) Handlers
2264      // ─────────────────────────────────────────────────────────────────────────
2265  
2266      /// Handle an incoming DHT request.
2267      ///
2268      /// Processes Ping, FindNode, FindValue, and Store requests from peers.
2269      /// Sends appropriate responses back to the requester.
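      ///
      /// Request → response mapping, as implemented below:
      /// `Ping` → `Pong`; `FindNode` → `Nodes`; `FindValue` → `Value` (hit)
      /// or `Nodes` (miss); `Store` → `StoreAck`.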
2270      async fn handle_dht_request(
2271          &self,
2272          request_id: u64,
2273          sender_key: [u8; 32],
2274          kind: DhtRequestKind,
2275          source: PeerKey,
2276      ) -> RouteResult {
2277          debug!(
2278              request_id,
2279              sender = ?&sender_key[..8],
2280              kind = ?std::mem::discriminant(&kind),
2281              "Received DHT request"
2282          );
2283  
2284          let our_key = self.node.peer_key();
2285          let now = unix_now();
2286  
2287          // SECURITY: Rate limit per-peer DHT requests (H1 in audit).
2288          // Check before doing any expensive work like routing table updates.
2289          let sender_id = abzu_dht::key::node_id(&sender_key);
2290          let is_store = matches!(kind, DhtRequestKind::Store { .. });
2291          {
2292              let mut rate_limiter = self.dht_rate_limiter.write().await;
2293              match rate_limiter.check_request(&sender_id, now, is_store) {
2294                  RateLimitResult::Allowed => {}
2295                  RateLimitResult::Limited { remaining_secs } => {
2296                      warn!(
2297                          sender = ?&sender_key[..8],
2298                          remaining_secs,
2299                          "DHT request rate limited"
2300                      );
2301                      return RouteResult::Local; // Silently drop
2302                  }
2303                  RateLimitResult::StoreLimited { remaining_secs } => {
2304                      warn!(
2305                          sender = ?&sender_key[..8],
2306                          remaining_secs,
2307                          "DHT Store request rate limited"
2308                      );
2309                      return RouteResult::Local; // Silently drop
2310                  }
2311              }
2312          }
2313  
2314          // SECURITY: Add sender as Questionable, not Good (C2 in audit).
2315          // They initiated contact, but we haven't verified bidirectional communication.
2316          // They'll be promoted to Good after we successfully send them a message.
2317          let sender_node = DhtNodeInfoFull {
2318              id: abzu_dht::key::node_id(&sender_key),
2319              pubkey: sender_key,
2320              address: peer_key_to_hex(&source),
2321              tree_coords: None,
2322              state: NodeState::Questionable,
2323              failures: 0,
2324              last_seen: now,
2325              first_seen: now,
2326              rtt_ms: None,
2327          };
2328          
2329          {
2330              let mut routing = self.dht_routing.write().await;
2331              let _ = routing.add(sender_node.clone(), now);
2332          }
2333  
2334          // Build and send response based on request type
2335          let response_frame = match kind {
2336              DhtRequestKind::Ping => {
2337                  // Simple liveness check - respond with Pong
2338                  AbzuFrame::dht_pong(request_id, our_key)
2339              }
2340  
2341              DhtRequestKind::FindNode { target } => {
2342                  // Find K closest nodes to target in our routing table
2343                  let routing = self.dht_routing.read().await;
2344                  let closest = routing.closest(&target, K);
2345                  
2346                  // Convert domain nodes to wire format
2347                  let nodes: Vec<DhtNodeInfo> = closest
2348                      .nodes
2349                      .iter()
2350                      .map(domain_to_wire_node)
2351                      .collect();
2352                      
2353                  debug!(
2354                      target = ?&target[..8],
2355                      found_count = nodes.len(),
2356                      "DHT FindNode - returning closest nodes"
2357                  );
2358                  
2359                  AbzuFrame::dht_nodes(request_id, our_key, nodes)
2360              }
2361  
2362              DhtRequestKind::FindValue { key } => {
2363                  // Try to find the value in our local store
2364                  let values_store = self.dht_values.read().await;
2365                  let stored_values = values_store.get(&key, now);
2366                  
2367                  if let Some(first_value) = stored_values.first() {
2368                      // Found a value - return it
2369                      let ttl_remaining = first_value.metadata.expires_at.saturating_sub(now);
2370                      
2371                      debug!(
2372                          key = ?&key[..8],
2373                          value_len = first_value.payload.len(),
2374                          ttl_remaining,
2375                          "DHT FindValue - found value"
2376                      );
2377                      
2378                      // Convert u64 TTL to u32 for wire format (cap at u32::MAX)
2379                      let ttl_u32 = ttl_remaining.min(u32::MAX as u64) as u32;
2380                      
2381                      AbzuFrame::dht_value(
2382                          request_id,
2383                          our_key,
2384                          first_value.payload.clone(),
2385                          first_value.metadata.publisher,
2386                          ttl_u32,
2387                      )
2388                  } else {
2389                      // Value not found, return closest nodes
2390                      drop(values_store); // Release read lock
2391                      let routing = self.dht_routing.read().await;
2392                      let closest = routing.closest(&key, K);
2393                      let nodes: Vec<DhtNodeInfo> = closest.nodes.iter().map(domain_to_wire_node).collect();
2394                      
2395                      debug!(
2396                          key = ?&key[..8],
2397                          node_count = nodes.len(),
2398                          "DHT FindValue - value not found, returning closest nodes"
2399                      );
2400                      AbzuFrame::dht_nodes(request_id, our_key, nodes)
2401                  }
2402              }
2403  
2404              DhtRequestKind::Store { key, value, ttl_secs, signature } => {
2405                  // SECURITY: Reject malformed signatures immediately (H2 in audit).
2406                  // Ed25519 signatures are exactly 64 bytes. Don't waste cycles on invalid ones.
2407                  if signature.len() != 64 {
2408                      warn!(
2409                          sender = ?&sender_key[..8],
2410                          sig_len = signature.len(),
2411                          "DHT Store rejected - invalid signature length"
2412                      );
2413                      return RouteResult::Local; // Silently drop malformed request
2414                  }
2415  
2416                  // Create a StoredValue for the DHT store using the constructor
2417                  let mut stored = StoredValue::new(
2418                      key,
2419                      ValueType::Application, // Generic application data
2420                      sender_key,
2421                      value.clone(),
2422                      now,
2423                      Some(ttl_secs as u64),
2424                  );
2425                  
2426                  // Signature is exactly 64 bytes, copy directly
2427                  stored.signature.copy_from_slice(&signature);
2428                  
2429                  // Store in our value store
2430                  let mut store = self.dht_values.write().await;
2431                  match store.store(stored, now) {
2432                      Ok(_) => {
2433                          debug!(
2434                              key = ?&key[..8],
2435                              value_len = value.len(),
2436                              ttl_secs,
2437                              "DHT Store - value stored successfully"
2438                          );
2439                          AbzuFrame::dht_store_ack(request_id, our_key, true)
2440                      }
2441                      Err(e) => {
2442                          warn!(
2443                              key = ?&key[..8],
2444                              error = ?e,
2445                              "DHT Store - failed to store value"
2446                          );
2447                          AbzuFrame::dht_store_ack(request_id, our_key, false)
2448                      }
2449                  }
2450              }
2451          };
2452  
2453          // Send the response back to the requester
2454          let peers_arc = self.node.peers();
2455          let mut peers = peers_arc.lock().await;
2456  
2457          if let Some(conn) = peers.get_mut(&source) {
2458              if let Err(e) = conn.interface.send_frame(&response_frame).await {
2459                  warn!(error = %e, "Failed to send DHT response");
2460              } else {
2461                  conn.touch();
2462                  trace!(request_id, "Sent DHT response");
2463              }
2464          }
2465  
2466          self.touch_peer(&source).await;
2467          RouteResult::Local
2468      }
2469  
2470      /// Handle an incoming DHT response.
2471      ///
2472      /// Processes Pong, Nodes, Value, and StoreAck responses from peers.
2473      /// Routes responses to pending DHT operations (lookups, stores).
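      ///
      /// The requesting side registers a oneshot channel keyed by
      /// `request_id` before sending; a sketch of that pattern (the timeout
      /// value and send path are illustrative assumptions):
      ///
      /// ```ignore
      /// let (tx, rx) = oneshot::channel();
      /// pending_dht_ops.write().await.insert(request_id, (unix_now(), tx));
      /// // ... send the DHT request frame to the peer ...
      /// match tokio::time::timeout(Duration::from_secs(5), rx).await {
      ///     Ok(Ok(kind)) => { /* DhtResponseKind delivered by this handler */ }
      ///     _ => { /* timed out or dropped; stale entries are reaped by periodic cleanup */ }
      /// }
      /// ```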
2474      async fn handle_dht_response(
2475          &self,
2476          request_id: u64,
2477          responder_key: [u8; 32],
2478          kind: DhtResponseKind,
2479          source: PeerKey,
2480      ) -> RouteResult {
2481          debug!(
2482              request_id,
2483              responder = ?&responder_key[..8],
2484              kind = ?std::mem::discriminant(&kind),
2485              "Received DHT response"
2486          );
2487  
2488          let now = unix_now();
2489          
2490          // Update routing table with responder info (they responded, so they're alive)
2491          let responder_node = DhtNodeInfoFull {
2492              id: abzu_dht::key::node_id(&responder_key),
2493              pubkey: responder_key,
2494              address: peer_key_to_hex(&source),
2495              tree_coords: None,
2496              state: NodeState::Good,
2497              failures: 0,
2498              last_seen: now,
2499              first_seen: now,
2500              rtt_ms: None,
2501          };
2502          
2503          {
2504              let mut routing = self.dht_routing.write().await;
2505              let _ = routing.add(responder_node, now);
2506          }
2507  
2508          // Complete pending operation if one exists for this request_id
2509          {
2510              let mut pending = self.pending_dht_ops.write().await;
2511              if let Some((_created_at, sender)) = pending.remove(&request_id) {
2512                  // Clone the response to send to the waiting operation
2513                  let response_clone = kind.clone();
2514                  if sender.send(response_clone).is_err() {
2515                      debug!(request_id, "Pending DHT operation receiver dropped");
2516                  } else {
2517                      trace!(request_id, "Completed pending DHT operation");
2518                  }
2519              }
2520          }
2521  
2522          // Process the response locally (routing table updates, caching, etc.)
2523          match &kind {
2524              DhtResponseKind::Pong => {
2525                  // Node is alive - already updated routing table above
2526                  trace!(responder = ?&responder_key[..8], "DHT Pong received - node alive");
2527              }
2528  
2529              DhtResponseKind::Nodes { nodes } => {
2530                  // Received list of nodes closer to target
2531                  debug!(
2532                      responder = ?&responder_key[..8],
2533                      node_count = nodes.len(),
2534                      "DHT Nodes response received"
2535                  );
2536                  
2537                  // SECURITY: Add discovered nodes as Questionable, not Good (C2/C4 in audit).
2538                  // These nodes are reported by the responder but haven't been verified yet.
2539                  // They will be promoted to Good after successful communication.
2540                  // Also cap the number of nodes we accept per response to prevent flooding.
2541                  const MAX_NODES_PER_RESPONSE: usize = 20; // K = 20
2542                  {
2543                      let mut routing = self.dht_routing.write().await;
2544                      for wire_node in nodes.iter().take(MAX_NODES_PER_RESPONSE) {
2545                          let mut domain_node = wire_to_domain_node(wire_node, now);
2546                          domain_node.state = NodeState::Questionable; // Don't trust until verified
2547                          let _ = routing.add(domain_node, now);
2548                          trace!(node_key = ?&wire_node.key[..8], "Added discovered node as Questionable");
2549                      }
2550                  }
2551              }
2552  
2553              DhtResponseKind::Value { value, publisher, ttl_remaining_secs } => {
2554                  // Received a stored value
2555                  info!(
2556                      responder = ?&responder_key[..8],
2557                      value_len = value.len(),
2558                      publisher = ?&publisher[..8],
2559                      ttl_remaining = ttl_remaining_secs,
2560                      "DHT Value found"
2561                  );
2562                  
2563                  // SECURITY: We intentionally do NOT cache values from FindValue responses.
2564                  // The wire format doesn't include signatures, so we can't verify authenticity.
2565                  // Caching unverified values would enable cache poisoning attacks (C3 in audit).
2566                  // The value is still returned to the caller via the pending operation.
2567              }
2568  
2569              DhtResponseKind::StoreAck { success } => {
2570                  // Store operation acknowledgment
2571                  debug!(
2572                      responder = ?&responder_key[..8],
2573                      success,
2574                      "DHT StoreAck received"
2575                  );
2576              }
2577  
2578              DhtResponseKind::Error { code, message } => {
2579                  // DHT operation failed - mark responder as having an issue
2580                  warn!(
2581                      responder = ?&responder_key[..8],
2582                      error_code = code,
2583                      message = %message,
2584                      "DHT Error received"
2585                  );
2586                  
2587                  // We could increment failure count, but for now just log
2588                  // The node still responded, so it's reachable
2589              }
2590          }
2591  
2592          self.touch_peer(&source).await;
2593          RouteResult::Local
2594      }
2595  
2597      /// Handle generic service request
2598      async fn handle_service_request(&self, service_id: u64, request_id: u64, requester: [u8; 32], payload: Vec<u8>, source: PeerKey) -> RouteResult {
2599          if let Some(tx) = self.node.get_service_handler(service_id) {
2600              // Forward to local service handler
2601              let event = ServiceEvent::Request {
2602                  request_id,
2603                  requester: PeerKey::from(requester),
2604                  payload,
2605              };
2606              
2607              if let Err(e) = tx.send(event).await {
2608                  warn!(service_id, error = ?e, "Failed to dispatch service request (handler dropped)");
2609                  // Handler dropped - maybe send unavailable response?
2610              }
2611          } else {
2612              // No handler for this service
2613              debug!(service_id, "Received request for unknown service");
2614              
2615              // Send back generic "Service Unavailable" response (status 1)
2616              // This allows the requester to fail fast
2617              let response = AbzuFrame::ServiceResponse {
2618                  service_id,
2619                  request_id,
2620                  payload: vec![],
2621                  status: 1, // Generic unavailable
2622              };
2623              
2624              if let Err(e) = self.node.send(source, response).await {
2625                  warn!(target = ?source, error = ?e, "Failed to send ServiceUnavailable response");
2626              }
2627          }
2628          
2629          RouteResult::Local
2630      }
2631  
2632      /// Handle generic service response
2633      async fn handle_service_response(&self, service_id: u64, request_id: u64, payload: Vec<u8>, status: u8, _source: PeerKey) -> RouteResult {
2634          if let Some(tx) = self.node.get_service_handler(service_id) {
2635              // Forward to local service handler
2636              let event = ServiceEvent::Response {
2637                  request_id,
2638                  status,
2639                  payload,
2640              };
2641              
2642              if let Err(e) = tx.send(event).await {
2643                  warn!(service_id, error = ?e, "Failed to dispatch service response (handler dropped)");
2644              }
2645          } else {
            debug!(service_id, "Received response for unknown service (handler may have already shut down)");
2647          }
2648          
2649          RouteResult::Local
2650      }
2651  }
2652  
2653  #[cfg(test)]
2654  mod tests {
2655      use super::*;
2656      use crate::node::NodeConfig;
2657      use tempfile::tempdir;
2658  
    fn create_test_node() -> Arc<Node> {
        // into_path() persists the directory: dropping the TempDir guard here
        // would delete the storage path out from under the Node.
        let tmp = tempdir().unwrap().into_path();
        let config = NodeConfig {
            storage_path: tmp.to_str().unwrap().to_string(),
            ..Default::default()
        };
        Arc::new(Node::new(config).unwrap())
    }
2667  
2668      #[tokio::test]
2669      async fn test_handle_keepalive() {
2670          let node = create_test_node();
2671          let switchboard = Switchboard::new(node);
2672  
2673          let source = [0xAB; 32];
2674          let result = switchboard.handle_frame(AbzuFrame::KeepAlive, source).await;
2675  
2676          assert!(matches!(result, RouteResult::Local));
2677      }
2678  
2679      #[tokio::test]
2680      async fn test_handle_chunk() {
2681          let node = create_test_node();
2682          let switchboard = Switchboard::new(Arc::clone(&node));
2683  
2684          let data = b"test data";
2685          let cid = *blake3::hash(data).as_bytes();
2686          let source = [0xAB; 32];
2687  
2688          let result = switchboard
2689              .handle_frame(AbzuFrame::Chunk { cid, data: data.to_vec() }, source)
2690              .await;
2691  
2692          assert!(matches!(result, RouteResult::Local));
2693  
2694          // Verify chunk was stored
2695          let retrieved = node.get_chunk(&cid).unwrap().unwrap();
2696          assert_eq!(retrieved, data);
2697      }
2698  
2699      #[tokio::test]
2700      async fn test_handle_chunk_invalid_cid() {
2701          let node = create_test_node();
2702          let switchboard = Switchboard::new(node);
2703  
2704          let data = b"test data";
2705          let bad_cid = [0xFF; 32]; // Wrong CID
2706          let source = [0xAB; 32];
2707  
2708          let result = switchboard
2709              .handle_frame(AbzuFrame::Chunk { cid: bad_cid, data: data.to_vec() }, source)
2710              .await;
2711  
2712          assert!(matches!(result, RouteResult::Error(_)));
2713      }
2714  
2715      #[tokio::test]
2716      async fn test_handle_route_local_target() {
2717          let node = create_test_node();
2718          let our_key = node.peer_key();
2719          let switchboard = Switchboard::new(Arc::clone(&node));
2720  
2721          // Route frame where we are the target
2722          let payload = b"secret message".to_vec();
2723          let route_frame = AbzuFrame::Route {
2724              target: our_key,
2725              next_hop: our_key,
2726              payload: payload.clone(),
2727          };
2728  
2729          let source = [0xAB; 32];
2730          let result = switchboard.handle_frame(route_frame, source).await;
2731  
2732          // Should process locally
2733          assert!(matches!(result, RouteResult::Local));
2734      }
2735  
    #[test]
    fn test_onion_wrap_unwrap() {
        // Test that wrap_onion creates proper nested structure
        // (nothing here is awaited, so a plain #[test] suffices)
        let target = [0xCC; 32];
        let hop1 = [0xAA; 32];
        let hop2 = [0xBB; 32];
        let payload = b"inner payload".to_vec();

        // Create onion: us -> hop1 -> hop2 -> target
        let onion = AbzuFrame::wrap_onion(payload.clone(), target, &[hop1, hop2]);

        // Outer layer should have next_hop = hop1
        let AbzuFrame::Route { next_hop, target: t, payload: p } = &onion else {
            panic!("Outer should be Route");
        };
        assert_eq!(*next_hop, hop1);
        assert_eq!(*t, target);

        // Unwrap first layer: next_hop advances to hop2
        let inner = AbzuFrame::decode(p).unwrap();
        let AbzuFrame::Route { next_hop: nh2, target: t2, payload: p2 } = &inner else {
            panic!("Inner should be Route");
        };
        assert_eq!(*nh2, hop2);
        assert_eq!(*t2, target);

        // Unwrap second layer: innermost is addressed to the target itself
        let innermost = AbzuFrame::decode(p2).unwrap();
        let AbzuFrame::Route { next_hop: nh3, target: t3, payload: p3 } = &innermost else {
            panic!("Innermost should be Route");
        };
        assert_eq!(*nh3, target);
        assert_eq!(*t3, target);
        assert_eq!(p3, &payload);
    }
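
    // The helper below is an illustrative sketch, not part of the routing
    // API: it peels every Route layer off an onion, collecting the hop
    // sequence. It relies only on the nesting invariant verified by hand
    // above (each layer keeps the same `target`, and the innermost layer's
    // `next_hop` equals that target).

    /// Sketch: unwrap all onion layers, returning (hop sequence, payload).
    fn peel_onion(mut frame: AbzuFrame) -> (Vec<[u8; 32]>, Vec<u8>) {
        let mut hops = Vec::new();
        loop {
            let AbzuFrame::Route { next_hop, target, payload } = frame else {
                panic!("Every layer should be a Route");
            };
            hops.push(next_hop);
            if next_hop == target {
                return (hops, payload); // innermost layer reached
            }
            frame = AbzuFrame::decode(&payload).unwrap();
        }
    }

    #[test]
    fn test_onion_peel_sketch() {
        let target = [0xCC; 32];
        let payload = b"inner payload".to_vec();
        let onion = AbzuFrame::wrap_onion(payload.clone(), target, &[[0xAA; 32], [0xBB; 32]]);

        // Hops visited in order, ending at the target itself
        let (hops, inner) = peel_onion(onion);
        assert_eq!(hops, vec![[0xAA; 32], [0xBB; 32], target]);
        assert_eq!(inner, payload);
    }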
2774  
2775      // ─────────────────────────────────────────────────────────────
2776      // Gossip Protocol Tests
2777      // ─────────────────────────────────────────────────────────────
2778  
    #[test]
    fn test_fanout_calculation() {
        // √n fanout, clamped to [2, 8]

        // Small groups: clamp to the minimum of 2
        assert_eq!(Switchboard::calculate_fanout(1), 2);  // solo, effectively 0 peers
        assert_eq!(Switchboard::calculate_fanout(2), 2);  // 1 peer -> √1 = 1, clamped to 2
        assert_eq!(Switchboard::calculate_fanout(3), 2);  // 2 peers -> √2 ≈ 1.4 -> 2

        // Medium groups: √n applies
        assert_eq!(Switchboard::calculate_fanout(5), 2);   // 4 peers -> √4 = 2
        assert_eq!(Switchboard::calculate_fanout(10), 3);  // 9 peers -> √9 = 3
        assert_eq!(Switchboard::calculate_fanout(17), 4);  // 16 peers -> √16 = 4
        assert_eq!(Switchboard::calculate_fanout(26), 5);  // 25 peers -> √25 = 5

        // Large groups: clamp to the maximum of 8
        assert_eq!(Switchboard::calculate_fanout(65), 8);   // 64 peers -> √64 = 8
        assert_eq!(Switchboard::calculate_fanout(100), 8);  // 99 peers -> √99 ≈ 9.9, clamped to 8
        assert_eq!(Switchboard::calculate_fanout(1000), 8); // 999 peers -> √999 ≈ 31.6, clamped to 8
    }
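
    // For reference, a minimal sketch of the √n rule exercised above, kept
    // local to the tests. The exact rounding mode in the real
    // `Switchboard::calculate_fanout` is not restated here, so this is an
    // assumed shape, and the companion test only uses values where rounding
    // cannot change the result.
    fn sqrt_fanout_sketch(group_size: usize) -> usize {
        let peers = group_size.saturating_sub(1); // we never gossip to ourselves
        ((peers as f64).sqrt().round() as usize).clamp(2, 8)
    }

    #[test]
    fn test_fanout_sketch_on_exact_squares_and_clamps() {
        assert_eq!(sqrt_fanout_sketch(1), 2);    // clamped up to the minimum
        assert_eq!(sqrt_fanout_sketch(10), 3);   // √9 = 3
        assert_eq!(sqrt_fanout_sketch(17), 4);   // √16 = 4
        assert_eq!(sqrt_fanout_sketch(26), 5);   // √25 = 5
        assert_eq!(sqrt_fanout_sketch(1000), 8); // clamped down to the maximum
    }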
2799  
2800      #[tokio::test]
2801      async fn test_seen_set_deduplication() {
2802          let node = create_test_node();
2803          let switchboard = Switchboard::new(node);
2804          
2805          let circle_id = [0x42; 32];
2806          let msg_id = 12345u64;
2807          
2808          // First time: not seen
2809          assert!(!switchboard.already_seen(&circle_id, msg_id).await);
2810          
2811          // Mark as seen
2812          switchboard.mark_seen(&circle_id, msg_id).await;
2813          
2814          // Second time: should be seen
2815          assert!(switchboard.already_seen(&circle_id, msg_id).await);
2816          
2817          // Different message: not seen
2818          assert!(!switchboard.already_seen(&circle_id, 99999).await);
2819          
2820          // Different circle: not seen
2821          assert!(!switchboard.already_seen(&[0x99; 32], msg_id).await);
2822      }
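
    // Sketch of the unknown-service fallback: a ServiceResponse for a
    // service with no registered handler is logged and dropped, and the
    // frame still counts as locally handled. This assumes `handle_frame`
    // dispatches ServiceResponse frames to `handle_service_response`, as
    // the handlers above suggest.
    #[tokio::test]
    async fn test_handle_service_response_unknown_service() {
        let node = create_test_node();
        let switchboard = Switchboard::new(node);

        let frame = AbzuFrame::ServiceResponse {
            service_id: 0xDEAD, // no handler registered for this id
            request_id: 1,
            payload: vec![],
            status: 0,
        };

        let result = switchboard.handle_frame(frame, [0xAB; 32]).await;
        assert!(matches!(result, RouteResult::Local));
    }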
2823  }