dht.rs
1 //! DHT (Distributed Hash Table) transport using Mainline DHT (BEP44). 2 //! 3 //! This module provides DHT-based message storage using BitTorrent's Mainline DHT 4 //! with BEP44 mutable items. Unlike libp2p Kademlia, Mainline DHT public nodes 5 //! accept arbitrary data storage. 6 //! 7 //! # Architecture 8 //! 9 //! ```text 10 //! ┌─────────────────────────────────────────────────────────────┐ 11 //! │ DhtTransport │ 12 //! │ - Client-mode only (no storage on mobile) │ 13 //! │ - Uses BEP44 mutable items for message storage │ 14 //! │ - Key = SHA1(ed25519_pubkey || salt) │ 15 //! └─────────────────────────────────────────────────────────────┘ 16 //! │ 17 //! ▼ 18 //! ┌─────────────────────────────────────────────────────────────┐ 19 //! │ Mainline DHT (BEP44) │ 20 //! │ - Public bootstrap nodes: router.bittorrent.com:6881 │ 21 //! │ - Stores mutable items with ed25519 signatures │ 22 //! │ - ~1000 byte value limit per item │ 23 //! └─────────────────────────────────────────────────────────────┘ 24 //! ``` 25 //! 26 //! # Usage 27 //! 28 //! ```ignore 29 //! let config = DhtConfig { 30 //! bootstrap_nodes: vec!["router.bittorrent.com:6881".to_string()], 31 //! }; 32 //! 33 //! let mut dht = DhtTransport::new(config); 34 //! dht.initialize().await?; 35 //! 36 //! // Publish messages for a recipient 37 //! let messages = vec![DhtStoredMessage::new(...)]; 38 //! dht.publish_messages(recipient_key, messages).await?; 39 //! 40 //! // Fetch messages for our key 41 //! let messages = dht.fetch_messages(our_public_key).await?; 42 //! ``` 43 44 use std::collections::HashMap; 45 use std::sync::Arc; 46 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; 47 48 use async_trait::async_trait; 49 use bytes::Bytes; 50 use ed25519_dalek::SigningKey; 51 use mainline::{Dht, MutableItem}; 52 use serde::{Deserialize, Serialize}; 53 use sha2::{Digest, Sha512}; 54 use tokio::sync::{Mutex, RwLock}; 55 56 use super::{Transport, TransportAddress, TransportError, TransportResult, TransportType}; 57 58 // ============================================================================ 59 // CONSTANTS 60 // ============================================================================ 61 62 /// Default TTL for messages stored in DHT (48 hours). 63 pub const DEFAULT_MESSAGE_TTL_SECS: u64 = 48 * 60 * 60; 64 65 /// Domain separator for deriving ed25519 keys from X25519 keys. 66 const ED25519_DERIVATION_DOMAIN: &[u8] = b"deaddrop/dht/ed25519/v1"; 67 68 /// Salt prefix for BEP44 mutable items (slot number appended). 69 const DHT_SALT_PREFIX: &str = "deaddrop/v1/slot/"; 70 71 /// Salt for the rendezvous slot (peer discovery of .onion addresses). 72 const DHT_RENDEZVOUS_SALT: &str = "deaddrop/v1/rendezvous"; 73 74 /// DHT salt for iroh endpoint ID rendezvous (separate from Tor rendezvous). 75 const DHT_IROH_RENDEZVOUS_SALT: &str = "deaddrop/v1/iroh-rendezvous"; 76 77 /// Minimum number of DHT slots for message distribution. 78 /// Used for small message counts and as the baseline. 79 const DHT_MIN_SLOTS: usize = 20; 80 81 /// Maximum number of DHT slots for large file transfers. 82 /// 100 slots allows transferring files up to ~100 chunks in one round. 83 const DHT_MAX_SLOTS: usize = 100; 84 85 /// Marker sender_key for continuation pointers in chained overflow slots. 86 /// 0xCC = "continuation chain" — distinguishes from chunk markers (0xCE) and real messages. 87 const CONTINUATION_MARKER: [u8; 32] = [0xCC; 32]; 88 89 /// Magic bytes identifying a continuation pointer payload. 90 const CONTINUATION_MAGIC: &[u8] = b"CN"; 91 92 /// Maximum number of chained ranges (each range = DHT_MAX_SLOTS slots). 93 /// 5 ranges = 500 slots total, preventing runaway publishing. 94 const MAX_CHAIN_DEPTH: usize = 5; 95 96 /// Calculate dynamic slot count based on message count. 97 /// For small transfers, use minimum slots. For large transfers, scale up. 98 fn calculate_slot_count(message_count: usize) -> usize { 99 if message_count <= DHT_MIN_SLOTS { 100 DHT_MIN_SLOTS 101 } else { 102 message_count.min(DHT_MAX_SLOTS) 103 } 104 } 105 106 /// Maximum size for BEP44 values (~1000 bytes). 107 const MAX_DHT_VALUE_SIZE: usize = 950; 108 109 /// Overhead for chunk metadata (magic + message_id + total + index + length). 110 /// Magic (2) + message_id (16) + total_chunks (1) + chunk_index (1) + payload_len (2) = 22 bytes 111 const CHUNK_OVERHEAD: usize = 22; 112 113 /// Maximum payload per chunk. 114 /// Each chunk needs: payload + 24 bytes (chunk encoding) + 114 bytes (DhtStoredMessage wrapper). 115 /// SLOT_BUDGET is 800 bytes, so max payload = 800 - 24 - 114 = 662 bytes. 116 /// Using 600 bytes per chunk payload for safety margin. 117 /// This gives ~50 micro-chunks per 30KB document chunk. 118 const MAX_CHUNK_PAYLOAD: usize = 400; 119 120 /// Magic bytes to identify chunked DHT values. 121 const CHUNKED_MAGIC: &[u8] = b"CK"; 122 123 /// Default timeout for DHT operations. 124 const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_secs(30); 125 126 /// Maximum retry attempts for DHT operations. 127 const MAX_RETRY_ATTEMPTS: u32 = 3; 128 129 /// Base delay for exponential backoff (1 second). 130 const RETRY_BASE_DELAY: Duration = Duration::from_secs(1); 131 132 /// Delay before retry to avoid DHT query collisions (2 seconds). 133 const RETRY_COLLISION_DELAY: Duration = Duration::from_secs(2); 134 135 /// TTL for cached incomplete chunk sets (5 minutes). 136 const CHUNK_CACHE_TTL: Duration = Duration::from_secs(300); 137 138 // ============================================================================ 139 // PUBLISH OUTCOME 140 // ============================================================================ 141 142 /// Result of a DHT publish operation with per-message success/failure tracking. 143 #[derive(Debug, Clone, Default)] 144 pub struct DhtPublishOutcome { 145 /// Message IDs that were successfully published (all chunks stored). 146 pub succeeded: Vec<[u8; 16]>, 147 /// Message IDs that failed to publish (at least one chunk failed). 148 pub failed: Vec<[u8; 16]>, 149 /// Error messages for failed slots. 150 pub errors: Vec<String>, 151 /// Message IDs that were split into chunks at the transport layer. 152 /// These should stay Queued for republish since partial chunk propagation 153 /// means the receiver can't reassemble the message. 154 pub chunked: Vec<[u8; 16]>, 155 } 156 157 // ============================================================================ 158 // CONFIGURATION 159 // ============================================================================ 160 161 /// Configuration for the DHT transport. 162 #[derive(Debug, Clone)] 163 pub struct DhtConfig { 164 /// Bootstrap node addresses (host:port format). 165 pub bootstrap_nodes: Vec<String>, 166 /// Query timeout in seconds. 167 pub query_timeout_secs: u64, 168 } 169 170 impl Default for DhtConfig { 171 fn default() -> Self { 172 Self { 173 bootstrap_nodes: vec![ 174 "router.bittorrent.com:6881".to_string(), 175 "router.utorrent.com:6881".to_string(), 176 "dht.transmissionbt.com:6881".to_string(), 177 ], 178 query_timeout_secs: 30, 179 } 180 } 181 } 182 183 // ============================================================================ 184 // DHT STATE 185 // ============================================================================ 186 187 /// State of the DHT transport. 188 #[derive(Debug, Clone, PartialEq, Eq)] 189 pub enum DhtState { 190 /// Not connected to the DHT network. 191 Disconnected, 192 /// Bootstrapping - connecting to initial peers. 193 Bootstrapping, 194 /// Connected to the DHT network. 195 Connected { 196 /// Number of known peers. 197 peer_count: usize, 198 }, 199 /// Failed to connect. 200 Failed { 201 /// Error description. 202 error: String, 203 }, 204 } 205 206 // ============================================================================ 207 // STORED MESSAGE 208 // ============================================================================ 209 210 /// A message stored in the DHT for a recipient. 211 /// 212 /// Messages are stored at a key derived from the recipient's public key. 213 /// Multiple messages can be stored for the same recipient. 214 #[derive(Debug, Clone, Serialize, Deserialize)] 215 pub struct DhtStoredMessage { 216 /// Unique message identifier. 217 pub message_id: [u8; 16], 218 /// Sender's public key. 219 pub sender_key: [u8; 32], 220 /// Encrypted message payload (serialized EncryptedMessage). 221 pub payload: Vec<u8>, 222 /// Unix timestamp when message was stored. 223 pub timestamp: u64, 224 /// Unix timestamp when message expires. 225 pub expires_at: u64, 226 } 227 228 impl DhtStoredMessage { 229 /// Create a new DHT stored message. 230 pub fn new( 231 message_id: [u8; 16], 232 sender_key: [u8; 32], 233 payload: Vec<u8>, 234 ttl_secs: Option<u64>, 235 ) -> Self { 236 let now = SystemTime::now() 237 .duration_since(UNIX_EPOCH) 238 .unwrap() 239 .as_secs(); 240 let ttl = ttl_secs.unwrap_or(DEFAULT_MESSAGE_TTL_SECS); 241 242 Self { 243 message_id, 244 sender_key, 245 payload, 246 timestamp: now, 247 expires_at: now + ttl, 248 } 249 } 250 251 /// Check if this message has expired. 252 pub fn is_expired(&self) -> bool { 253 let now = SystemTime::now() 254 .duration_since(UNIX_EPOCH) 255 .unwrap() 256 .as_secs(); 257 now >= self.expires_at 258 } 259 260 /// Serialize the message for storage. 261 pub fn serialize(&self) -> TransportResult<Vec<u8>> { 262 bincode::serialize(self).map_err(|e| { 263 TransportError::Internal(format!("Failed to serialize DhtStoredMessage: {}", e)) 264 }) 265 } 266 267 /// Deserialize a message from storage. 268 pub fn deserialize(data: &[u8]) -> TransportResult<Self> { 269 bincode::deserialize(data).map_err(|e| { 270 TransportError::Internal(format!("Failed to deserialize DhtStoredMessage: {}", e)) 271 }) 272 } 273 } 274 275 // ============================================================================ 276 // RENDEZVOUS VALUE 277 // ============================================================================ 278 279 /// Value stored in the DHT rendezvous slot for peer discovery. 280 /// Contains a .onion address and timestamp for freshness checking. 281 #[derive(Debug, Clone, Serialize, Deserialize)] 282 struct RendezvousValue { 283 /// The .onion address (e.g., "abc123...xyz.onion") 284 #[serde(rename = "a")] 285 address: String, 286 /// Unix timestamp when this was published 287 #[serde(rename = "t")] 288 timestamp: u64, 289 } 290 291 // ============================================================================ 292 // KEY DERIVATION 293 // ============================================================================ 294 295 /// Derive ed25519 signing key from X25519 public key for BEP44 signing. 296 /// 297 /// Uses SHA-512 with domain separation to deterministically derive an ed25519 298 /// seed from an X25519 key. This allows recipients to sign DHT entries using 299 /// their exchange key. 300 fn derive_ed25519_from_x25519(x25519_key: &[u8; 32]) -> SigningKey { 301 let mut hasher = Sha512::new(); 302 hasher.update(ED25519_DERIVATION_DOMAIN); 303 hasher.update(x25519_key); 304 let hash = hasher.finalize(); 305 306 let mut seed = [0u8; 32]; 307 seed.copy_from_slice(&hash[..32]); 308 SigningKey::from_bytes(&seed) 309 } 310 311 /// Generate salt for a specific DHT slot. 312 /// 313 /// Each slot uses a unique salt: "deaddrop/v1/slot/0", "deaddrop/v1/slot/1", etc. 314 /// This allows storing up to DHT_MAX_SLOTS * MAX_DHT_VALUE_SIZE bytes per recipient. 315 fn generate_slot_salt(slot: usize) -> Bytes { 316 let salt = format!("{}{}", DHT_SALT_PREFIX, slot); 317 Bytes::from(salt.into_bytes()) 318 } 319 320 /// Encode a continuation pointer payload: "CN" + u16_be(next_slot_start). 321 fn encode_continuation(next_slot_start: usize) -> Vec<u8> { 322 let mut buf = Vec::with_capacity(4); 323 buf.extend_from_slice(CONTINUATION_MAGIC); 324 buf.extend_from_slice(&(next_slot_start as u16).to_be_bytes()); 325 buf 326 } 327 328 /// Decode a continuation pointer payload. Returns the next range's starting slot. 329 fn decode_continuation(payload: &[u8]) -> Option<usize> { 330 if payload.len() >= 4 && &payload[0..2] == CONTINUATION_MAGIC { 331 let start = u16::from_be_bytes([payload[2], payload[3]]) as usize; 332 Some(start) 333 } else { 334 None 335 } 336 } 337 338 /// Check if a DhtStoredMessage is a continuation pointer. 339 fn is_continuation_message(msg: &DhtStoredMessage) -> bool { 340 msg.sender_key == CONTINUATION_MARKER 341 } 342 343 /// Create a continuation pointer message pointing to the next range start. 344 fn make_continuation_message(next_slot_start: usize) -> DhtStoredMessage { 345 DhtStoredMessage { 346 message_id: [0xCC; 16], // Deterministic ID for continuation pointers 347 sender_key: CONTINUATION_MARKER, 348 payload: encode_continuation(next_slot_start), 349 timestamp: 0, 350 expires_at: SystemTime::now() 351 .duration_since(UNIX_EPOCH) 352 .unwrap() 353 .as_secs() 354 + DEFAULT_MESSAGE_TTL_SECS, 355 } 356 } 357 358 // ============================================================================ 359 // MESSAGE ENCODING (Bencode for BEP44 with compression) 360 // ============================================================================ 361 362 /// Magic bytes to identify compressed DHT values. 363 const COMPRESSED_MAGIC: &[u8] = b"ZS"; 364 365 /// Encode messages for DHT storage using bencode with zstd compression. 366 fn bencode_messages(messages: &[DhtStoredMessage]) -> Vec<u8> { 367 // Compact format: list of [message_id, sender_key, payload, expires_at] 368 let compact: Vec<(Vec<u8>, Vec<u8>, Vec<u8>, u64)> = messages 369 .iter() 370 .map(|m| { 371 ( 372 m.message_id.to_vec(), 373 m.sender_key.to_vec(), 374 m.payload.clone(), 375 m.expires_at, 376 ) 377 }) 378 .collect(); 379 380 let bencoded = serde_bencode::to_bytes(&compact).unwrap_or_default(); 381 382 // Try to compress with zstd (level 3 is a good balance of speed/compression) 383 match zstd::encode_all(bencoded.as_slice(), 3) { 384 Ok(compressed) => { 385 // Only use compression if it actually saves space 386 if compressed.len() + COMPRESSED_MAGIC.len() < bencoded.len() { 387 let mut result = Vec::with_capacity(COMPRESSED_MAGIC.len() + compressed.len()); 388 result.extend_from_slice(COMPRESSED_MAGIC); 389 result.extend_from_slice(&compressed); 390 debug_log!( 391 "[DHT DEBUG] Compressed {} -> {} bytes ({}% reduction)", 392 bencoded.len(), 393 result.len(), 394 100 - (result.len() * 100 / bencoded.len()) 395 ); 396 result 397 } else { 398 bencoded 399 } 400 } 401 Err(_) => bencoded, 402 } 403 } 404 405 /// A chunk of a large message that was split for DHT storage. 406 #[derive(Debug, Clone, Serialize, Deserialize)] 407 struct MessageChunk { 408 /// Original message ID (for reassembly). 409 message_id: [u8; 16], 410 /// Total number of chunks. 411 total_chunks: u8, 412 /// Index of this chunk (0-based). 413 chunk_index: u8, 414 /// Chunk payload. 415 payload: Vec<u8>, 416 } 417 418 /// Encode a chunk for DHT storage. 419 fn encode_chunk(chunk: &MessageChunk) -> Vec<u8> { 420 let mut result = Vec::with_capacity(CHUNK_OVERHEAD + chunk.payload.len()); 421 result.extend_from_slice(CHUNKED_MAGIC); 422 result.extend_from_slice(&chunk.message_id); 423 result.push(chunk.total_chunks); 424 result.push(chunk.chunk_index); 425 result.extend_from_slice(&(chunk.payload.len() as u16).to_le_bytes()); 426 result.extend_from_slice(&chunk.payload); 427 result 428 } 429 430 /// Decode a chunk from DHT storage. 431 fn decode_chunk(data: &[u8]) -> Option<MessageChunk> { 432 if data.len() < CHUNK_OVERHEAD || !data.starts_with(CHUNKED_MAGIC) { 433 return None; 434 } 435 let mut message_id = [0u8; 16]; 436 message_id.copy_from_slice(&data[2..18]); 437 let total_chunks = data[18]; 438 let chunk_index = data[19]; 439 let payload_len = u16::from_le_bytes([data[20], data[21]]) as usize; 440 if data.len() < CHUNK_OVERHEAD + payload_len { 441 return None; 442 } 443 let payload = data[22..22 + payload_len].to_vec(); 444 Some(MessageChunk { 445 message_id, 446 total_chunks, 447 chunk_index, 448 payload, 449 }) 450 } 451 452 /// Split a large message into chunks that fit in DHT slots. 453 fn split_message_into_chunks(msg: &DhtStoredMessage) -> Vec<MessageChunk> { 454 // Serialize the full message 455 let full_data = msg.serialize().unwrap_or_default(); 456 let total_chunks = (full_data.len() + MAX_CHUNK_PAYLOAD - 1) / MAX_CHUNK_PAYLOAD; 457 458 if total_chunks > 255 { 459 debug_log!("[DHT WARN] Message too large to chunk (would need {} chunks)", total_chunks); 460 return Vec::new(); 461 } 462 463 let total_chunks = total_chunks as u8; 464 let mut chunks = Vec::with_capacity(total_chunks as usize); 465 466 for (i, chunk_data) in full_data.chunks(MAX_CHUNK_PAYLOAD).enumerate() { 467 chunks.push(MessageChunk { 468 message_id: msg.message_id, 469 total_chunks, 470 chunk_index: i as u8, 471 payload: chunk_data.to_vec(), 472 }); 473 } 474 475 debug_log!( 476 "[DHT DEBUG] Split message {} into {} chunks ({} bytes total)", 477 hex::encode(&msg.message_id), 478 chunks.len(), 479 full_data.len() 480 ); 481 482 chunks 483 } 484 485 /// Reassemble chunks into a complete message. 486 fn reassemble_chunks(chunks: &[MessageChunk]) -> Option<DhtStoredMessage> { 487 if chunks.is_empty() { 488 return None; 489 } 490 491 let message_id = chunks[0].message_id; 492 let total_chunks = chunks[0].total_chunks; 493 494 // Verify all chunks belong to same message and we have all of them 495 if chunks.len() != total_chunks as usize { 496 debug_log!( 497 "[DHT DEBUG] Incomplete chunks for {}: have {}/{}", 498 hex::encode(&message_id), 499 chunks.len(), 500 total_chunks 501 ); 502 return None; 503 } 504 505 // Sort by chunk index and verify continuity 506 let mut sorted: Vec<_> = chunks.iter().collect(); 507 sorted.sort_by_key(|c| c.chunk_index); 508 509 for (i, chunk) in sorted.iter().enumerate() { 510 if chunk.chunk_index != i as u8 || chunk.message_id != message_id { 511 debug_log!("[DHT WARN] Chunk mismatch during reassembly"); 512 return None; 513 } 514 } 515 516 // Concatenate payloads 517 let mut full_data = Vec::new(); 518 for chunk in sorted { 519 full_data.extend_from_slice(&chunk.payload); 520 } 521 522 // Deserialize 523 match DhtStoredMessage::deserialize(&full_data) { 524 Ok(msg) => { 525 debug_log!( 526 "[DHT DEBUG] Reassembled message {} from {} chunks", 527 hex::encode(&msg.message_id), 528 total_chunks 529 ); 530 Some(msg) 531 } 532 Err(e) => { 533 debug_log!("[DHT WARN] Failed to deserialize reassembled message: {}", e); 534 None 535 } 536 } 537 } 538 539 /// Decode messages from DHT storage (handles both compressed and uncompressed). 540 fn decode_messages(data: &[u8]) -> Vec<DhtStoredMessage> { 541 // Check for compression magic bytes 542 let decoded_data = if data.starts_with(COMPRESSED_MAGIC) { 543 match zstd::decode_all(&data[COMPRESSED_MAGIC.len()..]) { 544 Ok(decompressed) => decompressed, 545 Err(e) => { 546 debug_log!("[DHT WARN] Failed to decompress DHT data: {}", e); 547 return Vec::new(); 548 } 549 } 550 } else { 551 data.to_vec() 552 }; 553 554 if let Ok(compact) = 555 serde_bencode::from_bytes::<Vec<(Vec<u8>, Vec<u8>, Vec<u8>, u64)>>(&decoded_data) 556 { 557 compact 558 .into_iter() 559 .filter_map(|(id, sender, payload, expires)| { 560 if id.len() == 16 && sender.len() == 32 { 561 let mut message_id = [0u8; 16]; 562 let mut sender_key = [0u8; 32]; 563 message_id.copy_from_slice(&id); 564 sender_key.copy_from_slice(&sender); 565 566 Some(DhtStoredMessage { 567 message_id, 568 sender_key, 569 payload, 570 timestamp: 0, // Not stored in DHT 571 expires_at: expires, 572 }) 573 } else { 574 None 575 } 576 }) 577 .collect() 578 } else { 579 vec![] 580 } 581 } 582 583 // ============================================================================ 584 // DHT TRANSPORT 585 // ============================================================================ 586 587 /// DHT transport using Mainline DHT with BEP44 mutable items. 588 /// 589 /// This transport uses BitTorrent's Mainline DHT to store and retrieve 590 /// encrypted messages. Unlike libp2p Kademlia, Mainline DHT public nodes 591 /// accept arbitrary data storage via BEP44. 592 pub struct DhtTransport { 593 /// Transport configuration. 594 config: DhtConfig, 595 /// Current state. 596 state: Arc<RwLock<DhtState>>, 597 /// Whether initialized. 598 initialized: Arc<RwLock<bool>>, 599 /// Mainline DHT client. 600 dht: Arc<RwLock<Option<Dht>>>, 601 /// Cached messages for recipients (keyed by X25519 public key). 602 message_cache: Arc<RwLock<HashMap<[u8; 32], Vec<DhtStoredMessage>>>>, 603 /// Mutex to serialize PUT operations (prevents PutQueryIsInflight errors). 604 put_mutex: Arc<Mutex<()>>, 605 /// Cache for incomplete chunk sets across fetch cycles. 606 /// Key: message_id, Value: (chunks by index, first_seen timestamp). 607 chunk_cache: Arc<RwLock<HashMap<[u8; 16], (HashMap<u8, MessageChunk>, Instant)>>>, 608 } 609 610 impl DhtTransport { 611 /// Create a new DHT transport. 612 pub fn new(config: DhtConfig) -> Self { 613 Self { 614 config, 615 state: Arc::new(RwLock::new(DhtState::Disconnected)), 616 initialized: Arc::new(RwLock::new(false)), 617 dht: Arc::new(RwLock::new(None)), 618 message_cache: Arc::new(RwLock::new(HashMap::new())), 619 put_mutex: Arc::new(Mutex::new(())), 620 chunk_cache: Arc::new(RwLock::new(HashMap::new())), 621 } 622 } 623 624 /// Get the current DHT state. 625 pub async fn state(&self) -> DhtState { 626 self.state.read().await.clone() 627 } 628 629 /// Get the number of connected peers. 630 pub async fn peer_count(&self) -> usize { 631 match &*self.state.read().await { 632 DhtState::Connected { peer_count } => *peer_count, 633 _ => 0, 634 } 635 } 636 637 /// Check if the DHT is ready (initialized and connected). 638 pub async fn is_ready(&self) -> bool { 639 let initialized = *self.initialized.read().await; 640 let state = self.state.read().await; 641 initialized && matches!(&*state, DhtState::Connected { .. }) 642 } 643 644 // ========================================================================= 645 // MESSAGE STORAGE METHODS 646 // ========================================================================= 647 648 /// Publish messages to the DHT for a recipient. 649 /// 650 /// Messages are distributed across multiple DHT slots to overcome the 651 /// ~1000 byte BEP44 value limit. With 10 slots, we can store ~50x more 652 /// messages per recipient. 653 /// 654 /// # Arguments 655 /// 656 /// * `recipient_key` - The recipient's X25519 public key 657 /// * `messages` - Messages to store for this recipient 658 /// 659 /// # Returns 660 /// 661 /// `DhtPublishOutcome` containing lists of succeeded and failed message IDs. 662 /// A message is only considered successful if ALL its slots/chunks were stored. 663 pub async fn publish_messages( 664 &self, 665 recipient_key: [u8; 32], 666 messages: Vec<DhtStoredMessage>, 667 ) -> TransportResult<DhtPublishOutcome> { 668 if !*self.initialized.read().await { 669 return Err(TransportError::NotInitialized); 670 } 671 672 // Collect all message IDs being published 673 let all_message_ids: std::collections::HashSet<[u8; 16]> = 674 messages.iter().map(|m| m.message_id).collect(); 675 676 if messages.is_empty() { 677 return Ok(DhtPublishOutcome::default()); 678 } 679 680 // Cache locally first 681 { 682 let mut cache = self.message_cache.write().await; 683 let entry = cache.entry(recipient_key).or_insert_with(Vec::new); 684 685 for msg in &messages { 686 // Don't add duplicates 687 if !entry.iter().any(|m| m.message_id == msg.message_id) { 688 entry.push(msg.clone()); 689 } 690 } 691 692 // Remove expired messages 693 entry.retain(|m| !m.is_expired()); 694 } 695 696 // Try to publish to DHT network 697 let dht_guard = self.dht.read().await; 698 if dht_guard.is_some() { 699 drop(dht_guard); // Release lock before async operations 700 701 // Get all cached messages for this recipient 702 let mut all_messages = { 703 let cache = self.message_cache.read().await; 704 cache.get(&recipient_key).cloned().unwrap_or_default() 705 }; 706 707 // Limit messages to prevent slot overflow 708 // With 10 slots at ~800 bytes each, we can fit ~10 small messages or ~2 large chunked messages 709 // Prioritize large messages (which need chunking) first, then most recent small messages 710 const MAX_MESSAGES_PER_PUBLISH: usize = 5; 711 if all_messages.len() > MAX_MESSAGES_PER_PUBLISH { 712 // Partition into large (need chunking) and small messages 713 let (large, mut small): (Vec<_>, Vec<_>) = all_messages 714 .into_iter() 715 .partition(|m| { 716 let encoded_size = bencode_messages(&[m.clone()]).len(); 717 encoded_size >= MAX_DHT_VALUE_SIZE 718 }); 719 // Sort small messages by most recent first 720 small.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); 721 // Large (chunked) messages always get priority 722 let remaining_slots = MAX_MESSAGES_PER_PUBLISH.saturating_sub(large.len()); 723 small.truncate(remaining_slots); 724 all_messages = large; 725 all_messages.extend(small); 726 debug_log!( 727 "[DHT INFO] Limiting publish to {} messages ({} large + {} small)", 728 all_messages.len(), 729 all_messages.len() - all_messages.iter().filter(|m| bencode_messages(&[(*m).clone()]).len() < MAX_DHT_VALUE_SIZE).count(), 730 all_messages.iter().filter(|m| bencode_messages(&[(*m).clone()]).len() < MAX_DHT_VALUE_SIZE).count() 731 ); 732 } 733 734 // Distribute messages across slots 735 let (slot_assignments, chunked_message_ids) = self.distribute_messages_to_slots(&all_messages); 736 737 // Build slot -> message_ids mapping to track which messages are affected by slot failures 738 let slot_count = slot_assignments.len(); 739 let mut slot_to_messages: Vec<std::collections::HashSet<[u8; 16]>> = 740 vec![std::collections::HashSet::new(); slot_count]; 741 for (slot, slot_msgs) in slot_assignments.iter().enumerate() { 742 for msg in slot_msgs { 743 slot_to_messages[slot].insert(msg.message_id); 744 } 745 } 746 747 // Track which slots failed 748 let mut failed_slots: std::collections::HashSet<usize> = std::collections::HashSet::new(); 749 750 // Publish each slot (could parallelize, but keeping sequential for reliability) 751 let mut errors = Vec::new(); 752 for (slot, slot_messages) in slot_assignments.into_iter().enumerate() { 753 if slot_messages.is_empty() { 754 continue; 755 } 756 757 let value = bencode_messages(&slot_messages); 758 let slot_failed; 759 if value.len() > MAX_DHT_VALUE_SIZE { 760 debug_log!( 761 "[DHT WARN] Slot {} value too large ({} bytes), truncating", 762 slot, value.len() 763 ); 764 // Keep only messages that fit in this slot 765 let mut kept = Vec::new(); 766 let mut size = 0; 767 for msg in slot_messages.iter() { 768 let encoded = bencode_messages(&[msg.clone()]); 769 if size + encoded.len() < MAX_DHT_VALUE_SIZE { 770 kept.push(msg.clone()); 771 size += encoded.len(); 772 } 773 } 774 let value = bencode_messages(&kept); 775 slot_failed = self.put_mutable_item_to_slot(&recipient_key, &value, slot).await.is_err(); 776 if slot_failed { 777 errors.push(format!("Slot {}: PUT failed", slot)); 778 } 779 } else { 780 slot_failed = match self.put_mutable_item_to_slot(&recipient_key, &value, slot).await { 781 Ok(()) => false, 782 Err(e) => { 783 errors.push(format!("Slot {}: {}", slot, e)); 784 true 785 } 786 }; 787 } 788 789 if slot_failed { 790 failed_slots.insert(slot); 791 } 792 } 793 794 // Verification: Try to read back what we just wrote to slot 0 795 // This helps diagnose whether data is being stored at all 796 tokio::time::sleep(Duration::from_millis(500)).await; // Brief delay for propagation 797 798 let signing_key = derive_ed25519_from_x25519(&recipient_key); 799 let verifying_key = signing_key.verifying_key(); 800 let pk_bytes: [u8; 32] = verifying_key.to_bytes(); 801 let salt = generate_slot_salt(0); 802 803 let dht_guard = self.dht.read().await; 804 if let Some(dht) = dht_guard.as_ref() { 805 let dht_clone = dht.clone(); 806 drop(dht_guard); 807 808 match tokio::task::spawn_blocking(move || { 809 let mut iter = dht_clone.get_mutable(&pk_bytes, Some(salt.as_ref()), None); 810 iter.next() 811 }) 812 .await 813 { 814 Ok(Some(item)) => { 815 debug_log!( 816 "[DHT VERIFY] Read-back successful: slot 0 has {} bytes (seq={})", 817 item.value().len(), 818 item.seq() 819 ); 820 } 821 Ok(None) => { 822 debug_log!( 823 "[DHT VERIFY] Read-back FAILED: slot 0 returned None - data may not have propagated" 824 ); 825 } 826 Err(e) => { 827 debug_log!("[DHT VERIFY] Read-back error: {:?}", e); 828 } 829 } 830 } 831 832 // Determine which messages succeeded vs failed based on slot failures 833 // A message fails if ANY of its slots failed (including all chunks for chunked messages) 834 let mut failed_message_ids: std::collections::HashSet<[u8; 16]> = std::collections::HashSet::new(); 835 for failed_slot in &failed_slots { 836 for msg_id in &slot_to_messages[*failed_slot] { 837 failed_message_ids.insert(*msg_id); 838 } 839 } 840 841 // Succeeded = messages we were trying to publish that didn't fail 842 let succeeded: Vec<[u8; 16]> = all_message_ids 843 .iter() 844 .filter(|id| !failed_message_ids.contains(*id)) 845 .copied() 846 .collect(); 847 let failed: Vec<[u8; 16]> = all_message_ids 848 .iter() 849 .filter(|id| failed_message_ids.contains(*id)) 850 .copied() 851 .collect(); 852 853 debug_log!( 854 "[DHT INFO] Publish outcome: {} succeeded, {} failed", 855 succeeded.len(), 856 failed.len() 857 ); 858 859 if !errors.is_empty() { 860 debug_log!("[DHT WARN] Slot errors: {:?}", errors); 861 } 862 863 Ok(DhtPublishOutcome { 864 succeeded, 865 failed, 866 errors, 867 chunked: chunked_message_ids, 868 }) 869 } else { 870 debug_log!("[DHT WARN] DHT not available, using local cache only"); 871 // When DHT not available, mark all as failed 872 Ok(DhtPublishOutcome { 873 succeeded: Vec::new(), 874 failed: all_message_ids.into_iter().collect(), 875 errors: vec!["DHT not available".to_string()], 876 chunked: Vec::new(), 877 }) 878 } 879 } 880 881 /// Distribute messages across DHT slots with chained overflow. 882 /// 883 /// Messages are distributed across slots while respecting the size limit 884 /// per slot. Large messages are split into chunks. If messages overflow 885 /// the base range (slots 0–99), continuation pointers chain to additional 886 /// ranges (100–199, 200–299, etc.) up to MAX_CHAIN_DEPTH ranges. 887 /// Returns (slot_assignments, chunked_message_ids). 888 fn distribute_messages_to_slots( 889 &self, 890 messages: &[DhtStoredMessage], 891 ) -> (Vec<Vec<DhtStoredMessage>>, Vec<[u8; 16]>) { 892 self.distribute_messages_to_range(messages, 0, 1) 893 } 894 895 /// Internal: distribute messages into a single range starting at `range_start`. 896 /// Returns a flat slot vec indexed from 0 (representing absolute slots starting at `range_start`). 897 /// If overflow occurs, chains to the next range via continuation pointer. 898 /// Returns (slot_assignments, chunked_message_ids). 899 fn distribute_messages_to_range( 900 &self, 901 messages: &[DhtStoredMessage], 902 range_start: usize, 903 chain_depth: usize, 904 ) -> (Vec<Vec<DhtStoredMessage>>, Vec<[u8; 16]>) { 905 // Use dynamic slot count based on number of messages (capped at DHT_MAX_SLOTS) 906 let slot_count = calculate_slot_count(messages.len()); 907 debug_log!( 908 "[DHT DEBUG] Range {}: using {} slots for {} messages (chain depth {})", 909 range_start, slot_count, messages.len(), chain_depth 910 ); 911 912 let mut slots: Vec<Vec<DhtStoredMessage>> = (0..slot_count).map(|_| Vec::new()).collect(); 913 let mut slot_sizes: Vec<usize> = vec![0; slot_count]; 914 let mut pending_chunks: Vec<MessageChunk> = Vec::new(); 915 let mut small_messages: Vec<(DhtStoredMessage, usize)> = Vec::new(); 916 let mut overflow_messages: Vec<DhtStoredMessage> = Vec::new(); 917 let mut chunked_ids: Vec<[u8; 16]> = Vec::new(); 918 919 // Phase 1: Separate large (need chunking) from small messages 920 for msg in messages { 921 let encoded_size = bencode_messages(&[msg.clone()]).len(); 922 923 if encoded_size >= MAX_DHT_VALUE_SIZE { 924 debug_log!( 925 "[DHT INFO] Message {} too large ({} bytes), splitting into chunks", 926 hex::encode(&msg.message_id), 927 encoded_size 928 ); 929 chunked_ids.push(msg.message_id); 930 let chunks = split_message_into_chunks(msg); 931 pending_chunks.extend(chunks); 932 } else { 933 small_messages.push((msg.clone(), encoded_size)); 934 } 935 } 936 937 // Phase 2: Place chunks FIRST in dedicated slots (highest priority) 938 const SLOT_BUDGET: usize = 1500; 939 940 // Track overflow chunks that don't fit in this range 941 let mut overflow_chunks: Vec<MessageChunk> = Vec::new(); 942 943 if !pending_chunks.is_empty() { 944 debug_log!( 945 "[DHT DEBUG] Distributing {} chunks across slots (budget: {} bytes/slot)", 946 pending_chunks.len(), 947 SLOT_BUDGET 948 ); 949 950 // Group chunks by message for efficient distribution 951 let mut chunk_map: HashMap<[u8; 16], Vec<MessageChunk>> = HashMap::new(); 952 for chunk in pending_chunks { 953 chunk_map.entry(chunk.message_id).or_default().push(chunk); 954 } 955 956 let chunk_overhead: usize = 114; // DhtStoredMessage wrapper overhead 957 958 // Reserve the last slot for a potential continuation pointer 959 // (we'll only use it if there's actual overflow) 960 let usable_slots = if chain_depth < MAX_CHAIN_DEPTH { slot_count - 1 } else { slot_count }; 961 962 for (_msg_id, chunks) in chunk_map { 963 for chunk in chunks { 964 let chunk_encoded = encode_chunk(&chunk); 965 let chunk_size = chunk_encoded.len() + chunk_overhead; 966 967 // Find an EMPTY slot for this chunk (dedicated, not shared) 968 // Only search within usable_slots (excluding reserved last slot) 969 let best_slot = slot_sizes[..usable_slots] 970 .iter() 971 .enumerate() 972 .filter(|(_, &size)| size == 0) 973 .next() 974 .map(|(idx, _)| idx) 975 .or_else(|| { 976 // Fallback: find any slot with space within usable range 977 slot_sizes[..usable_slots] 978 .iter() 979 .enumerate() 980 .filter(|(_, &size)| size + chunk_size < SLOT_BUDGET) 981 .min_by_key(|(_, &size)| size) 982 .map(|(idx, _)| idx) 983 }); 984 985 if let Some(slot) = best_slot { 986 let chunk_msg = DhtStoredMessage { 987 message_id: chunk.message_id, 988 sender_key: [0xCE; 32], 989 payload: chunk_encoded, 990 timestamp: 0, 991 expires_at: SystemTime::now() 992 .duration_since(UNIX_EPOCH) 993 .unwrap() 994 .as_secs() 995 + DEFAULT_MESSAGE_TTL_SECS, 996 }; 997 slots[slot].push(chunk_msg); 998 slot_sizes[slot] = SLOT_BUDGET; 999 debug_log!( 1000 "[DHT DEBUG] Chunk {}/{} assigned to slot {} (absolute {})", 1001 chunk.chunk_index + 1, 1002 chunk.total_chunks, 1003 slot, 1004 range_start + slot 1005 ); 1006 } else { 1007 // No space — this chunk overflows to next range 1008 overflow_chunks.push(chunk); 1009 } 1010 } 1011 } 1012 } 1013 1014 // Phase 3: Place small messages in remaining slots (after chunks have priority) 1015 let usable_slots = if chain_depth < MAX_CHAIN_DEPTH { slot_count - 1 } else { slot_count }; 1016 for (msg, encoded_size) in small_messages { 1017 let best_slot = slot_sizes[..usable_slots] 1018 .iter() 1019 .enumerate() 1020 .filter(|(_, &size)| size + encoded_size < MAX_DHT_VALUE_SIZE) 1021 .min_by_key(|(_, &size)| size) 1022 .map(|(idx, _)| idx); 1023 1024 if let Some(slot) = best_slot { 1025 slots[slot].push(msg); 1026 slot_sizes[slot] += encoded_size; 1027 } else { 1028 // Overflow small message to next range 1029 overflow_messages.push(msg); 1030 } 1031 } 1032 1033 // Convert overflow chunks back into DhtStoredMessages for the next range 1034 for chunk in overflow_chunks { 1035 let chunk_encoded = encode_chunk(&chunk); 1036 let chunk_msg = DhtStoredMessage { 1037 message_id: chunk.message_id, 1038 sender_key: [0xCE; 32], 1039 payload: chunk_encoded, 1040 timestamp: 0, 1041 expires_at: SystemTime::now() 1042 .duration_since(UNIX_EPOCH) 1043 .unwrap() 1044 .as_secs() 1045 + DEFAULT_MESSAGE_TTL_SECS, 1046 }; 1047 overflow_messages.push(chunk_msg); 1048 } 1049 1050 // Phase 4: Chain overflow to next range if needed 1051 if !overflow_messages.is_empty() && chain_depth < MAX_CHAIN_DEPTH { 1052 let next_range_start = range_start + DHT_MAX_SLOTS; 1053 debug_log!( 1054 "[DHT INFO] Overflow: {} messages/chunks don't fit in range {}–{}. Chaining to range starting at slot {}", 1055 overflow_messages.len(), 1056 range_start, 1057 range_start + slot_count - 1, 1058 next_range_start 1059 ); 1060 1061 // Place continuation pointer in the last slot of this range 1062 let continuation = make_continuation_message(next_range_start); 1063 // Ensure we have enough slots to place the continuation pointer 1064 while slots.len() < DHT_MAX_SLOTS { 1065 slots.push(Vec::new()); 1066 slot_sizes.push(0); 1067 } 1068 slots[DHT_MAX_SLOTS - 1] = vec![continuation]; 1069 1070 // Recursively distribute overflow into next range 1071 let (overflow_slots, overflow_chunked) = self.distribute_messages_to_range( 1072 &overflow_messages, 1073 next_range_start, 1074 chain_depth + 1, 1075 ); 1076 chunked_ids.extend(overflow_chunked); 1077 1078 // Pad current range to full DHT_MAX_SLOTS and append overflow 1079 while slots.len() < DHT_MAX_SLOTS { 1080 slots.push(Vec::new()); 1081 } 1082 slots.extend(overflow_slots); 1083 } else if !overflow_messages.is_empty() { 1084 debug_log!( 1085 "[DHT WARN] Chain depth limit ({}) reached. {} messages/chunks dropped.", 1086 MAX_CHAIN_DEPTH, 1087 overflow_messages.len() 1088 ); 1089 } 1090 1091 (slots, chunked_ids) 1092 } 1093 1094 /// Put a mutable item to a specific DHT slot with retry logic. 1095 async fn put_mutable_item_to_slot( 1096 &self, 1097 recipient_key: &[u8; 32], 1098 value: &[u8], 1099 slot: usize, 1100 ) -> TransportResult<()> { 1101 // Acquire mutex to serialize PUT operations (prevents PutQueryIsInflight errors) 1102 let _put_lock = self.put_mutex.lock().await; 1103 1104 let dht_guard = self.dht.read().await; 1105 let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?; 1106 1107 // Derive ed25519 signing key from recipient's X25519 key 1108 let signing_key = derive_ed25519_from_x25519(recipient_key); 1109 let verifying_key = signing_key.verifying_key(); 1110 let ed25519_pubkey: [u8; 32] = verifying_key.to_bytes(); 1111 1112 // Use current timestamp as sequence number 1113 let seq = SystemTime::now() 1114 .duration_since(UNIX_EPOCH) 1115 .unwrap() 1116 .as_secs() as i64; 1117 1118 debug_log!( 1119 "[DHT DEBUG] PUT slot {}: X25519={}, ed25519={}, seq={}, value_len={}", 1120 slot, 1121 hex::encode(recipient_key), 1122 hex::encode(&ed25519_pubkey), 1123 seq, 1124 value.len() 1125 ); 1126 1127 // Create mutable item with slot-specific salt 1128 let salt_bytes = generate_slot_salt(slot); 1129 let item = MutableItem::new(signing_key, value, seq, Some(salt_bytes.as_ref())); 1130 1131 // Retry with exponential backoff 1132 let mut last_error = None; 1133 for attempt in 0..MAX_RETRY_ATTEMPTS { 1134 if attempt > 0 { 1135 let delay = RETRY_BASE_DELAY * (1 << (attempt - 1)); // 1s, 2s, 4s 1136 tokio::time::sleep(delay).await; 1137 debug_log!("[DHT INFO] Retry attempt {} for slot {}", attempt + 1, slot); 1138 } 1139 1140 let dht_clone = dht.clone(); 1141 let item_clone = item.clone(); 1142 let salt_for_log = generate_slot_salt(slot); 1143 match tokio::task::spawn_blocking(move || dht_clone.put_mutable(item_clone, None)).await { 1144 Ok(Ok(stored_id)) => { 1145 // Log what was actually stored - Id is the infohash target 1146 debug_log!( 1147 "[DHT INFO] Published to slot {} (attempt {}): id={:?}, salt={}, seq={}", 1148 slot, attempt + 1, 1149 stored_id, 1150 String::from_utf8_lossy(salt_for_log.as_ref()), 1151 seq 1152 ); 1153 // Small delay to let the PUT complete before next operation 1154 tokio::time::sleep(Duration::from_millis(100)).await; 1155 return Ok(()); 1156 } 1157 Ok(Err(e)) => { 1158 debug_log!("[DHT WARN] PUT to slot {} failed (attempt {}): {:?}", slot, attempt + 1, e); 1159 last_error = Some(format!("DHT PUT failed: {:?}", e)); 1160 } 1161 Err(e) => { 1162 debug_log!("[DHT WARN] Task failed for slot {} (attempt {}): {:?}", slot, attempt + 1, e); 1163 last_error = Some(format!("Task failed: {:?}", e)); 1164 } 1165 } 1166 } 1167 1168 Err(TransportError::Internal(last_error.unwrap_or_else(|| "Unknown error".to_string()))) 1169 } 1170 1171 /// Fetch messages from the DHT for our public key. 1172 /// 1173 /// Retrieves all unexpired messages from all DHT slots concurrently. 1174 /// This provides ~50x capacity compared to single-slot implementation. 1175 /// 1176 /// # Arguments 1177 /// 1178 /// * `our_public_key` - Our X25519 public key to fetch messages for 1179 pub async fn fetch_messages( 1180 &self, 1181 our_public_key: [u8; 32], 1182 ) -> TransportResult<Vec<DhtStoredMessage>> { 1183 if !*self.initialized.read().await { 1184 return Err(TransportError::NotInitialized); 1185 } 1186 1187 // Always fetch from DHT network - don't use cache to short-circuit 1188 // The cache caused stale data issues where new messages were never fetched 1189 let dht_guard = self.dht.read().await; 1190 if dht_guard.is_some() { 1191 drop(dht_guard); // Release lock before async operations 1192 1193 let all_messages = self.fetch_all_slots(our_public_key).await; 1194 return Ok(all_messages); 1195 } 1196 1197 Ok(Vec::new()) 1198 } 1199 1200 /// Fetch messages from all DHT slots concurrently, following continuation chains. 1201 /// 1202 /// Queries the base range (slots 0–99) in parallel. If a continuation pointer 1203 /// is found, fetches the next range, and so on up to MAX_CHAIN_DEPTH ranges. 1204 async fn fetch_all_slots(&self, our_public_key: [u8; 32]) -> Vec<DhtStoredMessage> { 1205 let dht_guard = self.dht.read().await; 1206 let dht = match dht_guard.as_ref() { 1207 Some(d) => d.clone(), 1208 None => return Vec::new(), 1209 }; 1210 drop(dht_guard); 1211 1212 // Derive ed25519 public key from our X25519 key 1213 let signing_key = derive_ed25519_from_x25519(&our_public_key); 1214 let public_key = signing_key.verifying_key(); 1215 let public_key_bytes: [u8; 32] = public_key.to_bytes(); 1216 1217 debug_log!( 1218 "[DHT DEBUG] FETCH: X25519={}, ed25519={}", 1219 hex::encode(&our_public_key), 1220 hex::encode(&public_key_bytes) 1221 ); 1222 1223 let mut all_messages = Vec::new(); 1224 let mut seen_ids = std::collections::HashSet::new(); 1225 let mut pending_chunks: HashMap<[u8; 16], HashMap<u8, MessageChunk>> = HashMap::new(); 1226 let mut total_slots_fetched: usize = 0; 1227 1228 // Marker for chunk messages 1229 const CHUNK_MARKER: [u8; 32] = [0xCE; 32]; 1230 1231 // Follow continuation chain: start with base range, follow pointers 1232 let mut range_start: usize = 0; 1233 let mut chain_depth: usize = 0; 1234 1235 loop { 1236 let range_end = range_start + DHT_MAX_SLOTS; 1237 debug_log!( 1238 "[DHT DEBUG] Fetching range {}–{} (chain depth {})", 1239 range_start, range_end - 1, chain_depth 1240 ); 1241 1242 // Spawn concurrent fetch tasks for this range 1243 let mut handles = Vec::new(); 1244 for slot in range_start..range_end { 1245 let dht_clone = dht.clone(); 1246 let pk_bytes = public_key_bytes; 1247 let salt = generate_slot_salt(slot); 1248 1249 let handle = tokio::spawn(async move { 1250 tokio::time::timeout( 1251 DEFAULT_OPERATION_TIMEOUT, 1252 tokio::task::spawn_blocking(move || { 1253 let mut iter = dht_clone.get_mutable(&pk_bytes, Some(salt.as_ref()), None); 1254 iter.next() 1255 }), 1256 ) 1257 .await 1258 }); 1259 handles.push((slot, handle)); 1260 } 1261 1262 // Collect results from this range 1263 let mut next_range_start: Option<usize> = None; 1264 1265 for (slot, handle) in handles { 1266 match handle.await { 1267 Ok(Ok(Ok(Some(item)))) => { 1268 let value = item.value(); 1269 debug_log!("[DHT DEBUG] Slot {} returned {} bytes (seq={})", slot, value.len(), item.seq()); 1270 1271 let messages = decode_messages(value); 1272 for msg in messages { 1273 if msg.is_expired() { 1274 continue; 1275 } 1276 1277 // Check for continuation pointer 1278 if is_continuation_message(&msg) { 1279 if let Some(next_start) = decode_continuation(&msg.payload) { 1280 debug_log!( 1281 "[DHT INFO] Found continuation pointer in slot {} → next range starts at {}", 1282 slot, next_start 1283 ); 1284 next_range_start = Some(next_start); 1285 } 1286 continue; // Don't add continuation pointers as regular messages 1287 } 1288 1289 // Check if this is a chunk message 1290 if msg.sender_key == CHUNK_MARKER { 1291 if let Some(chunk) = decode_chunk(&msg.payload) { 1292 debug_log!( 1293 "[DHT DEBUG] Found chunk {}/{} for message {}", 1294 chunk.chunk_index + 1, 1295 chunk.total_chunks, 1296 hex::encode(&chunk.message_id) 1297 ); 1298 pending_chunks 1299 .entry(chunk.message_id) 1300 .or_default() 1301 .insert(chunk.chunk_index, chunk); 1302 } 1303 } else if seen_ids.insert(msg.message_id) { 1304 all_messages.push(msg); 1305 } 1306 } 1307 } 1308 Ok(Ok(Ok(None))) => { 1309 // Only log for base range to avoid noise 1310 if chain_depth == 0 { 1311 debug_log!("[DHT DEBUG] Slot {} returned None (no data)", slot); 1312 } 1313 } 1314 Ok(Ok(Err(e))) => { 1315 debug_log!("[DHT WARN] Slot {} task failed: {:?}", slot, e); 1316 } 1317 Ok(Err(_)) => { 1318 debug_log!("[DHT WARN] Slot {} timed out", slot); 1319 } 1320 Err(e) => { 1321 debug_log!("[DHT WARN] Slot {} join error: {:?}", slot, e); 1322 } 1323 } 1324 } 1325 1326 total_slots_fetched += DHT_MAX_SLOTS; 1327 chain_depth += 1; 1328 1329 // Follow continuation chain or stop 1330 match next_range_start { 1331 Some(next_start) if chain_depth < MAX_CHAIN_DEPTH => { 1332 debug_log!( 1333 "[DHT INFO] Following continuation chain to range starting at slot {}", 1334 next_start 1335 ); 1336 range_start = next_start; 1337 } 1338 Some(_) => { 1339 debug_log!( 1340 "[DHT WARN] Continuation chain depth limit ({}) reached, stopping", 1341 MAX_CHAIN_DEPTH 1342 ); 1343 break; 1344 } 1345 None => break, // No continuation pointer found, done 1346 } 1347 } 1348 1349 // Try to reassemble chunked messages, merging with cached chunks from previous fetches 1350 { 1351 let mut cache = self.chunk_cache.write().await; 1352 1353 if !pending_chunks.is_empty() || !cache.is_empty() { 1354 debug_log!( 1355 "[DHT DEBUG] Attempting to reassemble {} chunked messages ({} cached)", 1356 pending_chunks.len(), 1357 cache.len() 1358 ); 1359 } 1360 1361 // Merge fetched chunks into the cache 1362 for (msg_id, fetched_chunks) in pending_chunks { 1363 let entry = cache.entry(msg_id).or_insert_with(|| (HashMap::new(), Instant::now())); 1364 entry.0.extend(fetched_chunks); 1365 } 1366 1367 // Try to reassemble all cached chunk sets 1368 let mut completed = Vec::new(); 1369 for (msg_id, (chunk_map, _)) in cache.iter() { 1370 let chunks: Vec<MessageChunk> = chunk_map.values().cloned().collect(); 1371 if let Some(reassembled) = reassemble_chunks(&chunks) { 1372 if seen_ids.insert(reassembled.message_id) { 1373 all_messages.push(reassembled); 1374 } 1375 completed.push(*msg_id); 1376 } else { 1377 let have = chunks.len(); 1378 let total = chunks.first().map(|c| c.total_chunks).unwrap_or(0); 1379 debug_log!( 1380 "[DHT DEBUG] Caching incomplete message {}, have {}/{} chunks", 1381 hex::encode(msg_id), 1382 have, 1383 total 1384 ); 1385 } 1386 } 1387 1388 // Remove successfully reassembled entries 1389 for msg_id in &completed { 1390 cache.remove(msg_id); 1391 } 1392 1393 // Expire stale cache entries 1394 cache.retain(|msg_id, (_, first_seen)| { 1395 let keep = first_seen.elapsed() < CHUNK_CACHE_TTL; 1396 if !keep { 1397 debug_log!( 1398 "[DHT DEBUG] Expiring cached chunks for message {}", 1399 hex::encode(msg_id) 1400 ); 1401 } 1402 keep 1403 }); 1404 } 1405 1406 debug_log!( 1407 "[DHT INFO] Fetched {} unique messages from {} slots ({} ranges)", 1408 all_messages.len(), 1409 total_slots_fetched, 1410 chain_depth 1411 ); 1412 all_messages 1413 } 1414 1415 /// Fetch messages from a single DHT slot. 1416 #[allow(dead_code)] 1417 async fn fetch_slot( 1418 &self, 1419 our_public_key: [u8; 32], 1420 slot: usize, 1421 ) -> TransportResult<Vec<DhtStoredMessage>> { 1422 let dht_guard = self.dht.read().await; 1423 let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?; 1424 1425 let signing_key = derive_ed25519_from_x25519(&our_public_key); 1426 let public_key = signing_key.verifying_key(); 1427 let public_key_bytes: [u8; 32] = public_key.to_bytes(); 1428 1429 let dht_clone = dht.clone(); 1430 let salt = generate_slot_salt(slot); 1431 1432 match tokio::time::timeout( 1433 DEFAULT_OPERATION_TIMEOUT, 1434 tokio::task::spawn_blocking(move || { 1435 let mut iter = dht_clone.get_mutable(&public_key_bytes, Some(salt.as_ref()), None); 1436 iter.next() 1437 }), 1438 ) 1439 .await 1440 { 1441 Ok(Ok(Some(item))) => { 1442 let messages = decode_messages(item.value()); 1443 Ok(messages.into_iter().filter(|m| !m.is_expired()).collect()) 1444 } 1445 Ok(Ok(None)) => Ok(Vec::new()), 1446 Ok(Err(e)) => Err(TransportError::Internal(format!("Task failed: {:?}", e))), 1447 Err(_) => Err(TransportError::Timeout), 1448 } 1449 } 1450 1451 /// Acknowledge messages (remove them from cache). 1452 /// 1453 /// After successfully receiving and storing messages locally, 1454 /// acknowledge them to prevent re-fetching. Remaining messages 1455 /// are redistributed across slots. 1456 /// 1457 /// # Arguments 1458 /// 1459 /// * `our_public_key` - Our X25519 public key 1460 /// * `message_ids` - IDs of messages to acknowledge 1461 pub async fn acknowledge_messages( 1462 &self, 1463 our_public_key: [u8; 32], 1464 message_ids: &[[u8; 16]], 1465 ) -> TransportResult<()> { 1466 if !*self.initialized.read().await { 1467 return Err(TransportError::NotInitialized); 1468 } 1469 1470 if message_ids.is_empty() { 1471 return Ok(()); 1472 } 1473 1474 // Remove from local cache 1475 let mut cache = self.message_cache.write().await; 1476 if let Some(messages) = cache.get_mut(&our_public_key) { 1477 messages.retain(|m| !message_ids.contains(&m.message_id)); 1478 } 1479 1480 // Republish remaining messages to DHT across slots 1481 let remaining = cache.get(&our_public_key).cloned().unwrap_or_default(); 1482 drop(cache); 1483 1484 if !remaining.is_empty() { 1485 // Redistribute remaining messages across slots 1486 let (slot_assignments, _chunked) = self.distribute_messages_to_slots(&remaining); 1487 for (slot, slot_messages) in slot_assignments.into_iter().enumerate() { 1488 if !slot_messages.is_empty() { 1489 let value = bencode_messages(&slot_messages); 1490 if let Err(e) = self.put_mutable_item_to_slot(&our_public_key, &value, slot).await { 1491 debug_log!("[DHT WARN] Failed to republish slot {} after acknowledge: {:?}", slot, e); 1492 } 1493 } 1494 } 1495 } 1496 1497 Ok(()) 1498 } 1499 1500 /// Get the count of cached messages for a recipient. 1501 pub async fn cached_message_count(&self, recipient_key: [u8; 32]) -> usize { 1502 let cache = self.message_cache.read().await; 1503 cache.get(&recipient_key).map(|v| v.len()).unwrap_or(0) 1504 } 1505 1506 /// Clear expired messages from the cache. 1507 pub async fn cleanup_expired_messages(&self) { 1508 let mut cache = self.message_cache.write().await; 1509 for messages in cache.values_mut() { 1510 messages.retain(|m| !m.is_expired()); 1511 } 1512 // Remove empty entries 1513 cache.retain(|_, v| !v.is_empty()); 1514 } 1515 1516 // ======================================================================== 1517 // RENDEZVOUS: Publish/fetch .onion addresses via DHT 1518 // ======================================================================== 1519 1520 /// Publish our .onion address to the DHT for peer discovery. 1521 /// 1522 /// Stores the onion address in a BEP44 mutable item at a dedicated 1523 /// rendezvous salt, signed with our derived ed25519 key. Contacts who 1524 /// know our public key can fetch this to establish direct Tor connections. 1525 pub async fn publish_onion_address( 1526 &self, 1527 our_public_key: [u8; 32], 1528 onion_address: &str, 1529 ) -> TransportResult<()> { 1530 if !*self.initialized.read().await { 1531 return Err(TransportError::NotInitialized); 1532 } 1533 1534 let _put_lock = self.put_mutex.lock().await; 1535 1536 let dht_guard = self.dht.read().await; 1537 let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?; 1538 1539 let signing_key = derive_ed25519_from_x25519(&our_public_key); 1540 1541 let seq = SystemTime::now() 1542 .duration_since(UNIX_EPOCH) 1543 .unwrap() 1544 .as_secs() as i64; 1545 1546 // Bencode the rendezvous value: { "a": "<onion_address>", "t": <timestamp> } 1547 let value = serde_bencode::to_bytes(&RendezvousValue { 1548 address: onion_address.to_string(), 1549 timestamp: seq as u64, 1550 }) 1551 .map_err(|e| TransportError::Internal(format!("Failed to encode rendezvous: {}", e)))?; 1552 1553 if value.len() > MAX_DHT_VALUE_SIZE { 1554 return Err(TransportError::Internal( 1555 "Rendezvous value too large for DHT".to_string(), 1556 )); 1557 } 1558 1559 let salt = Bytes::from(DHT_RENDEZVOUS_SALT.as_bytes().to_vec()); 1560 let item = MutableItem::new(signing_key, &value, seq, Some(salt.as_ref())); 1561 1562 debug_log!( 1563 "[DHT RENDEZVOUS] Publishing onion address: {} ({} bytes, seq={})", 1564 onion_address, 1565 value.len(), 1566 seq 1567 ); 1568 1569 // Retry with exponential backoff (same as slot publishing) 1570 let mut last_error = None; 1571 for attempt in 0..MAX_RETRY_ATTEMPTS { 1572 if attempt > 0 { 1573 let delay = RETRY_BASE_DELAY * (1 << (attempt - 1)); // 1s, 2s, 4s 1574 tokio::time::sleep(delay).await; 1575 debug_log!("[DHT RENDEZVOUS] Retry attempt {} for publish", attempt + 1); 1576 } 1577 1578 let dht_clone = dht.clone(); 1579 let item_clone = item.clone(); 1580 match tokio::task::spawn_blocking(move || dht_clone.put_mutable(item_clone, None)).await 1581 { 1582 Ok(Ok(_)) => { 1583 debug_log!( 1584 "[DHT RENDEZVOUS] Published successfully (attempt {})", 1585 attempt + 1 1586 ); 1587 tokio::time::sleep(Duration::from_millis(100)).await; 1588 return Ok(()); 1589 } 1590 Ok(Err(e)) => { 1591 debug_log!( 1592 "[DHT RENDEZVOUS] Publish failed (attempt {}): {:?}", 1593 attempt + 1, 1594 e 1595 ); 1596 last_error = Some(format!("DHT PUT rendezvous failed: {:?}", e)); 1597 } 1598 Err(e) => { 1599 debug_log!( 1600 "[DHT RENDEZVOUS] Task failed (attempt {}): {:?}", 1601 attempt + 1, 1602 e 1603 ); 1604 last_error = Some(format!("Task failed: {:?}", e)); 1605 } 1606 } 1607 } 1608 1609 Err(TransportError::Internal( 1610 last_error.unwrap_or_else(|| "Unknown error".to_string()), 1611 )) 1612 } 1613 1614 /// Fetch a contact's .onion address from the DHT. 1615 /// 1616 /// Looks up the rendezvous slot for the given contact's public key. 1617 /// Returns the onion address if found and not expired. 1618 pub async fn fetch_onion_address( 1619 &self, 1620 contact_public_key: [u8; 32], 1621 ) -> TransportResult<Option<String>> { 1622 if !*self.initialized.read().await { 1623 return Err(TransportError::NotInitialized); 1624 } 1625 1626 let dht_guard = self.dht.read().await; 1627 let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?; 1628 1629 let signing_key = derive_ed25519_from_x25519(&contact_public_key); 1630 let verifying_key = signing_key.verifying_key(); 1631 let pk_bytes: [u8; 32] = verifying_key.to_bytes(); 1632 let salt = Bytes::from(DHT_RENDEZVOUS_SALT.as_bytes().to_vec()); 1633 1634 let dht_clone = dht.clone(); 1635 let salt_ref = salt.to_vec(); 1636 1637 let result = tokio::task::spawn_blocking(move || { 1638 let mut iter = dht_clone.get_mutable(&pk_bytes, Some(&salt_ref), None); 1639 iter.next() 1640 }) 1641 .await 1642 .map_err(|e| TransportError::Internal(format!("Task join error: {:?}", e)))?; 1643 1644 match result { 1645 Some(item) => { 1646 let value_bytes = item.value().to_vec(); 1647 match serde_bencode::from_bytes::<RendezvousValue>(&value_bytes) { 1648 Ok(rv) => { 1649 // Check freshness: reject if older than 1 hour 1650 let now = SystemTime::now() 1651 .duration_since(UNIX_EPOCH) 1652 .unwrap() 1653 .as_secs(); 1654 if now - rv.timestamp > 3600 { 1655 debug_log!( 1656 "[DHT RENDEZVOUS] Stale address for {}: age={}s", 1657 hex::encode(&contact_public_key[..8]), 1658 now - rv.timestamp 1659 ); 1660 return Ok(None); 1661 } 1662 1663 debug_log!( 1664 "[DHT RENDEZVOUS] Found address for {}: {}", 1665 hex::encode(&contact_public_key[..8]), 1666 rv.address 1667 ); 1668 Ok(Some(rv.address)) 1669 } 1670 Err(e) => { 1671 debug_log!("[DHT RENDEZVOUS] Failed to decode: {:?}", e); 1672 Ok(None) 1673 } 1674 } 1675 } 1676 None => { 1677 debug_log!( 1678 "[DHT RENDEZVOUS] No address found for {}", 1679 hex::encode(&contact_public_key[..8]) 1680 ); 1681 Ok(None) 1682 } 1683 } 1684 } 1685 1686 /// Publish our iroh endpoint ID to the DHT for peer discovery. 1687 /// 1688 /// Uses a separate salt from Tor rendezvous so both can coexist. 1689 pub async fn publish_iroh_endpoint_id( 1690 &self, 1691 our_public_key: [u8; 32], 1692 endpoint_id: &str, 1693 ) -> TransportResult<()> { 1694 if !*self.initialized.read().await { 1695 return Err(TransportError::NotInitialized); 1696 } 1697 1698 let _put_lock = self.put_mutex.lock().await; 1699 1700 let dht_guard = self.dht.read().await; 1701 let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?; 1702 1703 let signing_key = derive_ed25519_from_x25519(&our_public_key); 1704 1705 let seq = SystemTime::now() 1706 .duration_since(UNIX_EPOCH) 1707 .unwrap() 1708 .as_secs() as i64; 1709 1710 // Reuse RendezvousValue format: { "a": "<endpoint_id>", "t": <timestamp> } 1711 let value = serde_bencode::to_bytes(&RendezvousValue { 1712 address: endpoint_id.to_string(), 1713 timestamp: seq as u64, 1714 }) 1715 .map_err(|e| TransportError::Internal(format!("Failed to encode iroh rendezvous: {}", e)))?; 1716 1717 if value.len() > MAX_DHT_VALUE_SIZE { 1718 return Err(TransportError::Internal( 1719 "Iroh rendezvous value too large for DHT".to_string(), 1720 )); 1721 } 1722 1723 let salt = Bytes::from(DHT_IROH_RENDEZVOUS_SALT.as_bytes().to_vec()); 1724 let item = MutableItem::new(signing_key, &value, seq, Some(salt.as_ref())); 1725 1726 debug_log!( 1727 "[DHT IROH] Publishing endpoint ID: {} ({} bytes, seq={})", 1728 endpoint_id, 1729 value.len(), 1730 seq 1731 ); 1732 1733 // Retry with exponential backoff 1734 let mut last_error = None; 1735 for attempt in 0..MAX_RETRY_ATTEMPTS { 1736 if attempt > 0 { 1737 let delay = RETRY_BASE_DELAY * (1 << (attempt - 1)); 1738 tokio::time::sleep(delay).await; 1739 debug_log!("[DHT IROH] Retry attempt {} for publish", attempt + 1); 1740 } 1741 1742 let dht_clone = dht.clone(); 1743 let item_clone = item.clone(); 1744 match tokio::task::spawn_blocking(move || dht_clone.put_mutable(item_clone, None)).await 1745 { 1746 Ok(Ok(_)) => { 1747 debug_log!( 1748 "[DHT IROH] Published successfully (attempt {})", 1749 attempt + 1 1750 ); 1751 tokio::time::sleep(Duration::from_millis(100)).await; 1752 return Ok(()); 1753 } 1754 Ok(Err(e)) => { 1755 debug_log!( 1756 "[DHT IROH] Publish failed (attempt {}): {:?}", 1757 attempt + 1, 1758 e 1759 ); 1760 last_error = Some(format!("DHT PUT iroh rendezvous failed: {:?}", e)); 1761 } 1762 Err(e) => { 1763 debug_log!( 1764 "[DHT IROH] Task failed (attempt {}): {:?}", 1765 attempt + 1, 1766 e 1767 ); 1768 last_error = Some(format!("Task failed: {:?}", e)); 1769 } 1770 } 1771 } 1772 1773 Err(TransportError::Internal( 1774 last_error.unwrap_or_else(|| "Unknown error".to_string()), 1775 )) 1776 } 1777 1778 /// Fetch a contact's iroh endpoint ID from the DHT. 1779 /// 1780 /// Returns the endpoint ID if found and not expired (1-hour freshness). 1781 pub async fn fetch_iroh_endpoint_id( 1782 &self, 1783 contact_public_key: [u8; 32], 1784 ) -> TransportResult<Option<String>> { 1785 if !*self.initialized.read().await { 1786 return Err(TransportError::NotInitialized); 1787 } 1788 1789 let dht_guard = self.dht.read().await; 1790 let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?; 1791 1792 let signing_key = derive_ed25519_from_x25519(&contact_public_key); 1793 let verifying_key = signing_key.verifying_key(); 1794 let pk_bytes: [u8; 32] = verifying_key.to_bytes(); 1795 let salt = Bytes::from(DHT_IROH_RENDEZVOUS_SALT.as_bytes().to_vec()); 1796 1797 let dht_clone = dht.clone(); 1798 let salt_ref = salt.to_vec(); 1799 1800 // Take the first response from the DHT. BEP44 nodes should prefer 1801 // higher sequence numbers, so the first response is typically the latest. 1802 let result = tokio::task::spawn_blocking(move || { 1803 let mut iter = dht_clone.get_mutable(&pk_bytes, Some(&salt_ref), None); 1804 iter.next() 1805 }) 1806 .await 1807 .map_err(|e| TransportError::Internal(format!("Task join error: {:?}", e)))?; 1808 1809 match result { 1810 Some(item) => { 1811 let value_bytes = item.value().to_vec(); 1812 match serde_bencode::from_bytes::<RendezvousValue>(&value_bytes) { 1813 Ok(rv) => { 1814 let now = SystemTime::now() 1815 .duration_since(UNIX_EPOCH) 1816 .unwrap() 1817 .as_secs(); 1818 if now - rv.timestamp > 3600 { 1819 debug_log!( 1820 "[DHT IROH] Stale endpoint ID for {}: age={}s", 1821 hex::encode(&contact_public_key[..8]), 1822 now - rv.timestamp 1823 ); 1824 return Ok(None); 1825 } 1826 1827 debug_log!( 1828 "[DHT IROH] Found endpoint ID for {} (seq={}): {}", 1829 hex::encode(&contact_public_key[..8]), 1830 item.seq(), 1831 rv.address 1832 ); 1833 Ok(Some(rv.address)) 1834 } 1835 Err(e) => { 1836 debug_log!("[DHT IROH] Failed to decode: {:?}", e); 1837 Ok(None) 1838 } 1839 } 1840 } 1841 None => { 1842 debug_log!( 1843 "[DHT IROH] No endpoint ID found for {}", 1844 hex::encode(&contact_public_key[..8]) 1845 ); 1846 Ok(None) 1847 } 1848 } 1849 } 1850 } 1851 1852 impl Default for DhtTransport { 1853 fn default() -> Self { 1854 Self::new(DhtConfig::default()) 1855 } 1856 } 1857 1858 impl Clone for DhtTransport { 1859 fn clone(&self) -> Self { 1860 Self { 1861 config: self.config.clone(), 1862 state: Arc::clone(&self.state), 1863 initialized: Arc::clone(&self.initialized), 1864 dht: Arc::clone(&self.dht), 1865 message_cache: Arc::clone(&self.message_cache), 1866 put_mutex: Arc::clone(&self.put_mutex), 1867 chunk_cache: Arc::clone(&self.chunk_cache), 1868 } 1869 } 1870 } 1871 1872 impl std::fmt::Debug for DhtTransport { 1873 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 1874 f.debug_struct("DhtTransport") 1875 .field("config", &self.config) 1876 .finish() 1877 } 1878 } 1879 1880 #[async_trait] 1881 impl Transport for DhtTransport { 1882 async fn initialize(&mut self) -> TransportResult<()> { 1883 if self.config.bootstrap_nodes.is_empty() { 1884 return Err(TransportError::NotAvailable( 1885 "No DHT bootstrap nodes configured".to_string(), 1886 )); 1887 } 1888 1889 // Update state 1890 { 1891 let mut state = self.state.write().await; 1892 *state = DhtState::Bootstrapping; 1893 } 1894 1895 debug_log!( 1896 "[DHT INFO] Initializing Mainline DHT with {} bootstrap nodes", 1897 self.config.bootstrap_nodes.len() 1898 ); 1899 1900 // Create DHT client with a random port (0 = OS assigns available port) 1901 // This avoids conflicts with other apps using the default port 6881 1902 let dht = match Dht::builder().port(0).build() { 1903 Ok(dht) => dht, 1904 Err(e) => { 1905 let mut state = self.state.write().await; 1906 *state = DhtState::Failed { 1907 error: format!("Failed to create DHT client: {:?}", e), 1908 }; 1909 return Err(TransportError::Internal(format!( 1910 "Failed to create DHT: {:?}", 1911 e 1912 ))); 1913 } 1914 }; 1915 1916 // Store DHT client 1917 { 1918 let mut dht_guard = self.dht.write().await; 1919 *dht_guard = Some(dht.clone()); 1920 } 1921 1922 // Wait for connection to complete using the proper v5 API 1923 let dht_clone = dht.clone(); 1924 let bootstrap_result = tokio::task::spawn_blocking(move || { 1925 debug_log!("[DHT INFO] Connecting to DHT network..."); 1926 let success = dht_clone.bootstrapped(); 1927 debug_log!("[DHT INFO] Connection result: success={}", success); 1928 success 1929 }) 1930 .await; 1931 1932 let bootstrap_success = match bootstrap_result { 1933 Ok(true) => { 1934 debug_log!("[DHT INFO] Connection complete - DHT network reachable"); 1935 true 1936 } 1937 Ok(false) => { 1938 debug_log!("[DHT WARN] Connection failed - routing table may be empty"); 1939 false 1940 } 1941 Err(e) => { 1942 debug_log!("[DHT WARN] Connection task failed: {:?}", e); 1943 false 1944 } 1945 }; 1946 1947 // Get node info for diagnostics 1948 let dht_clone = dht.clone(); 1949 if let Ok(info) = tokio::task::spawn_blocking(move || dht_clone.info()).await { 1950 debug_log!("[DHT INFO] Node info: local_addr={:?}, firewalled={}", 1951 info.local_addr(), info.firewalled()); 1952 } 1953 1954 if bootstrap_success { 1955 // Mark as connected 1956 { 1957 let mut state = self.state.write().await; 1958 *state = DhtState::Connected { peer_count: 0 }; 1959 } 1960 1961 // Mark as initialized 1962 { 1963 let mut initialized = self.initialized.write().await; 1964 *initialized = true; 1965 } 1966 1967 debug_log!("[DHT INFO] DHT transport initialized and ready"); 1968 Ok(()) 1969 } else { 1970 // Bootstrap failed - mark as failed and return error 1971 { 1972 let mut state = self.state.write().await; 1973 *state = DhtState::Failed { 1974 error: "Connection failed - could not join DHT network".to_string(), 1975 }; 1976 } 1977 1978 debug_log!("[DHT WARN] DHT transport failed to connect"); 1979 Err(TransportError::NotAvailable( 1980 "DHT connection failed - routing table empty".to_string(), 1981 )) 1982 } 1983 } 1984 1985 async fn shutdown(&mut self) -> TransportResult<()> { 1986 // Drop DHT client 1987 { 1988 let mut dht_guard = self.dht.write().await; 1989 *dht_guard = None; 1990 } 1991 1992 { 1993 let mut initialized = self.initialized.write().await; 1994 *initialized = false; 1995 } 1996 1997 { 1998 let mut state = self.state.write().await; 1999 *state = DhtState::Disconnected; 2000 } 2001 2002 debug_log!("[DHT INFO] DHT transport shut down"); 2003 Ok(()) 2004 } 2005 2006 async fn send(&self, _recipient: &TransportAddress, _message: &[u8]) -> TransportResult<()> { 2007 // DHT is not used for direct message sending 2008 // Use publish_messages() instead for DHT storage 2009 Err(TransportError::NotAvailable( 2010 "DHT transport is for storage only, not direct sending. Use publish_messages().".to_string(), 2011 )) 2012 } 2013 2014 async fn receive(&self) -> TransportResult<(TransportAddress, Vec<u8>)> { 2015 // DHT is not used for receiving messages directly 2016 // Use fetch_messages() instead for DHT retrieval 2017 Err(TransportError::NotAvailable( 2018 "DHT transport is for storage only, not direct receiving. Use fetch_messages().".to_string(), 2019 )) 2020 } 2021 2022 fn is_available(&self) -> bool { 2023 self.initialized.try_read().map(|g| *g).unwrap_or(false) 2024 } 2025 2026 fn priority(&self) -> u8 { 2027 // DHT is used for storage, lower priority than direct transports 2028 20 2029 } 2030 2031 fn transport_type(&self) -> TransportType { 2032 TransportType::DHT 2033 } 2034 2035 fn name(&self) -> &str { 2036 "DHT (Mainline BEP44)" 2037 } 2038 2039 fn local_address(&self) -> Option<TransportAddress> { 2040 // DHT doesn't have a local address for messaging 2041 None 2042 } 2043 } 2044 2045 #[cfg(test)] 2046 mod tests { 2047 use super::*; 2048 2049 #[test] 2050 fn test_dht_config_default() { 2051 let config = DhtConfig::default(); 2052 assert!(!config.bootstrap_nodes.is_empty()); 2053 assert!(config.bootstrap_nodes[0].contains("bittorrent")); 2054 assert_eq!(config.query_timeout_secs, 30); 2055 } 2056 2057 #[test] 2058 fn test_dht_transport_new() { 2059 let transport = DhtTransport::new(DhtConfig::default()); 2060 assert!(!transport.is_available()); 2061 assert_eq!(transport.transport_type(), TransportType::DHT); 2062 assert_eq!(transport.priority(), 20); 2063 assert_eq!(transport.name(), "DHT (Mainline BEP44)"); 2064 } 2065 2066 #[tokio::test] 2067 async fn test_dht_transport_state() { 2068 let transport = DhtTransport::new(DhtConfig::default()); 2069 let state = transport.state().await; 2070 assert_eq!(state, DhtState::Disconnected); 2071 } 2072 2073 #[tokio::test] 2074 async fn test_dht_transport_no_bootstrap() { 2075 let config = DhtConfig { 2076 bootstrap_nodes: vec![], 2077 ..Default::default() 2078 }; 2079 let mut transport = DhtTransport::new(config); 2080 let result = transport.initialize().await; 2081 assert!(matches!(result, Err(TransportError::NotAvailable(_)))); 2082 } 2083 2084 #[test] 2085 fn test_dht_stored_message_new() { 2086 let msg = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3, 4], None); 2087 2088 assert_eq!(msg.message_id, [0x01; 16]); 2089 assert_eq!(msg.sender_key, [0xAB; 32]); 2090 assert_eq!(msg.payload, vec![1, 2, 3, 4]); 2091 assert!(!msg.is_expired()); 2092 assert!(msg.expires_at > msg.timestamp); 2093 assert_eq!(msg.expires_at - msg.timestamp, DEFAULT_MESSAGE_TTL_SECS); 2094 } 2095 2096 #[test] 2097 fn test_dht_stored_message_custom_ttl() { 2098 let msg = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3, 4], Some(3600)); 2099 2100 assert_eq!(msg.expires_at - msg.timestamp, 3600); 2101 } 2102 2103 #[test] 2104 fn test_dht_stored_message_serialize_deserialize() { 2105 let msg = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3, 4, 5], None); 2106 2107 let serialized = msg.serialize().unwrap(); 2108 let deserialized = DhtStoredMessage::deserialize(&serialized).unwrap(); 2109 2110 assert_eq!(msg.message_id, deserialized.message_id); 2111 assert_eq!(msg.sender_key, deserialized.sender_key); 2112 assert_eq!(msg.payload, deserialized.payload); 2113 assert_eq!(msg.timestamp, deserialized.timestamp); 2114 assert_eq!(msg.expires_at, deserialized.expires_at); 2115 } 2116 2117 #[test] 2118 fn test_derive_ed25519_deterministic() { 2119 let x25519_key = [0xAB; 32]; 2120 2121 let signing_key1 = derive_ed25519_from_x25519(&x25519_key); 2122 let signing_key2 = derive_ed25519_from_x25519(&x25519_key); 2123 2124 assert_eq!(signing_key1.to_bytes(), signing_key2.to_bytes()); 2125 } 2126 2127 #[test] 2128 fn test_derive_ed25519_different_keys() { 2129 let key1 = [0xAB; 32]; 2130 let key2 = [0xCD; 32]; 2131 2132 let signing_key1 = derive_ed25519_from_x25519(&key1); 2133 let signing_key2 = derive_ed25519_from_x25519(&key2); 2134 2135 assert_ne!(signing_key1.to_bytes(), signing_key2.to_bytes()); 2136 } 2137 2138 #[test] 2139 fn test_bencode_roundtrip() { 2140 let messages = vec![ 2141 DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3], None), 2142 DhtStoredMessage::new([0x02; 16], [0xCD; 32], vec![4, 5, 6], Some(3600)), 2143 ]; 2144 2145 let encoded = bencode_messages(&messages); 2146 let decoded = decode_messages(&encoded); 2147 2148 assert_eq!(decoded.len(), 2); 2149 assert_eq!(decoded[0].message_id, [0x01; 16]); 2150 assert_eq!(decoded[0].sender_key, [0xAB; 32]); 2151 assert_eq!(decoded[0].payload, vec![1, 2, 3]); 2152 assert_eq!(decoded[1].message_id, [0x02; 16]); 2153 assert_eq!(decoded[1].sender_key, [0xCD; 32]); 2154 assert_eq!(decoded[1].payload, vec![4, 5, 6]); 2155 } 2156 2157 #[test] 2158 fn test_decode_invalid_data() { 2159 let invalid = b"not valid bencode"; 2160 let decoded = decode_messages(invalid); 2161 assert!(decoded.is_empty()); 2162 } 2163 2164 #[tokio::test] 2165 async fn test_dht_transport_message_not_initialized() { 2166 let transport = DhtTransport::new(DhtConfig::default()); 2167 2168 let result = transport.publish_messages([0xCD; 32], vec![]).await; 2169 assert!(matches!(result, Err(TransportError::NotInitialized))); 2170 2171 let result = transport.fetch_messages([0xCD; 32]).await; 2172 assert!(matches!(result, Err(TransportError::NotInitialized))); 2173 } 2174 2175 #[test] 2176 fn test_generate_slot_salt() { 2177 let salt0 = generate_slot_salt(0); 2178 let salt1 = generate_slot_salt(1); 2179 let salt9 = generate_slot_salt(9); 2180 2181 assert_eq!(salt0.as_ref(), b"deaddrop/v1/slot/0"); 2182 assert_eq!(salt1.as_ref(), b"deaddrop/v1/slot/1"); 2183 assert_eq!(salt9.as_ref(), b"deaddrop/v1/slot/9"); 2184 } 2185 2186 #[test] 2187 fn test_distribute_messages_to_slots_single() { 2188 let transport = DhtTransport::new(DhtConfig::default()); 2189 let messages = vec![ 2190 DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3], None), 2191 ]; 2192 2193 let (slots, _) = transport.distribute_messages_to_slots(&messages); 2194 2195 assert_eq!(slots.len(), DHT_MIN_SLOTS); // Single message uses minimum slots 2196 // Single message should go to first available slot 2197 assert_eq!(slots[0].len(), 1); 2198 for i in 1..DHT_MIN_SLOTS { 2199 assert!(slots[i].is_empty()); 2200 } 2201 } 2202 2203 #[test] 2204 fn test_distribute_messages_to_slots_multiple() { 2205 let transport = DhtTransport::new(DhtConfig::default()); 2206 let messages: Vec<DhtStoredMessage> = (0..5) 2207 .map(|i| { 2208 let mut id = [0u8; 16]; 2209 id[0] = i as u8; 2210 DhtStoredMessage::new(id, [0xAB; 32], vec![1, 2, 3], None) 2211 }) 2212 .collect(); 2213 2214 let (slots, _) = transport.distribute_messages_to_slots(&messages); 2215 2216 // 5 small messages should fit in early slots 2217 let total: usize = slots.iter().map(|s| s.len()).sum(); 2218 assert_eq!(total, 5); 2219 } 2220 2221 #[test] 2222 fn test_distribute_messages_to_slots_across_slots() { 2223 let transport = DhtTransport::new(DhtConfig::default()); 2224 // Create small messages that will fit multiple per slot 2225 // Empty payload = ~70 bytes per message (id + sender_key + expires_at + bencode overhead) 2226 let payload = vec![]; // Empty payload for small messages 2227 let messages: Vec<DhtStoredMessage> = (0..50) 2228 .map(|i| { 2229 let mut id = [0u8; 16]; 2230 id[0] = i as u8; 2231 DhtStoredMessage::new(id, [0xAB; 32], payload.clone(), None) 2232 }) 2233 .collect(); 2234 2235 let (slots, _) = transport.distribute_messages_to_slots(&messages); 2236 2237 // 50 small messages should spread across multiple slots 2238 let non_empty_slots = slots.iter().filter(|s| !s.is_empty()).count(); 2239 assert!(non_empty_slots >= 1, "Should use at least 1 slot"); 2240 2241 // Count total messages distributed 2242 let total: usize = slots.iter().map(|s| s.len()).sum(); 2243 // At least most messages should fit (10 slots * ~10 msgs per slot = ~100) 2244 assert!(total >= 40, "At least 40 of 50 small messages should fit"); 2245 } 2246 2247 #[test] 2248 fn test_bencode_message_size() { 2249 // Verify encoding sizes for capacity planning 2250 let small_msg = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![], None); 2251 let small_encoded = bencode_messages(&[small_msg]); 2252 debug_log!("Small message encoded size: {} bytes", small_encoded.len()); 2253 // Small message with empty payload - verify it's reasonable 2254 assert!(small_encoded.len() < 500, "Small message should be < 500 bytes"); 2255 2256 let medium_msg = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![0u8; 100], None); 2257 let medium_encoded = bencode_messages(&[medium_msg]); 2258 debug_log!("Medium message encoded size: {} bytes", medium_encoded.len()); 2259 // Medium message (100 byte payload) 2260 assert!(medium_encoded.len() < 600, "Medium message should be < 600 bytes"); 2261 2262 // With 950 byte limit and ~200 byte messages, we can fit ~4-5 per slot 2263 // With 10 slots, that's 40-50 messages total capacity 2264 } 2265 2266 #[test] 2267 fn test_slot_count_constants() { 2268 assert_eq!(DHT_MIN_SLOTS, 20); 2269 assert_eq!(DHT_MAX_SLOTS, 100); 2270 assert_eq!(MAX_DHT_VALUE_SIZE, 950); 2271 } 2272 2273 #[test] 2274 fn test_calculate_slot_count() { 2275 // Small message counts use minimum slots 2276 assert_eq!(calculate_slot_count(1), DHT_MIN_SLOTS); 2277 assert_eq!(calculate_slot_count(10), DHT_MIN_SLOTS); 2278 assert_eq!(calculate_slot_count(20), DHT_MIN_SLOTS); 2279 2280 // Large message counts scale up 2281 assert_eq!(calculate_slot_count(50), 50); 2282 assert_eq!(calculate_slot_count(89), 89); 2283 2284 // Capped at max slots 2285 assert_eq!(calculate_slot_count(100), DHT_MAX_SLOTS); 2286 assert_eq!(calculate_slot_count(200), DHT_MAX_SLOTS); 2287 } 2288 2289 #[test] 2290 fn test_chunk_encode_decode() { 2291 let chunk = MessageChunk { 2292 message_id: [0x01; 16], 2293 total_chunks: 3, 2294 chunk_index: 1, 2295 payload: vec![1, 2, 3, 4, 5], 2296 }; 2297 2298 let encoded = encode_chunk(&chunk); 2299 assert!(encoded.starts_with(CHUNKED_MAGIC)); 2300 2301 let decoded = decode_chunk(&encoded).unwrap(); 2302 assert_eq!(decoded.message_id, chunk.message_id); 2303 assert_eq!(decoded.total_chunks, chunk.total_chunks); 2304 assert_eq!(decoded.chunk_index, chunk.chunk_index); 2305 assert_eq!(decoded.payload, chunk.payload); 2306 } 2307 2308 #[test] 2309 fn test_chunk_decode_invalid() { 2310 // Too short 2311 assert!(decode_chunk(&[0u8; 10]).is_none()); 2312 2313 // Wrong magic 2314 assert!(decode_chunk(&[0xFF; 30]).is_none()); 2315 } 2316 2317 #[test] 2318 fn test_split_and_reassemble_large_message() { 2319 // Create a large message that would exceed slot size 2320 let large_payload = vec![0xAB; 2000]; // 2KB payload 2321 let msg = DhtStoredMessage::new([0x01; 16], [0xCD; 32], large_payload.clone(), None); 2322 2323 // Split into chunks 2324 let chunks = split_message_into_chunks(&msg); 2325 assert!(chunks.len() > 1, "Should split into multiple chunks"); 2326 debug_log!("Large message split into {} chunks", chunks.len()); 2327 2328 // Verify all chunks have correct metadata 2329 for (i, chunk) in chunks.iter().enumerate() { 2330 assert_eq!(chunk.message_id, msg.message_id); 2331 assert_eq!(chunk.total_chunks, chunks.len() as u8); 2332 assert_eq!(chunk.chunk_index, i as u8); 2333 } 2334 2335 // Reassemble 2336 let reassembled = reassemble_chunks(&chunks).unwrap(); 2337 assert_eq!(reassembled.message_id, msg.message_id); 2338 assert_eq!(reassembled.sender_key, msg.sender_key); 2339 assert_eq!(reassembled.payload, large_payload); 2340 } 2341 2342 #[test] 2343 fn test_reassemble_incomplete_chunks() { 2344 let msg = DhtStoredMessage::new([0x01; 16], [0xCD; 32], vec![0xAB; 2000], None); 2345 let mut chunks = split_message_into_chunks(&msg); 2346 2347 // Remove one chunk 2348 chunks.pop(); 2349 2350 // Should fail to reassemble 2351 assert!(reassemble_chunks(&chunks).is_none()); 2352 } 2353 2354 #[test] 2355 fn test_distribute_large_message_to_slots() { 2356 let transport = DhtTransport::new(DhtConfig::default()); 2357 2358 // Create a message that's too large for a single slot even after compression 2359 // Use XOR-based pseudo-random bytes that resist compression better 2360 // Need ~10KB to ensure even with compression it exceeds 950 bytes 2361 let mut large_payload = vec![0u8; 10000]; 2362 let mut state: u32 = 0xDEADBEEF; 2363 for byte in large_payload.iter_mut() { 2364 // Simple xorshift PRNG for incompressible data 2365 state ^= state << 13; 2366 state ^= state >> 17; 2367 state ^= state << 5; 2368 *byte = state as u8; 2369 } 2370 let msg = DhtStoredMessage::new([0x01; 16], [0xCD; 32], large_payload, None); 2371 2372 let (slots, _) = transport.distribute_messages_to_slots(&[msg]); 2373 2374 // Should have distributed chunks across slots 2375 let total_entries: usize = slots.iter().map(|s| s.len()).sum(); 2376 assert!(total_entries > 1, "Large message should be split into multiple chunk entries"); 2377 2378 // All entries should be chunk markers 2379 for slot in slots.iter() { 2380 for entry in slot.iter() { 2381 if entry.sender_key == [0xCE; 32] { 2382 // This is a chunk entry 2383 let chunk = decode_chunk(&entry.payload); 2384 assert!(chunk.is_some(), "Chunk entry should be decodable"); 2385 } 2386 } 2387 } 2388 } 2389 2390 #[test] 2391 fn test_compression_effectiveness() { 2392 // Test that compression works well on typical message data 2393 let repetitive_payload = vec![0xAB; 1000]; 2394 let msg = DhtStoredMessage::new([0x01; 16], [0xCD; 32], repetitive_payload, None); 2395 2396 let encoded = bencode_messages(&[msg]); 2397 // Repetitive data should compress well (< 200 bytes) 2398 assert!(encoded.len() < 200, "Repetitive data should compress well, got {} bytes", encoded.len()); 2399 } 2400 2401 // ========================================================================= 2402 // CONTINUATION CHAIN TESTS 2403 // ========================================================================= 2404 2405 #[test] 2406 fn test_continuation_encode_decode() { 2407 // Basic roundtrip 2408 let encoded = encode_continuation(100); 2409 assert_eq!(encoded.len(), 4); 2410 assert_eq!(&encoded[0..2], CONTINUATION_MAGIC); 2411 assert_eq!(decode_continuation(&encoded), Some(100)); 2412 2413 // Slot 200 2414 let encoded = encode_continuation(200); 2415 assert_eq!(decode_continuation(&encoded), Some(200)); 2416 2417 // Slot 0 2418 let encoded = encode_continuation(0); 2419 assert_eq!(decode_continuation(&encoded), Some(0)); 2420 2421 // Max u16 slot 2422 let encoded = encode_continuation(65535); 2423 assert_eq!(decode_continuation(&encoded), Some(65535)); 2424 } 2425 2426 #[test] 2427 fn test_continuation_decode_invalid() { 2428 // Too short 2429 assert_eq!(decode_continuation(&[b'C', b'N']), None); 2430 assert_eq!(decode_continuation(&[b'C']), None); 2431 assert_eq!(decode_continuation(&[]), None); 2432 2433 // Wrong magic 2434 assert_eq!(decode_continuation(&[b'X', b'Y', 0, 100]), None); 2435 } 2436 2437 #[test] 2438 fn test_is_continuation_message() { 2439 let continuation = make_continuation_message(100); 2440 assert!(is_continuation_message(&continuation)); 2441 assert_eq!(continuation.sender_key, CONTINUATION_MARKER); 2442 assert_eq!(decode_continuation(&continuation.payload), Some(100)); 2443 2444 // Regular message should NOT be a continuation 2445 let regular = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3], None); 2446 assert!(!is_continuation_message(®ular)); 2447 2448 // Chunk marker should NOT be a continuation 2449 let chunk_msg = DhtStoredMessage { 2450 message_id: [0x01; 16], 2451 sender_key: [0xCE; 32], 2452 payload: vec![], 2453 timestamp: 0, 2454 expires_at: 0, 2455 }; 2456 assert!(!is_continuation_message(&chunk_msg)); 2457 } 2458 2459 #[test] 2460 fn test_distribute_no_overflow_no_continuation() { 2461 // Small number of messages should NOT produce a continuation pointer 2462 let transport = DhtTransport::new(DhtConfig::default()); 2463 let messages: Vec<DhtStoredMessage> = (0..5) 2464 .map(|i| { 2465 let mut id = [0u8; 16]; 2466 id[0] = i as u8; 2467 DhtStoredMessage::new(id, [0xAB; 32], vec![1, 2, 3], None) 2468 }) 2469 .collect(); 2470 2471 let (slots, _) = transport.distribute_messages_to_slots(&messages); 2472 2473 // Should fit in a single range (≤ DHT_MAX_SLOTS) 2474 assert!(slots.len() <= DHT_MAX_SLOTS, "Should not overflow into next range"); 2475 2476 // No continuation markers should exist 2477 for slot in &slots { 2478 for msg in slot { 2479 assert!(!is_continuation_message(msg), "Should not have continuation pointer"); 2480 } 2481 } 2482 } 2483 2484 #[test] 2485 fn test_distribute_overflow_creates_continuation() { 2486 // Create enough chunk messages to overflow 100 slots 2487 // Each chunk gets a dedicated slot, so >99 chunks should trigger overflow 2488 // (slot 99 reserved for continuation pointer) 2489 let transport = DhtTransport::new(DhtConfig::default()); 2490 2491 // Create a very large message that produces >99 chunks 2492 // At 400 bytes per chunk payload, we need the serialized message to be >39,600 bytes 2493 // A 40KB payload should produce ~100+ chunks after serialization overhead 2494 let mut large_payload = vec![0u8; 45000]; 2495 let mut state: u32 = 0xDEADBEEF; 2496 for byte in large_payload.iter_mut() { 2497 state ^= state << 13; 2498 state ^= state >> 17; 2499 state ^= state << 5; 2500 *byte = state as u8; 2501 } 2502 let msg = DhtStoredMessage::new([0x01; 16], [0xCD; 32], large_payload, None); 2503 2504 let (slots, _) = transport.distribute_messages_to_slots(&[msg]); 2505 2506 // Should overflow into more than 100 slots 2507 assert!(slots.len() > DHT_MAX_SLOTS, "Should chain to overflow range, got {} slots", slots.len()); 2508 2509 // Slot 99 (last in base range) should be a continuation pointer 2510 assert_eq!(slots[DHT_MAX_SLOTS - 1].len(), 1, "Slot 99 should have exactly 1 message (continuation)"); 2511 let continuation = &slots[DHT_MAX_SLOTS - 1][0]; 2512 assert!(is_continuation_message(continuation), "Slot 99 should be a continuation pointer"); 2513 assert_eq!( 2514 decode_continuation(&continuation.payload), 2515 Some(DHT_MAX_SLOTS), 2516 "Continuation should point to slot 100" 2517 ); 2518 2519 // Overflow range should have content 2520 let overflow_content: usize = slots[DHT_MAX_SLOTS..].iter().map(|s| s.len()).sum(); 2521 assert!(overflow_content > 0, "Overflow range should have chunks"); 2522 } 2523 2524 #[test] 2525 fn test_chain_depth_limit() { 2526 // MAX_CHAIN_DEPTH is 5, so max 500 slots 2527 assert_eq!(MAX_CHAIN_DEPTH, 5); 2528 // Each range is DHT_MAX_SLOTS (100), so max absolute slot is 499 2529 } 2530 }