/ abzu-core / src / lib.rs
lib.rs
   1  //! Abzu Core
   2  //!
   3  //! The main event loop and node management for the Abzu mesh network.
   4  //!
   5  //! # Architecture
   6  //!
   7  //! - `node`: Node state container (identity, router, peers, store)
   8  //! - `switchboard`: Frame handling and routing logic
   9  //! - `run()`: Main event loop using tokio::select!
  10  
  11  pub mod bootstrap;
  12  pub mod codec;
  13  pub mod config;
  14  pub mod identity;
  15  pub mod node;
  16  pub mod persistence;
  17  pub mod radicle;
  18  pub mod replication;
  19  #[cfg(feature = "webrtc")]
  20  pub mod signaling;
  21  pub mod storage;
  22  pub mod store;
  23  pub mod switchboard;
  24  pub mod trust;
  25  pub mod trusted_bootstrap;
  26  
  27  use std::net::SocketAddr;
  28  use std::sync::Arc;
  29  use std::time::Duration;
  30  
  31  use abzu_transport::rate_limit::{RateLimiter, RateLimitConfig};
  32  use tokio::net::TcpListener;
  33  use tracing::{debug, error, info, warn};
  34  
  35  pub use bootstrap::{BootstrapConfig, BootstrapResult};
  36  pub use identity::{MachineIdentity, UserIdentity};
  37  // Storage types from node
  38  pub use node::{
  39      Contact, MessageDirection, MessageStatus, Node, NodeConfig, NodeError,
  40      PeerConnection, StoredCircleMessage, StoredMessage,
  41  };
  42  // Trust types from trust module
  43  pub use trust::{
  44      Circle, CircleMember, LedgerMode, MemberRole, PruneMode, TrustAction,
  45      TrustCapacity, TrustLedgerEntry, TrustPolicy, Vouch,
  46      TrustEngine, TrustError, derive_circle_key,
  47      MAX_INVITE_CAPACITY, MAX_VOUCH_CAPACITY, MAX_INVITE_DEPTH, MIN_VOUCHES_FOR_ADMIN,
  48  };
  49  pub use store::{ContentStore, StoreError};
  50  
  51  pub use switchboard::{RouteResult, Switchboard};
  52  pub use trusted_bootstrap::{
  53      BootstrapTrustConfig, BootstrapTrustMode, BootstrapVerifyError, TofuKeystore,
  54      TrustedBootstrapNode, create_identity_message, format_pubkey, verify_bootstrap_identity,
  55  };
  56  #[cfg(feature = "webrtc")]
  57  pub use signaling::{
  58      SignalOffer, SignalAnswer, webrtc_inbox_key, encode_offer, decode_offer,
  59      encode_answer, decode_answer, create_webrtc_offer, accept_webrtc_offer, register_webrtc_peer,
  60  };
  61  
  62  use abzu_router::PeerKey;
  63  #[cfg(feature = "ghost")]
  64  use abzu_transport::cover::{CoverConfig, CoverGenerator, PatternObserver};
  65  use abzu_transport::transports::FakeTlsStream;
  66  use abzu_transport::{AbzuFrame, AbzuInterfaceExt};
  67  
  68  /// Re-export DHT types
  69  pub use abzu_dht as dht;
  70  
  71  /// Events that can occur in the node
  72  #[derive(Debug)]
  73  pub enum NodeEvent {
  74      /// A new peer connected
  75      PeerConnected(PeerKey),
  76      /// A peer disconnected
  77      PeerDisconnected(PeerKey),
  78      /// A frame was received
  79      FrameReceived { source: PeerKey, frame: AbzuFrame },
  80      /// A frame was sent
  81      FrameSent { target: PeerKey },
  82      /// Content was stored
  83      ContentStored { cid: [u8; 32] },
  84      /// Shutdown requested
  85      Shutdown,
  86  }
  87  
  88  /// Run the node event loop (without listener)
  89  ///
  90  /// This is a basic version that only handles outbound connections.
  91  /// Use `run_with_listener` for full functionality.
  92  pub async fn run(node: Arc<Node>) -> Result<(), NodeError> {
  93      run_internal(node, None).await
  94  }
  95  
  96  /// Run the node with a listener for incoming connections
  97  ///
  98  /// This is the full implementation that:
  99  /// - Binds a TcpListener on the configured port
 100  /// - Accepts incoming connections and upgrades them to FakeTLS
 101  /// - Handles heartbeat keepalives
 102  /// - Polls peers for incoming frames
 103  pub async fn run_with_listener(
 104      node: Arc<Node>,
 105      listen_addr: &str,
 106      shared_key: [u8; 32],
 107  ) -> Result<(), NodeError> {
 108      let addr: SocketAddr = listen_addr.parse()
 109          .map_err(|e| NodeError::Crypto(format!("Invalid listen address: {}", e)))?;
 110      
 111      let listener = TcpListener::bind(addr).await
 112          .map_err(|e| NodeError::Transport(abzu_transport::TransportError::Io(e)))?;
 113      
 114      info!(addr = %addr, "TCP listener bound");
 115      
 116      run_internal(node, Some((listener, shared_key))).await
 117  }
 118  
 119  /// Internal run implementation with optional listener
 120  async fn run_internal(
 121      node: Arc<Node>,
 122      listener: Option<(TcpListener, [u8; 32])>,
 123  ) -> Result<(), NodeError> {
 124      let switchboard = Switchboard::new(Arc::clone(&node));
 125      let heartbeat_ms = node.config().heartbeat_ms;
 126      let security_tier = node.config().security_tier;
 127      let shutdown = node.shutdown();
 128  
 129      // Rate limiter to prevent connection flooding
 130      let rate_limiter = Arc::new(RateLimiter::new(RateLimitConfig::default()));
 131  
 132      info!(address = %node.address(), tier = ?security_tier, "Starting Abzu node event loop");
 133  
 134      // Create heartbeat interval
 135      let mut heartbeat = tokio::time::interval(Duration::from_millis(heartbeat_ms));
 136      heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 137  
 138      // Cover traffic setup for Ghost tier (only compiled when ghost feature enabled)
 139      #[cfg(feature = "ghost")]
 140      let (cover_tx, mut cover_rx) = tokio::sync::mpsc::channel::<AbzuFrame>(16);
 141      #[cfg(feature = "ghost")]
 142      let _cover_handle: Option<tokio::task::JoinHandle<()>> = if security_tier.uses_cover_traffic() {
 143          info!("Ghost tier active: starting cover traffic generation");
 144          let observer = PatternObserver::new(CoverConfig::ghost());
 145          let mut generator = CoverGenerator::with_observer(&observer);
 146          generator.set_sender(cover_tx.clone());
 147          Some(generator.start())
 148      } else {
 149          None
 150      };
 151  
 152      // No-op cover traffic channel for non-ghost builds
 153      #[cfg(not(feature = "ghost"))]
 154      let (_cover_tx, mut cover_rx) = tokio::sync::mpsc::channel::<AbzuFrame>(1);
 155  
 156      // Main event loop
 157      loop {
 158          // If we have a listener, use the full select with accept
 159          if let Some((ref listener, shared_key)) = listener {
 160              tokio::select! {
 161                  // Shutdown signal
 162                  _ = shutdown.notified() => {
 163                      info!("Shutdown signal received, stopping event loop");
 164                      break;
 165                  }
 166  
 167                  // Accept incoming connections
 168                  accept_result = listener.accept() => {
 169                      match accept_result {
 170                          Ok((socket, peer_addr)) => {
 171                              // Rate limit check
 172                              if !rate_limiter.check(peer_addr.ip()) {
 173                                  warn!(peer = %peer_addr, "Connection rate-limited, dropping");
 174                                  drop(socket);
 175                                  continue;
 176                              }
 177                              
 178                              info!(peer = %peer_addr, "Accepted incoming connection");
 179                              
 180                              let node_clone = Arc::clone(&node);
 181                              let key = shared_key;
 182                              
 183                              // Spawn handshake task - don't block the event loop
 184                              tokio::spawn(async move {
 185                                  match handle_inbound_connection(node_clone, socket, peer_addr, key).await {
 186                                      Ok(peer_key) => {
 187                                          info!(peer = %peer_addr, key = ?peer_key, "Peer registered");
 188                                      }
 189                                      Err(e) => {
 190                                          error!(peer = %peer_addr, error = %e, "Failed to register peer");
 191                                      }
 192                                  }
 193                              });
 194                          }
 195                          Err(e) => {
 196                              warn!(error = %e, "Failed to accept connection");
 197                          }
 198                      }
 199                  }
 200  
 201                  // Heartbeat tick - send keepalives and retry pending messages
 202                  _ = heartbeat.tick() => {
 203                      // Send keepalives
 204                      let results = switchboard.send_keepalives(heartbeat_ms).await;
 205                      for (peer, result) in results {
 206                          if let Err(e) = result {
 207                              warn!(peer = ?peer, error = %e, "Failed to send keepalive");
 208                              node.remove_peer(&peer).await;
 209                          }
 210                      }
 211                      
 212                      // Retry pending outbound messages
 213                      // Retry after 10s, fail after 5 minutes
 214                      let (retried, failed) = switchboard.retry_pending_messages(10_000, 300_000).await;
 215                      if retried > 0 || failed > 0 {
 216                          debug!(retried = retried, failed = failed, "Message retry cycle");
 217                      }
 218                  }
 219  
 220                  // Poll all peer connections for incoming frames
 221                  frame_result = poll_peers(Arc::clone(&node)) => {
 222                      if let Some((source, frame)) = frame_result {
 223                          handle_frame_result(&switchboard, source, frame).await;
 224                      }
 225                  }
 226  
 227                  // Cover traffic broadcast (Ghost tier only)
 228                  cover_frame = cover_rx.recv() => {
 229                      if let Some(frame) = cover_frame {
 230                          // Encode Cover frame - it encrypts identically to real traffic
 231                          if let Ok(encoded) = frame.encode() {
 232                              debug!(size = encoded.len(), "Broadcasting cover frame");
 233                              switchboard.broadcast_raw(&encoded).await;
 234                          }
 235                      }
 236                  }
 237              }
 238          } else {
 239              // No listener - simplified select
 240              tokio::select! {
 241                  // Shutdown signal
 242                  _ = shutdown.notified() => {
 243                      info!("Shutdown signal received, stopping event loop");
 244                      break;
 245                  }
 246  
 247                  // Heartbeat tick - send keepalives and retry pending messages
 248                  _ = heartbeat.tick() => {
 249                      // Send keepalives
 250                      let results = switchboard.send_keepalives(heartbeat_ms).await;
 251                      for (peer, result) in results {
 252                          if let Err(e) = result {
 253                              warn!(peer = ?peer, error = %e, "Failed to send keepalive");
 254                              node.remove_peer(&peer).await;
 255                          }
 256                      }
 257                      
 258                      // Retry pending outbound messages
 259                      // Retry after 10s, fail after 5 minutes
 260                      let (retried, failed) = switchboard.retry_pending_messages(10_000, 300_000).await;
 261                      if retried > 0 || failed > 0 {
 262                          debug!(retried = retried, failed = failed, "Message retry cycle");
 263                      }
 264                  }
 265  
 266                  // Poll all peer connections for incoming frames
 267                  frame_result = poll_peers(Arc::clone(&node)) => {
 268                      if let Some((source, frame)) = frame_result {
 269                          handle_frame_result(&switchboard, source, frame).await;
 270                      }
 271                  }
 272  
 273                  // Cover traffic broadcast (Ghost tier only)
 274                  cover_frame = cover_rx.recv() => {
 275                      if let Some(frame) = cover_frame {
 276                          // Encode Cover frame - it encrypts identically to real traffic
 277                          if let Ok(encoded) = frame.encode() {
 278                              debug!(size = encoded.len(), "Broadcasting cover frame");
 279                              switchboard.broadcast_raw(&encoded).await;
 280                          }
 281                      }
 282                  }
 283              }
 284          }
 285      }
 286  
 287      // Graceful shutdown - close all peer connections
 288      info!("Closing peer connections...");
 289      let peer_keys: Vec<PeerKey> = {
 290          let peers_arc = node.peers();
 291          let peers = peers_arc.lock().await;
 292          peers.keys().copied().collect()
 293      };
 294  
 295      for key in peer_keys {
 296          if let Some(conn) = node.remove_peer(&key).await
 297              && let Err(e) = conn.interface.close().await
 298          {
 299              warn!(peer = ?key, error = %e, "Error closing peer connection");
 300          }
 301      }
 302  
 303      info!("Abzu node stopped");
 304      Ok(())
 305  }
 306  
 307  /// Handle an inbound connection - upgrade to FakeTLS and register as peer
 308  async fn handle_inbound_connection(
 309      node: Arc<Node>,
 310      socket: tokio::net::TcpStream,
 311      peer_addr: SocketAddr,
 312      shared_key: [u8; 32],
 313  ) -> Result<PeerKey, NodeError> {
 314      // Perform FakeTLS handshake with Perfect Forward Secrecy
 315      // The handshake returns both the stream AND the peer's X25519 public key
 316      let handshake_result = FakeTlsStream::accept_pfs(socket, Some(&shared_key)).await?;
 317      
 318      // Use the peer's X25519 ephemeral public key as their cryptographic identity
 319      // This is secure because:
 320      // 1. The key is derived from a proper DH exchange
 321      // 2. Only someone with the PSK can derive the session key
 322      // 3. The identity is bound to the cryptographic handshake, not the network address
 323      let peer_key = handshake_result.peer_pubkey;
 324      
 325      info!(
 326          peer_addr = %peer_addr,
 327          peer_pubkey = ?&peer_key[..8],
 328          "Peer authenticated via X25519 key exchange"
 329      );
 330  
 331      // Register the peer with their cryptographic identity
 332      let conn = PeerConnection::new(Box::new(handshake_result.stream));
 333      node.add_peer(peer_key, conn).await;
 334  
 335      Ok(peer_key)
 336  }
 337  
 338  /// Handle a frame result from the switchboard
 339  async fn handle_frame_result(switchboard: &Switchboard, source: PeerKey, frame: AbzuFrame) {
 340      let result = switchboard.handle_frame(frame, source).await;
 341      
 342      match result {
 343          RouteResult::Local => {
 344              debug!(source = ?source, "Frame processed locally");
 345          }
 346          RouteResult::Forwarded(next) => {
 347              debug!(source = ?source, next = ?next, "Frame forwarded");
 348          }
 349          RouteResult::NoRoute => {
 350              warn!(source = ?source, "No route for frame");
 351          }
 352          RouteResult::Error(e) => {
 353              error!(source = ?source, error = %e, "Error handling frame");
 354          }
 355      }
 356  }
 357  
 358  /// Poll all peers for incoming frames
 359  ///
 360  /// Returns the first available frame from any peer.
 361  async fn poll_peers(node: Arc<Node>) -> Option<(PeerKey, AbzuFrame)> {
 362      let peers_arc = node.peers();
 363      let peer_keys: Vec<PeerKey> = {
 364          let guard = peers_arc.lock().await;
 365          guard.keys().copied().collect()
 366      };
 367  
 368      if peer_keys.is_empty() {
 369          // No peers - just yield
 370          tokio::time::sleep(Duration::from_millis(10)).await;
 371          return None;
 372      }
 373  
 374      // Try to receive from each peer with a short timeout
 375      for key in peer_keys {
 376          let result = {
 377              let mut guard = peers_arc.lock().await;
 378              if let Some(conn) = guard.get_mut(&key) {
 379                  // Non-blocking receive attempt
 380                  tokio::time::timeout(
 381                      Duration::from_millis(1),
 382                      conn.interface.recv_frame(),
 383                  )
 384                  .await
 385              } else {
 386                  continue;
 387              }
 388          };
 389  
 390          match result {
 391              Ok(Ok(frame)) => {
 392                  // Mark activity
 393                  let mut guard = peers_arc.lock().await;
 394                  if let Some(conn) = guard.get_mut(&key) {
 395                      conn.touch();
 396                  }
 397                  return Some((key, frame));
 398              }
 399              Ok(Err(e)) => {
 400                  // Transport error - peer may be disconnected
 401                  warn!(peer = ?key, error = %e, "Error receiving from peer");
 402                  node.remove_peer(&key).await;
 403              }
 404              Err(_) => {
 405                  // Timeout - no data available, try next peer
 406                  continue;
 407              }
 408          }
 409      }
 410  
 411      None
 412  }
 413  
 414  /// Connect to a peer (outbound connection)
 415  ///
 416  /// Uses X25519 ephemeral key exchange to establish a secure connection
 417  /// with Perfect Forward Secrecy. The peer's ephemeral public key is
 418  /// used as their cryptographic identity.
 419  pub async fn connect_peer(
 420      node: Arc<Node>,
 421      addr: &str,
 422      shared_key: &[u8; 32],
 423  ) -> Result<PeerKey, NodeError> {
 424      info!(addr = addr, "Connecting to peer");
 425  
 426      // Connect with Perfect Forward Secrecy (ephemeral X25519 key exchange)
 427      // The handshake returns both the stream AND the peer's X25519 public key
 428      let handshake_result = FakeTlsStream::connect_pfs(addr, Some(shared_key)).await?;
 429      
 430      // Use the peer's X25519 ephemeral public key as their cryptographic identity
 431      // This ensures peers are authenticated by their handshake, not their network address
 432      let peer_key = handshake_result.peer_pubkey;
 433      
 434      info!(
 435          addr = addr,
 436          peer_pubkey = ?&peer_key[..8],
 437          "Connected and authenticated peer via X25519 key exchange"
 438      );
 439  
 440      let conn = PeerConnection::new(Box::new(handshake_result.stream));
 441      node.add_peer(peer_key, conn).await;
 442  
 443      Ok(peer_key)
 444  }
 445  
 446  /// Connect to a peer with identity verification (for bootstrap nodes)
 447  ///
 448  /// Performs an Ed25519 challenge-response to verify the bootstrap node's identity.
 449  /// This prevents Sybil attacks where an attacker impersonates a bootstrap node.
 450  ///
 451  /// # Arguments
 452  /// * `node` - The local node
 453  /// * `addr` - Address to connect to  
 454  /// * `shared_key` - Pre-shared key for transport encryption
 455  /// * `expected_identity` - If Some, the node must prove ownership of this Ed25519 key
 456  ///
 457  /// # Returns
 458  /// * `Ok((peer_key, identity_pubkey))` - Connection successful with verified identity
 459  /// * `Err(_)` - Connection failed or identity verification failed
 460  pub async fn connect_peer_verified(
 461      node: Arc<Node>,
 462      addr: &str,
 463      shared_key: &[u8; 32],
 464      expected_identity: Option<&[u8; 32]>,
 465  ) -> Result<(PeerKey, Option<[u8; 32]>), NodeError> {
 466      use rand::RngCore;
 467      use std::time::{SystemTime, UNIX_EPOCH};
 468      
 469      info!(addr = addr, verify = expected_identity.is_some(), "Connecting to peer with verification");
 470  
 471      // Connect with Perfect Forward Secrecy
 472      let handshake_result = FakeTlsStream::connect_pfs(addr, Some(shared_key)).await?;
 473      let peer_key = handshake_result.peer_pubkey;
 474      
 475      // If we need to verify identity, send a challenge
 476      let verified_identity = if expected_identity.is_some() {
 477          // Generate random challenge nonce
 478          let mut challenge = [0u8; 32];
 479          rand::thread_rng().fill_bytes(&mut challenge);
 480          
 481          let timestamp = SystemTime::now()
 482              .duration_since(UNIX_EPOCH)
 483              .unwrap_or_default()
 484              .as_secs();
 485          
 486          // Send Hello with challenge
 487          let hello = AbzuFrame::hello_with_challenge(peer_key, timestamp, challenge);
 488          handshake_result.stream.send_frame(&hello).await?;
 489          
 490          // Receive HelloAck with identity signature
 491          let response = tokio::time::timeout(
 492              Duration::from_secs(5),
 493              handshake_result.stream.recv_frame()
 494          )
 495          .await
 496          .map_err(|_| NodeError::Transport(abzu_transport::TransportError::Timeout))?
 497          .map_err(NodeError::Transport)?;
 498          
 499          match response {
 500              AbzuFrame::HelloAck { 
 501                  ephemeral_pub,
 502                  identity_pubkey: Some(identity_pubkey),
 503                  identity_signature,
 504                  ..
 505              } if !identity_signature.is_empty() => {
 506                  // Verify the signature
 507                  verify_bootstrap_identity(
 508                      &challenge,
 509                      &ephemeral_pub,
 510                      timestamp,
 511                      &identity_pubkey,
 512                      &identity_signature,
 513                  ).map_err(|e| NodeError::Connection(format!("Bootstrap verification failed: {}", e)))?;
 514                  
 515                  // Verify it matches expected identity (if pinned)
 516                  if let Some(expected) = expected_identity {
 517                      if &identity_pubkey != expected {
 518                          return Err(NodeError::Connection(format!(
 519                              "Identity mismatch: expected {}, got {}",
 520                              format_pubkey(expected),
 521                              format_pubkey(&identity_pubkey)
 522                          )));
 523                      }
 524                  }
 525                  
 526                  info!(
 527                      addr = addr,
 528                      identity = %format_pubkey(&identity_pubkey),
 529                      "Bootstrap node identity verified"
 530                  );
 531                  
 532                  Some(identity_pubkey)
 533              }
 534              AbzuFrame::HelloAck { identity_pubkey: None, .. } | AbzuFrame::HelloAck { .. } => {
 535                  // Node didn't provide identity - fail if we required it
 536                  if expected_identity.is_some() {
 537                      return Err(NodeError::Connection(
 538                          "Bootstrap node did not provide identity for verification".into()
 539                      ));
 540                  }
 541                  None
 542              }
 543              _ => {
 544                  return Err(NodeError::Connection(
 545                      format!("Unexpected response to Hello challenge: {:?}", std::mem::discriminant(&response))
 546                  ));
 547              }
 548          }
 549      } else {
 550          None
 551      };
 552      
 553      info!(
 554          addr = addr,
 555          peer_pubkey = ?&peer_key[..8],
 556          verified = verified_identity.is_some(),
 557          "Connected to peer"
 558      );
 559  
 560      let conn = PeerConnection::new(Box::new(handshake_result.stream));
 561      node.add_peer(peer_key, conn).await;
 562  
 563      Ok((peer_key, verified_identity))
 564  }
 565  
 566  /// Bootstrap the node into the network
 567  ///
 568  /// Attempts to connect to peers in priority order:
 569  /// 1. User-configured bootstrap peers
 570  /// 2. Previously known peers (submarine test resilience)
 571  /// 3. Default community nodes (if enabled)
 572  ///
 573  /// Uses exponential backoff for failed connections.
 574  pub async fn bootstrap(
 575      node: Arc<Node>,
 576      shared_key: &[u8; 32],
 577      config: BootstrapConfig,
 578  ) -> BootstrapResult {
 579      use std::sync::Mutex;
 580      use std::path::PathBuf;
 581      
 582      // Security warning for unverified bootstrap
 583      match config.trust.mode {
 584          BootstrapTrustMode::Any => {
 585              warn!("⚠️  SECURITY WARNING: Bootstrap verification disabled (trust mode: Any)");
 586              warn!("⚠️  This exposes the node to potential Sybil attacks during network entry");
 587              warn!("⚠️  Consider using Pinned or TrustOnFirstUse mode in production");
 588          }
 589          BootstrapTrustMode::Pinned => {
 590              info!(
 591                  trusted_nodes = config.trust.pinned_nodes.len(),
 592                  "Bootstrap verification: Pinned mode"
 593              );
 594          }
 595          BootstrapTrustMode::TrustOnFirstUse => {
 596              info!("Bootstrap verification: Trust-on-first-use mode");
 597          }
 598      }
 599      
 600      // Load TOFU keystore if needed (wrapped in Mutex for interior mutability)
 601      let tofu_keystore: Option<Arc<Mutex<TofuKeystore>>> = if config.trust.mode == BootstrapTrustMode::TrustOnFirstUse {
 602          let path = config.trust.tofu_keystore_path.as_ref()
 603              .map(PathBuf::from)
 604              .unwrap_or_else(|| {
 605                  // Default path: data_dir/bootstrap_keys.json
 606                  node.data_dir().join("bootstrap_keys.json")
 607              });
 608          match TofuKeystore::load(&path) {
 609              Ok(ks) => {
 610                  debug!(path = %path.display(), keys = ks.len(), "Loaded TOFU keystore");
 611                  Some(Arc::new(Mutex::new(ks)))
 612              }
 613              Err(e) => {
 614                  warn!(error = %e, "Failed to load TOFU keystore, starting fresh");
 615                  Some(Arc::new(Mutex::new(TofuKeystore::default())))
 616              }
 617          }
 618      } else {
 619          None
 620      };
 621      let tofu_path: Option<PathBuf> = config.trust.tofu_keystore_path.as_ref()
 622          .map(PathBuf::from)
 623          .or_else(|| {
 624              if config.trust.mode == BootstrapTrustMode::TrustOnFirstUse {
 625                  Some(node.data_dir().join("bootstrap_keys.json"))
 626              } else {
 627                  None
 628              }
 629          });
 630      
 631      let mut connected = 0usize;
 632      let mut connected_peers: Vec<PeerKey> = Vec::new();
 633      let mut failed = 0usize;
 634      
 635      // Phase 1: Try configured bootstrap peers
 636      let addresses = config.all_addresses();
 637      if !addresses.is_empty() {
 638          info!(count = addresses.len(), "Connecting to configured bootstrap peers");
 639          
 640          for addr in &addresses {
 641              let result = try_verified_connect_with_retry(
 642                  Arc::clone(&node),
 643                  addr,
 644                  shared_key,
 645                  config.retry_attempts,
 646                  config.initial_backoff_ms,
 647                  &config.trust,
 648                  &tofu_keystore,
 649                  &tofu_path,
 650              ).await;
 651              
 652              match result {
 653                  Ok(peer_key) => {
 654                      info!(addr = %addr, peer = ?&peer_key[..8], "Connected to bootstrap peer");
 655                      connected_peers.push(peer_key);
 656                      connected += 1;
 657                  }
 658                  Err(e) => {
 659                      warn!(addr = %addr, error = %e, "Failed to connect to bootstrap peer");
 660                      failed += 1;
 661                  }
 662              }
 663          }
 664      }
 665      
 666      // Phase 2: Try known peers (submarine test - reconnect after restart)
 667      if connected == 0 || connected < config.max_concurrent {
 668          let known_peers = node.known_peers().load_all().unwrap_or_default();
 669          let limit = config.known_peers_limit.saturating_sub(connected);
 670          
 671          if !known_peers.is_empty() {
 672              info!(
 673                  available = known_peers.len(),
 674                  trying = limit.min(known_peers.len()),
 675                  "Reconnecting to known peers"
 676              );
 677              
 678              for peer in known_peers.into_iter().take(limit) {
 679                  let addr = peer.address.to_string();
 680                  
 681                  // Skip if we already tried this address in phase 1
 682                  if addresses.iter().any(|a| a == &addr) {
 683                      continue;
 684                  }
 685                  
 686                  match try_connect_with_retry(
 687                      Arc::clone(&node),
 688                      &addr,
 689                      shared_key,
 690                      1, // Single retry for known peers
 691                      config.initial_backoff_ms,
 692                  ).await {
 693                      Ok(peer_key) => {
 694                          info!(addr = %addr, "Reconnected to known peer");
 695                          // Update success in peer store
 696                          if let Ok(router) = node.router().try_read() {
 697                              let coords = router.self_coords().clone();
 698                              let _ = node.known_peers().record_success(peer_key, peer.address, coords);
 699                          }
 700                          connected_peers.push(peer_key);
 701                          connected += 1;
 702                      }
 703                      Err(_) => {
 704                          // Record failure
 705                          let _ = node.known_peers().record_failure(&peer.key);
 706                          failed += 1;
 707                      }
 708                  }
 709                  
 710                  if connected >= config.max_concurrent {
 711                      break;
 712                  }
 713              }
 714          }
 715      }
 716      
 717      // Prune stale peers opportunistically
 718      if let Ok(pruned) = node.known_peers().prune()
 719          && pruned > 0 {
 720              debug!(pruned = pruned, "Pruned stale peers");
 721      }
 722      
 723      // Phase 3: DHT bootstrap - seed routing table and perform self-lookup
 724      if !connected_peers.is_empty() {
 725          info!("Starting DHT bootstrap...");
 726          let switchboard = Switchboard::new(Arc::clone(&node));
 727          
 728          // Add connected peers as DHT seeds
 729          for peer_key in &connected_peers {
 730              switchboard.add_dht_seed(*peer_key).await;
 731          }
 732          
 733          // Self-lookup to discover nearby nodes and populate routing table
 734          let our_id = switchboard.our_dht_key();
 735          let timeout_ms = 10_000;
 736          match switchboard.dht_find_node(&our_id, timeout_ms).await {
 737              Ok(nodes) => {
 738                  info!(found = nodes.len(), "DHT routing table populated");
 739              }
 740              Err(e) => {
 741                  warn!(error = %e, "DHT bootstrap failed, continuing anyway");
 742              }
 743          }
 744      }
 745      
 746      let result = BootstrapResult::new(connected, failed);
 747      
 748      if result.success {
 749          info!(connected = connected, failed = failed, "Bootstrap complete");
 750      } else {
 751          warn!(failed = failed, "Bootstrap failed - no peers connected");
 752      }
 753      
 754      result
 755  }
 756  
 757  /// Try to connect to a peer with exponential backoff retry
 758  async fn try_connect_with_retry(
 759      node: Arc<Node>,
 760      addr: &str,
 761      shared_key: &[u8; 32],
 762      max_attempts: u8,
 763      initial_backoff_ms: u64,
 764  ) -> Result<PeerKey, NodeError> {
 765      use bootstrap::backoff_delay;
 766      
 767      let mut last_error = None;
 768      
 769      for attempt in 0..max_attempts {
 770          if attempt > 0 {
 771              let delay = backoff_delay(attempt - 1, initial_backoff_ms);
 772              debug!(addr = %addr, attempt = attempt + 1, delay_ms = ?delay.as_millis(), "Retrying connection");
 773              tokio::time::sleep(delay).await;
 774          }
 775          
 776          match connect_peer(Arc::clone(&node), addr, shared_key).await {
 777              Ok(peer_key) => return Ok(peer_key),
 778              Err(e) => {
 779                  last_error = Some(e);
 780              }
 781          }
 782      }
 783      
 784      Err(last_error.unwrap_or_else(|| NodeError::Connection("No attempts made".into())))
 785  }
 786  
 787  /// Try to connect with trust verification (for bootstrap nodes)
 788  async fn try_verified_connect_with_retry(
 789      node: Arc<Node>,
 790      addr: &str,
 791      shared_key: &[u8; 32],
 792      max_attempts: u8,
 793      initial_backoff_ms: u64,
 794      trust_config: &BootstrapTrustConfig,
 795      tofu_keystore: &Option<std::sync::Arc<std::sync::Mutex<TofuKeystore>>>,
 796      tofu_path: &Option<std::path::PathBuf>,
 797  ) -> Result<PeerKey, NodeError> {
 798      use bootstrap::backoff_delay;
 799      
 800      let mut last_error = None;
 801      
 802      for attempt in 0..max_attempts {
 803          if attempt > 0 {
 804              let delay = backoff_delay(attempt - 1, initial_backoff_ms);
 805              debug!(addr = %addr, attempt = attempt + 1, delay_ms = ?delay.as_millis(), "Retrying verified connection");
 806              tokio::time::sleep(delay).await;
 807          }
 808          
 809          // Determine expected identity based on trust mode (copy the key to avoid lifetime issues)
 810          let expected_identity_owned: Option<[u8; 32]> = match trust_config.mode {
 811              BootstrapTrustMode::Any => None, // No verification
 812              BootstrapTrustMode::Pinned => trust_config.get_pinned_key(addr).copied(),
 813              BootstrapTrustMode::TrustOnFirstUse => {
 814                  // Check TOFU keystore for previously seen key
 815                  if let Some(ks) = tofu_keystore {
 816                      let ks_guard = ks.lock().unwrap();
 817                      ks_guard.get(addr).copied()
 818                  } else {
 819                      None
 820                  }
 821              }
 822          };
 823          
 824          // If Pinned mode but no pinned key for this address, skip verification
 825          // (allows mixed verified/unverified bootstrap nodes)
 826          if trust_config.mode == BootstrapTrustMode::Pinned && expected_identity_owned.is_none() {
 827              debug!(addr = %addr, "No pinned key for address, using unverified connection");
 828              match connect_peer(Arc::clone(&node), addr, shared_key).await {
 829                  Ok(peer_key) => return Ok(peer_key),
 830                  Err(e) => {
 831                      last_error = Some(e);
 832                      continue;
 833                  }
 834              }
 835          }
 836          
 837          // Use verified connection
 838          match connect_peer_verified(
 839              Arc::clone(&node),
 840              addr,
 841              shared_key,
 842              expected_identity_owned.as_ref(),
 843          ).await {
 844              Ok((peer_key, verified_identity)) => {
 845                  // For TOFU mode, store the verified identity for future connections
 846                  if trust_config.mode == BootstrapTrustMode::TrustOnFirstUse {
 847                      if let Some(identity) = verified_identity {
 848                          if let Some(ks) = tofu_keystore {
 849                              let mut ks_guard = ks.lock().unwrap();
 850                              let is_new = ks_guard.get(addr).is_none();
 851                              if is_new {
 852                                  let timestamp = std::time::SystemTime::now()
 853                                      .duration_since(std::time::UNIX_EPOCH)
 854                                      .map(|d| d.as_secs())
 855                                      .unwrap_or(0);
 856                                  ks_guard.store(addr.to_string(), identity, timestamp);
 857                                  info!(addr = %addr, "Stored new bootstrap identity (TOFU)");
 858                                  
 859                                  // Persist keystore
 860                                  if let Some(path) = tofu_path {
 861                                      if let Err(e) = ks_guard.save(path) {
 862                                          warn!(error = %e, "Failed to persist TOFU keystore");
 863                                      }
 864                                  }
 865                              } else {
 866                                  // Verify it matches stored key
 867                                  if let Some(false) = ks_guard.verify(addr, &identity) {
 868                                      warn!(
 869                                          addr = %addr,
 870                                          "Bootstrap identity changed! Previous key differs from current"
 871                                      );
 872                                      // Don't fail - TOFU is "warn on change" not "fail on change"
 873                                  }
 874                              }
 875                          }
 876                      }
 877                  }
 878                  return Ok(peer_key);
 879              }
 880              Err(e) => {
 881                  last_error = Some(e);
 882              }
 883          }
 884      }
 885      
 886      Err(last_error.unwrap_or_else(|| NodeError::Connection("No attempts made".into())))
 887  }
 888  
 889  /// Discover our NAT mapping for peer hole punching
 890  ///
 891  /// Returns the discovered public IP:port that can be shared with peers
 892  /// for direct connection establishment.
 893  pub async fn discover_nat() -> Result<abzu_transport::NatMapping, abzu_transport::NatError> {
 894      let resolver = abzu_transport::NatResolver::new();
 895      resolver.discover().await
 896  }
 897  
 898  /// Connect to a peer with NAT hole punching
 899  ///
 900  /// This performs coordinated hole punching to establish a connection
 901  /// through NAT. Both peers should call this simultaneously after
 902  /// exchanging their mapped addresses out-of-band.
 903  ///
 904  /// # Arguments
 905  /// * `node` - The node context
 906  /// * `peer_mapped_addr` - The peer's public mapped address (from their discover_nat)
 907  /// * `shared_key` - Encryption key for the channel
 908  ///
 909  /// # Returns
 910  /// The peer key if successful, or an error if hole punch fails
 911  pub async fn connect_peer_nat(
 912      node: Arc<Node>,
 913      peer_mapped_addr: std::net::SocketAddr,
 914      shared_key: &[u8; 32],
 915  ) -> Result<PeerKey, NodeError> {
 916      use abzu_transport::{NatResolver, NatType};
 917      
 918      info!(target = %peer_mapped_addr, "Starting NAT-aware connection");
 919      
 920      // Discover our own NAT mapping
 921      let resolver = NatResolver::new();
 922      let our_mapping = resolver.discover().await.map_err(|e| {
 923          NodeError::Connection(format!("NAT discovery failed: {}", e))
 924      })?;
 925      
 926      info!(
 927          local = %our_mapping.local_addr,
 928          mapped = %our_mapping.mapped_addr,
 929          nat_type = ?our_mapping.nat_type,
 930          "NAT discovery completed"
 931      );
 932      
 933      // Check if NAT is traversable
 934      if our_mapping.nat_type == NatType::Symmetric {
 935          warn!("Symmetric NAT detected - connection may fail, consider relay");
 936      }
 937      
 938      // Perform hole punch
 939      let punch_socket = resolver.punch(peer_mapped_addr).await.map_err(|e| {
 940          NodeError::Connection(format!("Hole punch failed: {}", e))
 941      })?;
 942      
 943      // After hole punch, try TCP over the punched path
 944      let addr = peer_mapped_addr.to_string();
 945      let handshake_result = FakeTlsStream::connect_pfs(&addr, Some(shared_key)).await?;
 946      
 947      // Use X25519 peer pubkey as cryptographic identity (not address hash!)
 948      let peer_key = handshake_result.peer_pubkey;
 949      
 950      info!(
 951          peer_addr = %peer_mapped_addr,
 952          peer_pubkey = ?&peer_key[..8],
 953          "NAT-aware peer authenticated via X25519"
 954      );
 955      
 956      let conn = PeerConnection::new(Box::new(handshake_result.stream));
 957      node.add_peer(peer_key, conn).await;
 958      
 959      // Clean up the punch socket (it was just for NAT binding)
 960      drop(punch_socket);
 961      
 962      info!("NAT-aware connection established");
 963      Ok(peer_key)
 964  }
 965  
 966  #[cfg(test)]
 967  mod tests {
 968      use super::*;
 969      use tempfile::tempdir;
 970  
 971      fn create_test_node() -> Arc<Node> {
 972          let tmp = tempdir().unwrap();
 973          let config = NodeConfig {
 974              storage_path: tmp.path().to_str().unwrap().to_string(),
 975              ..Default::default()
 976          };
 977          Arc::new(Node::new(config).unwrap())
 978      }
 979  
 980      #[tokio::test]
 981      async fn test_node_lifecycle() {
 982          let node = create_test_node();
 983          let node_clone = Arc::clone(&node);
 984  
 985          // Spawn the event loop
 986          let handle = tokio::spawn(async move {
 987              run(node_clone).await
 988          });
 989  
 990          // Let it run briefly
 991          tokio::time::sleep(Duration::from_millis(100)).await;
 992  
 993          // Signal shutdown
 994          node.signal_shutdown();
 995  
 996          // Wait for clean exit
 997          let result = handle.await.unwrap();
 998          assert!(result.is_ok());
 999      }
1000  
1001      #[tokio::test]
1002      async fn test_poll_peers_empty() {
1003          let node = create_test_node();
1004          let result = poll_peers(node).await;
1005          assert!(result.is_none());
1006      }
1007  }