lib.rs
1 //! Abzu Core 2 //! 3 //! The main event loop and node management for the Abzu mesh network. 4 //! 5 //! # Architecture 6 //! 7 //! - `node`: Node state container (identity, router, peers, store) 8 //! - `switchboard`: Frame handling and routing logic 9 //! - `run()`: Main event loop using tokio::select! 10 11 pub mod bootstrap; 12 pub mod codec; 13 pub mod config; 14 pub mod identity; 15 pub mod node; 16 pub mod persistence; 17 pub mod radicle; 18 pub mod replication; 19 #[cfg(feature = "webrtc")] 20 pub mod signaling; 21 pub mod storage; 22 pub mod store; 23 pub mod switchboard; 24 pub mod trust; 25 pub mod trusted_bootstrap; 26 27 use std::net::SocketAddr; 28 use std::sync::Arc; 29 use std::time::Duration; 30 31 use abzu_transport::rate_limit::{RateLimiter, RateLimitConfig}; 32 use tokio::net::TcpListener; 33 use tracing::{debug, error, info, warn}; 34 35 pub use bootstrap::{BootstrapConfig, BootstrapResult}; 36 pub use identity::{MachineIdentity, UserIdentity}; 37 // Storage types from node 38 pub use node::{ 39 Contact, MessageDirection, MessageStatus, Node, NodeConfig, NodeError, 40 PeerConnection, StoredCircleMessage, StoredMessage, 41 }; 42 // Trust types from trust module 43 pub use trust::{ 44 Circle, CircleMember, LedgerMode, MemberRole, PruneMode, TrustAction, 45 TrustCapacity, TrustLedgerEntry, TrustPolicy, Vouch, 46 TrustEngine, TrustError, derive_circle_key, 47 MAX_INVITE_CAPACITY, MAX_VOUCH_CAPACITY, MAX_INVITE_DEPTH, MIN_VOUCHES_FOR_ADMIN, 48 }; 49 pub use store::{ContentStore, StoreError}; 50 51 pub use switchboard::{RouteResult, Switchboard}; 52 pub use trusted_bootstrap::{ 53 BootstrapTrustConfig, BootstrapTrustMode, BootstrapVerifyError, TofuKeystore, 54 TrustedBootstrapNode, create_identity_message, format_pubkey, verify_bootstrap_identity, 55 }; 56 #[cfg(feature = "webrtc")] 57 pub use signaling::{ 58 SignalOffer, SignalAnswer, webrtc_inbox_key, encode_offer, decode_offer, 59 encode_answer, decode_answer, create_webrtc_offer, accept_webrtc_offer, register_webrtc_peer, 60 }; 61 62 use abzu_router::PeerKey; 63 #[cfg(feature = "ghost")] 64 use abzu_transport::cover::{CoverConfig, CoverGenerator, PatternObserver}; 65 use abzu_transport::transports::FakeTlsStream; 66 use abzu_transport::{AbzuFrame, AbzuInterfaceExt}; 67 68 /// Re-export DHT types 69 pub use abzu_dht as dht; 70 71 /// Events that can occur in the node 72 #[derive(Debug)] 73 pub enum NodeEvent { 74 /// A new peer connected 75 PeerConnected(PeerKey), 76 /// A peer disconnected 77 PeerDisconnected(PeerKey), 78 /// A frame was received 79 FrameReceived { source: PeerKey, frame: AbzuFrame }, 80 /// A frame was sent 81 FrameSent { target: PeerKey }, 82 /// Content was stored 83 ContentStored { cid: [u8; 32] }, 84 /// Shutdown requested 85 Shutdown, 86 } 87 88 /// Run the node event loop (without listener) 89 /// 90 /// This is a basic version that only handles outbound connections. 91 /// Use `run_with_listener` for full functionality. 92 pub async fn run(node: Arc<Node>) -> Result<(), NodeError> { 93 run_internal(node, None).await 94 } 95 96 /// Run the node with a listener for incoming connections 97 /// 98 /// This is the full implementation that: 99 /// - Binds a TcpListener on the configured port 100 /// - Accepts incoming connections and upgrades them to FakeTLS 101 /// - Handles heartbeat keepalives 102 /// - Polls peers for incoming frames 103 pub async fn run_with_listener( 104 node: Arc<Node>, 105 listen_addr: &str, 106 shared_key: [u8; 32], 107 ) -> Result<(), NodeError> { 108 let addr: SocketAddr = listen_addr.parse() 109 .map_err(|e| NodeError::Crypto(format!("Invalid listen address: {}", e)))?; 110 111 let listener = TcpListener::bind(addr).await 112 .map_err(|e| NodeError::Transport(abzu_transport::TransportError::Io(e)))?; 113 114 info!(addr = %addr, "TCP listener bound"); 115 116 run_internal(node, Some((listener, shared_key))).await 117 } 118 119 /// Internal run implementation with optional listener 120 async fn run_internal( 121 node: Arc<Node>, 122 listener: Option<(TcpListener, [u8; 32])>, 123 ) -> Result<(), NodeError> { 124 let switchboard = Switchboard::new(Arc::clone(&node)); 125 let heartbeat_ms = node.config().heartbeat_ms; 126 let security_tier = node.config().security_tier; 127 let shutdown = node.shutdown(); 128 129 // Rate limiter to prevent connection flooding 130 let rate_limiter = Arc::new(RateLimiter::new(RateLimitConfig::default())); 131 132 info!(address = %node.address(), tier = ?security_tier, "Starting Abzu node event loop"); 133 134 // Create heartbeat interval 135 let mut heartbeat = tokio::time::interval(Duration::from_millis(heartbeat_ms)); 136 heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 137 138 // Cover traffic setup for Ghost tier (only compiled when ghost feature enabled) 139 #[cfg(feature = "ghost")] 140 let (cover_tx, mut cover_rx) = tokio::sync::mpsc::channel::<AbzuFrame>(16); 141 #[cfg(feature = "ghost")] 142 let _cover_handle: Option<tokio::task::JoinHandle<()>> = if security_tier.uses_cover_traffic() { 143 info!("Ghost tier active: starting cover traffic generation"); 144 let observer = PatternObserver::new(CoverConfig::ghost()); 145 let mut generator = CoverGenerator::with_observer(&observer); 146 generator.set_sender(cover_tx.clone()); 147 Some(generator.start()) 148 } else { 149 None 150 }; 151 152 // No-op cover traffic channel for non-ghost builds 153 #[cfg(not(feature = "ghost"))] 154 let (_cover_tx, mut cover_rx) = tokio::sync::mpsc::channel::<AbzuFrame>(1); 155 156 // Main event loop 157 loop { 158 // If we have a listener, use the full select with accept 159 if let Some((ref listener, shared_key)) = listener { 160 tokio::select! { 161 // Shutdown signal 162 _ = shutdown.notified() => { 163 info!("Shutdown signal received, stopping event loop"); 164 break; 165 } 166 167 // Accept incoming connections 168 accept_result = listener.accept() => { 169 match accept_result { 170 Ok((socket, peer_addr)) => { 171 // Rate limit check 172 if !rate_limiter.check(peer_addr.ip()) { 173 warn!(peer = %peer_addr, "Connection rate-limited, dropping"); 174 drop(socket); 175 continue; 176 } 177 178 info!(peer = %peer_addr, "Accepted incoming connection"); 179 180 let node_clone = Arc::clone(&node); 181 let key = shared_key; 182 183 // Spawn handshake task - don't block the event loop 184 tokio::spawn(async move { 185 match handle_inbound_connection(node_clone, socket, peer_addr, key).await { 186 Ok(peer_key) => { 187 info!(peer = %peer_addr, key = ?peer_key, "Peer registered"); 188 } 189 Err(e) => { 190 error!(peer = %peer_addr, error = %e, "Failed to register peer"); 191 } 192 } 193 }); 194 } 195 Err(e) => { 196 warn!(error = %e, "Failed to accept connection"); 197 } 198 } 199 } 200 201 // Heartbeat tick - send keepalives and retry pending messages 202 _ = heartbeat.tick() => { 203 // Send keepalives 204 let results = switchboard.send_keepalives(heartbeat_ms).await; 205 for (peer, result) in results { 206 if let Err(e) = result { 207 warn!(peer = ?peer, error = %e, "Failed to send keepalive"); 208 node.remove_peer(&peer).await; 209 } 210 } 211 212 // Retry pending outbound messages 213 // Retry after 10s, fail after 5 minutes 214 let (retried, failed) = switchboard.retry_pending_messages(10_000, 300_000).await; 215 if retried > 0 || failed > 0 { 216 debug!(retried = retried, failed = failed, "Message retry cycle"); 217 } 218 } 219 220 // Poll all peer connections for incoming frames 221 frame_result = poll_peers(Arc::clone(&node)) => { 222 if let Some((source, frame)) = frame_result { 223 handle_frame_result(&switchboard, source, frame).await; 224 } 225 } 226 227 // Cover traffic broadcast (Ghost tier only) 228 cover_frame = cover_rx.recv() => { 229 if let Some(frame) = cover_frame { 230 // Encode Cover frame - it encrypts identically to real traffic 231 if let Ok(encoded) = frame.encode() { 232 debug!(size = encoded.len(), "Broadcasting cover frame"); 233 switchboard.broadcast_raw(&encoded).await; 234 } 235 } 236 } 237 } 238 } else { 239 // No listener - simplified select 240 tokio::select! { 241 // Shutdown signal 242 _ = shutdown.notified() => { 243 info!("Shutdown signal received, stopping event loop"); 244 break; 245 } 246 247 // Heartbeat tick - send keepalives and retry pending messages 248 _ = heartbeat.tick() => { 249 // Send keepalives 250 let results = switchboard.send_keepalives(heartbeat_ms).await; 251 for (peer, result) in results { 252 if let Err(e) = result { 253 warn!(peer = ?peer, error = %e, "Failed to send keepalive"); 254 node.remove_peer(&peer).await; 255 } 256 } 257 258 // Retry pending outbound messages 259 // Retry after 10s, fail after 5 minutes 260 let (retried, failed) = switchboard.retry_pending_messages(10_000, 300_000).await; 261 if retried > 0 || failed > 0 { 262 debug!(retried = retried, failed = failed, "Message retry cycle"); 263 } 264 } 265 266 // Poll all peer connections for incoming frames 267 frame_result = poll_peers(Arc::clone(&node)) => { 268 if let Some((source, frame)) = frame_result { 269 handle_frame_result(&switchboard, source, frame).await; 270 } 271 } 272 273 // Cover traffic broadcast (Ghost tier only) 274 cover_frame = cover_rx.recv() => { 275 if let Some(frame) = cover_frame { 276 // Encode Cover frame - it encrypts identically to real traffic 277 if let Ok(encoded) = frame.encode() { 278 debug!(size = encoded.len(), "Broadcasting cover frame"); 279 switchboard.broadcast_raw(&encoded).await; 280 } 281 } 282 } 283 } 284 } 285 } 286 287 // Graceful shutdown - close all peer connections 288 info!("Closing peer connections..."); 289 let peer_keys: Vec<PeerKey> = { 290 let peers_arc = node.peers(); 291 let peers = peers_arc.lock().await; 292 peers.keys().copied().collect() 293 }; 294 295 for key in peer_keys { 296 if let Some(conn) = node.remove_peer(&key).await 297 && let Err(e) = conn.interface.close().await 298 { 299 warn!(peer = ?key, error = %e, "Error closing peer connection"); 300 } 301 } 302 303 info!("Abzu node stopped"); 304 Ok(()) 305 } 306 307 /// Handle an inbound connection - upgrade to FakeTLS and register as peer 308 async fn handle_inbound_connection( 309 node: Arc<Node>, 310 socket: tokio::net::TcpStream, 311 peer_addr: SocketAddr, 312 shared_key: [u8; 32], 313 ) -> Result<PeerKey, NodeError> { 314 // Perform FakeTLS handshake with Perfect Forward Secrecy 315 // The handshake returns both the stream AND the peer's X25519 public key 316 let handshake_result = FakeTlsStream::accept_pfs(socket, Some(&shared_key)).await?; 317 318 // Use the peer's X25519 ephemeral public key as their cryptographic identity 319 // This is secure because: 320 // 1. The key is derived from a proper DH exchange 321 // 2. Only someone with the PSK can derive the session key 322 // 3. The identity is bound to the cryptographic handshake, not the network address 323 let peer_key = handshake_result.peer_pubkey; 324 325 info!( 326 peer_addr = %peer_addr, 327 peer_pubkey = ?&peer_key[..8], 328 "Peer authenticated via X25519 key exchange" 329 ); 330 331 // Register the peer with their cryptographic identity 332 let conn = PeerConnection::new(Box::new(handshake_result.stream)); 333 node.add_peer(peer_key, conn).await; 334 335 Ok(peer_key) 336 } 337 338 /// Handle a frame result from the switchboard 339 async fn handle_frame_result(switchboard: &Switchboard, source: PeerKey, frame: AbzuFrame) { 340 let result = switchboard.handle_frame(frame, source).await; 341 342 match result { 343 RouteResult::Local => { 344 debug!(source = ?source, "Frame processed locally"); 345 } 346 RouteResult::Forwarded(next) => { 347 debug!(source = ?source, next = ?next, "Frame forwarded"); 348 } 349 RouteResult::NoRoute => { 350 warn!(source = ?source, "No route for frame"); 351 } 352 RouteResult::Error(e) => { 353 error!(source = ?source, error = %e, "Error handling frame"); 354 } 355 } 356 } 357 358 /// Poll all peers for incoming frames 359 /// 360 /// Returns the first available frame from any peer. 361 async fn poll_peers(node: Arc<Node>) -> Option<(PeerKey, AbzuFrame)> { 362 let peers_arc = node.peers(); 363 let peer_keys: Vec<PeerKey> = { 364 let guard = peers_arc.lock().await; 365 guard.keys().copied().collect() 366 }; 367 368 if peer_keys.is_empty() { 369 // No peers - just yield 370 tokio::time::sleep(Duration::from_millis(10)).await; 371 return None; 372 } 373 374 // Try to receive from each peer with a short timeout 375 for key in peer_keys { 376 let result = { 377 let mut guard = peers_arc.lock().await; 378 if let Some(conn) = guard.get_mut(&key) { 379 // Non-blocking receive attempt 380 tokio::time::timeout( 381 Duration::from_millis(1), 382 conn.interface.recv_frame(), 383 ) 384 .await 385 } else { 386 continue; 387 } 388 }; 389 390 match result { 391 Ok(Ok(frame)) => { 392 // Mark activity 393 let mut guard = peers_arc.lock().await; 394 if let Some(conn) = guard.get_mut(&key) { 395 conn.touch(); 396 } 397 return Some((key, frame)); 398 } 399 Ok(Err(e)) => { 400 // Transport error - peer may be disconnected 401 warn!(peer = ?key, error = %e, "Error receiving from peer"); 402 node.remove_peer(&key).await; 403 } 404 Err(_) => { 405 // Timeout - no data available, try next peer 406 continue; 407 } 408 } 409 } 410 411 None 412 } 413 414 /// Connect to a peer (outbound connection) 415 /// 416 /// Uses X25519 ephemeral key exchange to establish a secure connection 417 /// with Perfect Forward Secrecy. The peer's ephemeral public key is 418 /// used as their cryptographic identity. 419 pub async fn connect_peer( 420 node: Arc<Node>, 421 addr: &str, 422 shared_key: &[u8; 32], 423 ) -> Result<PeerKey, NodeError> { 424 info!(addr = addr, "Connecting to peer"); 425 426 // Connect with Perfect Forward Secrecy (ephemeral X25519 key exchange) 427 // The handshake returns both the stream AND the peer's X25519 public key 428 let handshake_result = FakeTlsStream::connect_pfs(addr, Some(shared_key)).await?; 429 430 // Use the peer's X25519 ephemeral public key as their cryptographic identity 431 // This ensures peers are authenticated by their handshake, not their network address 432 let peer_key = handshake_result.peer_pubkey; 433 434 info!( 435 addr = addr, 436 peer_pubkey = ?&peer_key[..8], 437 "Connected and authenticated peer via X25519 key exchange" 438 ); 439 440 let conn = PeerConnection::new(Box::new(handshake_result.stream)); 441 node.add_peer(peer_key, conn).await; 442 443 Ok(peer_key) 444 } 445 446 /// Connect to a peer with identity verification (for bootstrap nodes) 447 /// 448 /// Performs an Ed25519 challenge-response to verify the bootstrap node's identity. 449 /// This prevents Sybil attacks where an attacker impersonates a bootstrap node. 450 /// 451 /// # Arguments 452 /// * `node` - The local node 453 /// * `addr` - Address to connect to 454 /// * `shared_key` - Pre-shared key for transport encryption 455 /// * `expected_identity` - If Some, the node must prove ownership of this Ed25519 key 456 /// 457 /// # Returns 458 /// * `Ok((peer_key, identity_pubkey))` - Connection successful with verified identity 459 /// * `Err(_)` - Connection failed or identity verification failed 460 pub async fn connect_peer_verified( 461 node: Arc<Node>, 462 addr: &str, 463 shared_key: &[u8; 32], 464 expected_identity: Option<&[u8; 32]>, 465 ) -> Result<(PeerKey, Option<[u8; 32]>), NodeError> { 466 use rand::RngCore; 467 use std::time::{SystemTime, UNIX_EPOCH}; 468 469 info!(addr = addr, verify = expected_identity.is_some(), "Connecting to peer with verification"); 470 471 // Connect with Perfect Forward Secrecy 472 let handshake_result = FakeTlsStream::connect_pfs(addr, Some(shared_key)).await?; 473 let peer_key = handshake_result.peer_pubkey; 474 475 // If we need to verify identity, send a challenge 476 let verified_identity = if expected_identity.is_some() { 477 // Generate random challenge nonce 478 let mut challenge = [0u8; 32]; 479 rand::thread_rng().fill_bytes(&mut challenge); 480 481 let timestamp = SystemTime::now() 482 .duration_since(UNIX_EPOCH) 483 .unwrap_or_default() 484 .as_secs(); 485 486 // Send Hello with challenge 487 let hello = AbzuFrame::hello_with_challenge(peer_key, timestamp, challenge); 488 handshake_result.stream.send_frame(&hello).await?; 489 490 // Receive HelloAck with identity signature 491 let response = tokio::time::timeout( 492 Duration::from_secs(5), 493 handshake_result.stream.recv_frame() 494 ) 495 .await 496 .map_err(|_| NodeError::Transport(abzu_transport::TransportError::Timeout))? 497 .map_err(NodeError::Transport)?; 498 499 match response { 500 AbzuFrame::HelloAck { 501 ephemeral_pub, 502 identity_pubkey: Some(identity_pubkey), 503 identity_signature, 504 .. 505 } if !identity_signature.is_empty() => { 506 // Verify the signature 507 verify_bootstrap_identity( 508 &challenge, 509 &ephemeral_pub, 510 timestamp, 511 &identity_pubkey, 512 &identity_signature, 513 ).map_err(|e| NodeError::Connection(format!("Bootstrap verification failed: {}", e)))?; 514 515 // Verify it matches expected identity (if pinned) 516 if let Some(expected) = expected_identity { 517 if &identity_pubkey != expected { 518 return Err(NodeError::Connection(format!( 519 "Identity mismatch: expected {}, got {}", 520 format_pubkey(expected), 521 format_pubkey(&identity_pubkey) 522 ))); 523 } 524 } 525 526 info!( 527 addr = addr, 528 identity = %format_pubkey(&identity_pubkey), 529 "Bootstrap node identity verified" 530 ); 531 532 Some(identity_pubkey) 533 } 534 AbzuFrame::HelloAck { identity_pubkey: None, .. } | AbzuFrame::HelloAck { .. } => { 535 // Node didn't provide identity - fail if we required it 536 if expected_identity.is_some() { 537 return Err(NodeError::Connection( 538 "Bootstrap node did not provide identity for verification".into() 539 )); 540 } 541 None 542 } 543 _ => { 544 return Err(NodeError::Connection( 545 format!("Unexpected response to Hello challenge: {:?}", std::mem::discriminant(&response)) 546 )); 547 } 548 } 549 } else { 550 None 551 }; 552 553 info!( 554 addr = addr, 555 peer_pubkey = ?&peer_key[..8], 556 verified = verified_identity.is_some(), 557 "Connected to peer" 558 ); 559 560 let conn = PeerConnection::new(Box::new(handshake_result.stream)); 561 node.add_peer(peer_key, conn).await; 562 563 Ok((peer_key, verified_identity)) 564 } 565 566 /// Bootstrap the node into the network 567 /// 568 /// Attempts to connect to peers in priority order: 569 /// 1. User-configured bootstrap peers 570 /// 2. Previously known peers (submarine test resilience) 571 /// 3. Default community nodes (if enabled) 572 /// 573 /// Uses exponential backoff for failed connections. 574 pub async fn bootstrap( 575 node: Arc<Node>, 576 shared_key: &[u8; 32], 577 config: BootstrapConfig, 578 ) -> BootstrapResult { 579 use std::sync::Mutex; 580 use std::path::PathBuf; 581 582 // Security warning for unverified bootstrap 583 match config.trust.mode { 584 BootstrapTrustMode::Any => { 585 warn!("⚠️ SECURITY WARNING: Bootstrap verification disabled (trust mode: Any)"); 586 warn!("⚠️ This exposes the node to potential Sybil attacks during network entry"); 587 warn!("⚠️ Consider using Pinned or TrustOnFirstUse mode in production"); 588 } 589 BootstrapTrustMode::Pinned => { 590 info!( 591 trusted_nodes = config.trust.pinned_nodes.len(), 592 "Bootstrap verification: Pinned mode" 593 ); 594 } 595 BootstrapTrustMode::TrustOnFirstUse => { 596 info!("Bootstrap verification: Trust-on-first-use mode"); 597 } 598 } 599 600 // Load TOFU keystore if needed (wrapped in Mutex for interior mutability) 601 let tofu_keystore: Option<Arc<Mutex<TofuKeystore>>> = if config.trust.mode == BootstrapTrustMode::TrustOnFirstUse { 602 let path = config.trust.tofu_keystore_path.as_ref() 603 .map(PathBuf::from) 604 .unwrap_or_else(|| { 605 // Default path: data_dir/bootstrap_keys.json 606 node.data_dir().join("bootstrap_keys.json") 607 }); 608 match TofuKeystore::load(&path) { 609 Ok(ks) => { 610 debug!(path = %path.display(), keys = ks.len(), "Loaded TOFU keystore"); 611 Some(Arc::new(Mutex::new(ks))) 612 } 613 Err(e) => { 614 warn!(error = %e, "Failed to load TOFU keystore, starting fresh"); 615 Some(Arc::new(Mutex::new(TofuKeystore::default()))) 616 } 617 } 618 } else { 619 None 620 }; 621 let tofu_path: Option<PathBuf> = config.trust.tofu_keystore_path.as_ref() 622 .map(PathBuf::from) 623 .or_else(|| { 624 if config.trust.mode == BootstrapTrustMode::TrustOnFirstUse { 625 Some(node.data_dir().join("bootstrap_keys.json")) 626 } else { 627 None 628 } 629 }); 630 631 let mut connected = 0usize; 632 let mut connected_peers: Vec<PeerKey> = Vec::new(); 633 let mut failed = 0usize; 634 635 // Phase 1: Try configured bootstrap peers 636 let addresses = config.all_addresses(); 637 if !addresses.is_empty() { 638 info!(count = addresses.len(), "Connecting to configured bootstrap peers"); 639 640 for addr in &addresses { 641 let result = try_verified_connect_with_retry( 642 Arc::clone(&node), 643 addr, 644 shared_key, 645 config.retry_attempts, 646 config.initial_backoff_ms, 647 &config.trust, 648 &tofu_keystore, 649 &tofu_path, 650 ).await; 651 652 match result { 653 Ok(peer_key) => { 654 info!(addr = %addr, peer = ?&peer_key[..8], "Connected to bootstrap peer"); 655 connected_peers.push(peer_key); 656 connected += 1; 657 } 658 Err(e) => { 659 warn!(addr = %addr, error = %e, "Failed to connect to bootstrap peer"); 660 failed += 1; 661 } 662 } 663 } 664 } 665 666 // Phase 2: Try known peers (submarine test - reconnect after restart) 667 if connected == 0 || connected < config.max_concurrent { 668 let known_peers = node.known_peers().load_all().unwrap_or_default(); 669 let limit = config.known_peers_limit.saturating_sub(connected); 670 671 if !known_peers.is_empty() { 672 info!( 673 available = known_peers.len(), 674 trying = limit.min(known_peers.len()), 675 "Reconnecting to known peers" 676 ); 677 678 for peer in known_peers.into_iter().take(limit) { 679 let addr = peer.address.to_string(); 680 681 // Skip if we already tried this address in phase 1 682 if addresses.iter().any(|a| a == &addr) { 683 continue; 684 } 685 686 match try_connect_with_retry( 687 Arc::clone(&node), 688 &addr, 689 shared_key, 690 1, // Single retry for known peers 691 config.initial_backoff_ms, 692 ).await { 693 Ok(peer_key) => { 694 info!(addr = %addr, "Reconnected to known peer"); 695 // Update success in peer store 696 if let Ok(router) = node.router().try_read() { 697 let coords = router.self_coords().clone(); 698 let _ = node.known_peers().record_success(peer_key, peer.address, coords); 699 } 700 connected_peers.push(peer_key); 701 connected += 1; 702 } 703 Err(_) => { 704 // Record failure 705 let _ = node.known_peers().record_failure(&peer.key); 706 failed += 1; 707 } 708 } 709 710 if connected >= config.max_concurrent { 711 break; 712 } 713 } 714 } 715 } 716 717 // Prune stale peers opportunistically 718 if let Ok(pruned) = node.known_peers().prune() 719 && pruned > 0 { 720 debug!(pruned = pruned, "Pruned stale peers"); 721 } 722 723 // Phase 3: DHT bootstrap - seed routing table and perform self-lookup 724 if !connected_peers.is_empty() { 725 info!("Starting DHT bootstrap..."); 726 let switchboard = Switchboard::new(Arc::clone(&node)); 727 728 // Add connected peers as DHT seeds 729 for peer_key in &connected_peers { 730 switchboard.add_dht_seed(*peer_key).await; 731 } 732 733 // Self-lookup to discover nearby nodes and populate routing table 734 let our_id = switchboard.our_dht_key(); 735 let timeout_ms = 10_000; 736 match switchboard.dht_find_node(&our_id, timeout_ms).await { 737 Ok(nodes) => { 738 info!(found = nodes.len(), "DHT routing table populated"); 739 } 740 Err(e) => { 741 warn!(error = %e, "DHT bootstrap failed, continuing anyway"); 742 } 743 } 744 } 745 746 let result = BootstrapResult::new(connected, failed); 747 748 if result.success { 749 info!(connected = connected, failed = failed, "Bootstrap complete"); 750 } else { 751 warn!(failed = failed, "Bootstrap failed - no peers connected"); 752 } 753 754 result 755 } 756 757 /// Try to connect to a peer with exponential backoff retry 758 async fn try_connect_with_retry( 759 node: Arc<Node>, 760 addr: &str, 761 shared_key: &[u8; 32], 762 max_attempts: u8, 763 initial_backoff_ms: u64, 764 ) -> Result<PeerKey, NodeError> { 765 use bootstrap::backoff_delay; 766 767 let mut last_error = None; 768 769 for attempt in 0..max_attempts { 770 if attempt > 0 { 771 let delay = backoff_delay(attempt - 1, initial_backoff_ms); 772 debug!(addr = %addr, attempt = attempt + 1, delay_ms = ?delay.as_millis(), "Retrying connection"); 773 tokio::time::sleep(delay).await; 774 } 775 776 match connect_peer(Arc::clone(&node), addr, shared_key).await { 777 Ok(peer_key) => return Ok(peer_key), 778 Err(e) => { 779 last_error = Some(e); 780 } 781 } 782 } 783 784 Err(last_error.unwrap_or_else(|| NodeError::Connection("No attempts made".into()))) 785 } 786 787 /// Try to connect with trust verification (for bootstrap nodes) 788 async fn try_verified_connect_with_retry( 789 node: Arc<Node>, 790 addr: &str, 791 shared_key: &[u8; 32], 792 max_attempts: u8, 793 initial_backoff_ms: u64, 794 trust_config: &BootstrapTrustConfig, 795 tofu_keystore: &Option<std::sync::Arc<std::sync::Mutex<TofuKeystore>>>, 796 tofu_path: &Option<std::path::PathBuf>, 797 ) -> Result<PeerKey, NodeError> { 798 use bootstrap::backoff_delay; 799 800 let mut last_error = None; 801 802 for attempt in 0..max_attempts { 803 if attempt > 0 { 804 let delay = backoff_delay(attempt - 1, initial_backoff_ms); 805 debug!(addr = %addr, attempt = attempt + 1, delay_ms = ?delay.as_millis(), "Retrying verified connection"); 806 tokio::time::sleep(delay).await; 807 } 808 809 // Determine expected identity based on trust mode (copy the key to avoid lifetime issues) 810 let expected_identity_owned: Option<[u8; 32]> = match trust_config.mode { 811 BootstrapTrustMode::Any => None, // No verification 812 BootstrapTrustMode::Pinned => trust_config.get_pinned_key(addr).copied(), 813 BootstrapTrustMode::TrustOnFirstUse => { 814 // Check TOFU keystore for previously seen key 815 if let Some(ks) = tofu_keystore { 816 let ks_guard = ks.lock().unwrap(); 817 ks_guard.get(addr).copied() 818 } else { 819 None 820 } 821 } 822 }; 823 824 // If Pinned mode but no pinned key for this address, skip verification 825 // (allows mixed verified/unverified bootstrap nodes) 826 if trust_config.mode == BootstrapTrustMode::Pinned && expected_identity_owned.is_none() { 827 debug!(addr = %addr, "No pinned key for address, using unverified connection"); 828 match connect_peer(Arc::clone(&node), addr, shared_key).await { 829 Ok(peer_key) => return Ok(peer_key), 830 Err(e) => { 831 last_error = Some(e); 832 continue; 833 } 834 } 835 } 836 837 // Use verified connection 838 match connect_peer_verified( 839 Arc::clone(&node), 840 addr, 841 shared_key, 842 expected_identity_owned.as_ref(), 843 ).await { 844 Ok((peer_key, verified_identity)) => { 845 // For TOFU mode, store the verified identity for future connections 846 if trust_config.mode == BootstrapTrustMode::TrustOnFirstUse { 847 if let Some(identity) = verified_identity { 848 if let Some(ks) = tofu_keystore { 849 let mut ks_guard = ks.lock().unwrap(); 850 let is_new = ks_guard.get(addr).is_none(); 851 if is_new { 852 let timestamp = std::time::SystemTime::now() 853 .duration_since(std::time::UNIX_EPOCH) 854 .map(|d| d.as_secs()) 855 .unwrap_or(0); 856 ks_guard.store(addr.to_string(), identity, timestamp); 857 info!(addr = %addr, "Stored new bootstrap identity (TOFU)"); 858 859 // Persist keystore 860 if let Some(path) = tofu_path { 861 if let Err(e) = ks_guard.save(path) { 862 warn!(error = %e, "Failed to persist TOFU keystore"); 863 } 864 } 865 } else { 866 // Verify it matches stored key 867 if let Some(false) = ks_guard.verify(addr, &identity) { 868 warn!( 869 addr = %addr, 870 "Bootstrap identity changed! Previous key differs from current" 871 ); 872 // Don't fail - TOFU is "warn on change" not "fail on change" 873 } 874 } 875 } 876 } 877 } 878 return Ok(peer_key); 879 } 880 Err(e) => { 881 last_error = Some(e); 882 } 883 } 884 } 885 886 Err(last_error.unwrap_or_else(|| NodeError::Connection("No attempts made".into()))) 887 } 888 889 /// Discover our NAT mapping for peer hole punching 890 /// 891 /// Returns the discovered public IP:port that can be shared with peers 892 /// for direct connection establishment. 893 pub async fn discover_nat() -> Result<abzu_transport::NatMapping, abzu_transport::NatError> { 894 let resolver = abzu_transport::NatResolver::new(); 895 resolver.discover().await 896 } 897 898 /// Connect to a peer with NAT hole punching 899 /// 900 /// This performs coordinated hole punching to establish a connection 901 /// through NAT. Both peers should call this simultaneously after 902 /// exchanging their mapped addresses out-of-band. 903 /// 904 /// # Arguments 905 /// * `node` - The node context 906 /// * `peer_mapped_addr` - The peer's public mapped address (from their discover_nat) 907 /// * `shared_key` - Encryption key for the channel 908 /// 909 /// # Returns 910 /// The peer key if successful, or an error if hole punch fails 911 pub async fn connect_peer_nat( 912 node: Arc<Node>, 913 peer_mapped_addr: std::net::SocketAddr, 914 shared_key: &[u8; 32], 915 ) -> Result<PeerKey, NodeError> { 916 use abzu_transport::{NatResolver, NatType}; 917 918 info!(target = %peer_mapped_addr, "Starting NAT-aware connection"); 919 920 // Discover our own NAT mapping 921 let resolver = NatResolver::new(); 922 let our_mapping = resolver.discover().await.map_err(|e| { 923 NodeError::Connection(format!("NAT discovery failed: {}", e)) 924 })?; 925 926 info!( 927 local = %our_mapping.local_addr, 928 mapped = %our_mapping.mapped_addr, 929 nat_type = ?our_mapping.nat_type, 930 "NAT discovery completed" 931 ); 932 933 // Check if NAT is traversable 934 if our_mapping.nat_type == NatType::Symmetric { 935 warn!("Symmetric NAT detected - connection may fail, consider relay"); 936 } 937 938 // Perform hole punch 939 let punch_socket = resolver.punch(peer_mapped_addr).await.map_err(|e| { 940 NodeError::Connection(format!("Hole punch failed: {}", e)) 941 })?; 942 943 // After hole punch, try TCP over the punched path 944 let addr = peer_mapped_addr.to_string(); 945 let handshake_result = FakeTlsStream::connect_pfs(&addr, Some(shared_key)).await?; 946 947 // Use X25519 peer pubkey as cryptographic identity (not address hash!) 948 let peer_key = handshake_result.peer_pubkey; 949 950 info!( 951 peer_addr = %peer_mapped_addr, 952 peer_pubkey = ?&peer_key[..8], 953 "NAT-aware peer authenticated via X25519" 954 ); 955 956 let conn = PeerConnection::new(Box::new(handshake_result.stream)); 957 node.add_peer(peer_key, conn).await; 958 959 // Clean up the punch socket (it was just for NAT binding) 960 drop(punch_socket); 961 962 info!("NAT-aware connection established"); 963 Ok(peer_key) 964 } 965 966 #[cfg(test)] 967 mod tests { 968 use super::*; 969 use tempfile::tempdir; 970 971 fn create_test_node() -> Arc<Node> { 972 let tmp = tempdir().unwrap(); 973 let config = NodeConfig { 974 storage_path: tmp.path().to_str().unwrap().to_string(), 975 ..Default::default() 976 }; 977 Arc::new(Node::new(config).unwrap()) 978 } 979 980 #[tokio::test] 981 async fn test_node_lifecycle() { 982 let node = create_test_node(); 983 let node_clone = Arc::clone(&node); 984 985 // Spawn the event loop 986 let handle = tokio::spawn(async move { 987 run(node_clone).await 988 }); 989 990 // Let it run briefly 991 tokio::time::sleep(Duration::from_millis(100)).await; 992 993 // Signal shutdown 994 node.signal_shutdown(); 995 996 // Wait for clean exit 997 let result = handle.await.unwrap(); 998 assert!(result.is_ok()); 999 } 1000 1001 #[tokio::test] 1002 async fn test_poll_peers_empty() { 1003 let node = create_test_node(); 1004 let result = poll_peers(node).await; 1005 assert!(result.is_none()); 1006 } 1007 }