/ core / src / transport / dht.rs
dht.rs
   1  //! DHT (Distributed Hash Table) transport using Mainline DHT (BEP44).
   2  //!
   3  //! This module provides DHT-based message storage using BitTorrent's Mainline DHT
   4  //! with BEP44 mutable items. Unlike libp2p Kademlia, Mainline DHT public nodes
   5  //! accept arbitrary data storage.
   6  //!
   7  //! # Architecture
   8  //!
   9  //! ```text
  10  //! ┌─────────────────────────────────────────────────────────────┐
  11  //! │                    DhtTransport                             │
  12  //! │  - Client-mode only (no storage on mobile)                  │
  13  //! │  - Uses BEP44 mutable items for message storage             │
  14  //! │  - Key = SHA1(ed25519_pubkey || salt)                       │
  15  //! └─────────────────────────────────────────────────────────────┘
  16  //!                          │
  17  //!                          ▼
  18  //! ┌─────────────────────────────────────────────────────────────┐
  19  //! │                  Mainline DHT (BEP44)                       │
  20  //! │  - Public bootstrap nodes: router.bittorrent.com:6881      │
  21  //! │  - Stores mutable items with ed25519 signatures            │
  22  //! │  - ~1000 byte value limit per item                         │
  23  //! └─────────────────────────────────────────────────────────────┘
  24  //! ```
  25  //!
  26  //! # Usage
  27  //!
  28  //! ```ignore
  29  //! let config = DhtConfig {
  30  //!     bootstrap_nodes: vec!["router.bittorrent.com:6881".to_string()],
  31  //! };
  32  //!
  33  //! let mut dht = DhtTransport::new(config);
  34  //! dht.initialize().await?;
  35  //!
  36  //! // Publish messages for a recipient
  37  //! let messages = vec![DhtStoredMessage::new(...)];
  38  //! dht.publish_messages(recipient_key, messages).await?;
  39  //!
  40  //! // Fetch messages for our key
  41  //! let messages = dht.fetch_messages(our_public_key).await?;
  42  //! ```
  43  
  44  use std::collections::HashMap;
  45  use std::sync::Arc;
  46  use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
  47  
  48  use async_trait::async_trait;
  49  use bytes::Bytes;
  50  use ed25519_dalek::SigningKey;
  51  use mainline::{Dht, MutableItem};
  52  use serde::{Deserialize, Serialize};
  53  use sha2::{Digest, Sha512};
  54  use tokio::sync::{Mutex, RwLock};
  55  
  56  use super::{Transport, TransportAddress, TransportError, TransportResult, TransportType};
  57  
  58  // ============================================================================
  59  // CONSTANTS
  60  // ============================================================================
  61  
  62  /// Default TTL for messages stored in DHT (48 hours).
  63  pub const DEFAULT_MESSAGE_TTL_SECS: u64 = 48 * 60 * 60;
  64  
  65  /// Domain separator for deriving ed25519 keys from X25519 keys.
  66  const ED25519_DERIVATION_DOMAIN: &[u8] = b"deaddrop/dht/ed25519/v1";
  67  
  68  /// Salt prefix for BEP44 mutable items (slot number appended).
  69  const DHT_SALT_PREFIX: &str = "deaddrop/v1/slot/";
  70  
  71  /// Salt for the rendezvous slot (peer discovery of .onion addresses).
  72  const DHT_RENDEZVOUS_SALT: &str = "deaddrop/v1/rendezvous";
  73  
  74  /// DHT salt for iroh endpoint ID rendezvous (separate from Tor rendezvous).
  75  const DHT_IROH_RENDEZVOUS_SALT: &str = "deaddrop/v1/iroh-rendezvous";
  76  
  77  /// Minimum number of DHT slots for message distribution.
  78  /// Used for small message counts and as the baseline.
  79  const DHT_MIN_SLOTS: usize = 20;
  80  
  81  /// Maximum number of DHT slots for large file transfers.
  82  /// 100 slots allows transferring files up to ~100 chunks in one round.
  83  const DHT_MAX_SLOTS: usize = 100;
  84  
  85  /// Marker sender_key for continuation pointers in chained overflow slots.
  86  /// 0xCC = "continuation chain" — distinguishes from chunk markers (0xCE) and real messages.
  87  const CONTINUATION_MARKER: [u8; 32] = [0xCC; 32];
  88  
  89  /// Magic bytes identifying a continuation pointer payload.
  90  const CONTINUATION_MAGIC: &[u8] = b"CN";
  91  
  92  /// Maximum number of chained ranges (each range = DHT_MAX_SLOTS slots).
  93  /// 5 ranges = 500 slots total, preventing runaway publishing.
  94  const MAX_CHAIN_DEPTH: usize = 5;
  95  
  96  /// Calculate dynamic slot count based on message count.
  97  /// For small transfers, use minimum slots. For large transfers, scale up.
  98  fn calculate_slot_count(message_count: usize) -> usize {
  99      if message_count <= DHT_MIN_SLOTS {
 100          DHT_MIN_SLOTS
 101      } else {
 102          message_count.min(DHT_MAX_SLOTS)
 103      }
 104  }
 105  
 106  /// Maximum size for BEP44 values (~1000 bytes).
 107  const MAX_DHT_VALUE_SIZE: usize = 950;
 108  
 109  /// Overhead for chunk metadata (magic + message_id + total + index + length).
 110  /// Magic (2) + message_id (16) + total_chunks (1) + chunk_index (1) + payload_len (2) = 22 bytes
 111  const CHUNK_OVERHEAD: usize = 22;
 112  
 113  /// Maximum payload per chunk.
 114  /// Each chunk needs: payload + 24 bytes (chunk encoding) + 114 bytes (DhtStoredMessage wrapper).
 115  /// SLOT_BUDGET is 800 bytes, so max payload = 800 - 24 - 114 = 662 bytes.
/// Using 400 bytes per chunk payload for a conservative safety margin.
/// This gives ~75 micro-chunks per 30KB document chunk.
 118  const MAX_CHUNK_PAYLOAD: usize = 400;
 119  
 120  /// Magic bytes to identify chunked DHT values.
 121  const CHUNKED_MAGIC: &[u8] = b"CK";
 122  
 123  /// Default timeout for DHT operations.
 124  const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_secs(30);
 125  
 126  /// Maximum retry attempts for DHT operations.
 127  const MAX_RETRY_ATTEMPTS: u32 = 3;
 128  
 129  /// Base delay for exponential backoff (1 second).
 130  const RETRY_BASE_DELAY: Duration = Duration::from_secs(1);
 131  
 132  /// Delay before retry to avoid DHT query collisions (2 seconds).
 133  const RETRY_COLLISION_DELAY: Duration = Duration::from_secs(2);
 134  
 135  /// TTL for cached incomplete chunk sets (5 minutes).
 136  const CHUNK_CACHE_TTL: Duration = Duration::from_secs(300);
 137  
 138  // ============================================================================
 139  // PUBLISH OUTCOME
 140  // ============================================================================
 141  
/// Result of a DHT publish operation with per-message success/failure tracking.
#[derive(Debug, Clone, Default)]
pub struct DhtPublishOutcome {
    /// Message IDs that were successfully published (all chunks stored).
    pub succeeded: Vec<[u8; 16]>,
    /// Message IDs that failed to publish (at least one chunk failed).
    pub failed: Vec<[u8; 16]>,
    /// Error messages, recorded per failed slot (a single slot failure can
    /// affect several messages, so this is not 1:1 with `failed`).
    pub errors: Vec<String>,
    /// Message IDs that were split into chunks at the transport layer.
    /// These should stay Queued for republish since partial chunk propagation
    /// means the receiver can't reassemble the message.
    pub chunked: Vec<[u8; 16]>,
}
 156  
 157  // ============================================================================
 158  // CONFIGURATION
 159  // ============================================================================
 160  
/// Configuration for the DHT transport.
#[derive(Debug, Clone)]
pub struct DhtConfig {
    /// Bootstrap node addresses (host:port format),
    /// e.g. "router.bittorrent.com:6881".
    pub bootstrap_nodes: Vec<String>,
    /// Query timeout in seconds (default: 30).
    pub query_timeout_secs: u64,
}
 169  
 170  impl Default for DhtConfig {
 171      fn default() -> Self {
 172          Self {
 173              bootstrap_nodes: vec![
 174                  "router.bittorrent.com:6881".to_string(),
 175                  "router.utorrent.com:6881".to_string(),
 176                  "dht.transmissionbt.com:6881".to_string(),
 177              ],
 178              query_timeout_secs: 30,
 179          }
 180      }
 181  }
 182  
 183  // ============================================================================
 184  // DHT STATE
 185  // ============================================================================
 186  
/// State of the DHT transport.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DhtState {
    /// Not connected to the DHT network.
    Disconnected,
    /// Bootstrapping - connecting to initial peers.
    Bootstrapping,
    /// Connected to the DHT network.
    Connected {
        /// Number of known peers (surfaced via `DhtTransport::peer_count`).
        peer_count: usize,
    },
    /// Failed to connect.
    Failed {
        /// Human-readable error description.
        error: String,
    },
}
 205  
 206  // ============================================================================
 207  // STORED MESSAGE
 208  // ============================================================================
 209  
/// A message stored in the DHT for a recipient.
///
/// Messages are stored at a key derived from the recipient's public key.
/// Multiple messages can be stored for the same recipient.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DhtStoredMessage {
    /// Unique message identifier.
    pub message_id: [u8; 16],
    /// Sender's public key (or a sentinel marker such as CONTINUATION_MARKER).
    pub sender_key: [u8; 32],
    /// Encrypted message payload (serialized EncryptedMessage).
    pub payload: Vec<u8>,
    /// Unix timestamp (seconds) when message was stored.
    /// Note: not carried over the DHT wire format; `decode_messages` sets it to 0.
    pub timestamp: u64,
    /// Unix timestamp (seconds) when message expires.
    pub expires_at: u64,
}
 227  
 228  impl DhtStoredMessage {
 229      /// Create a new DHT stored message.
 230      pub fn new(
 231          message_id: [u8; 16],
 232          sender_key: [u8; 32],
 233          payload: Vec<u8>,
 234          ttl_secs: Option<u64>,
 235      ) -> Self {
 236          let now = SystemTime::now()
 237              .duration_since(UNIX_EPOCH)
 238              .unwrap()
 239              .as_secs();
 240          let ttl = ttl_secs.unwrap_or(DEFAULT_MESSAGE_TTL_SECS);
 241  
 242          Self {
 243              message_id,
 244              sender_key,
 245              payload,
 246              timestamp: now,
 247              expires_at: now + ttl,
 248          }
 249      }
 250  
 251      /// Check if this message has expired.
 252      pub fn is_expired(&self) -> bool {
 253          let now = SystemTime::now()
 254              .duration_since(UNIX_EPOCH)
 255              .unwrap()
 256              .as_secs();
 257          now >= self.expires_at
 258      }
 259  
 260      /// Serialize the message for storage.
 261      pub fn serialize(&self) -> TransportResult<Vec<u8>> {
 262          bincode::serialize(self).map_err(|e| {
 263              TransportError::Internal(format!("Failed to serialize DhtStoredMessage: {}", e))
 264          })
 265      }
 266  
 267      /// Deserialize a message from storage.
 268      pub fn deserialize(data: &[u8]) -> TransportResult<Self> {
 269          bincode::deserialize(data).map_err(|e| {
 270              TransportError::Internal(format!("Failed to deserialize DhtStoredMessage: {}", e))
 271          })
 272      }
 273  }
 274  
 275  // ============================================================================
 276  // RENDEZVOUS VALUE
 277  // ============================================================================
 278  
/// Value stored in the DHT rendezvous slot for peer discovery.
/// Contains a .onion address and timestamp for freshness checking.
///
/// Field names are renamed to single letters to keep the serialized value
/// small (BEP44 values are capped at ~1000 bytes).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RendezvousValue {
    /// The .onion address (e.g., "abc123...xyz.onion")
    #[serde(rename = "a")]
    address: String,
    /// Unix timestamp (seconds) when this was published
    #[serde(rename = "t")]
    timestamp: u64,
}
 290  
 291  // ============================================================================
 292  // KEY DERIVATION
 293  // ============================================================================
 294  
 295  /// Derive ed25519 signing key from X25519 public key for BEP44 signing.
 296  ///
 297  /// Uses SHA-512 with domain separation to deterministically derive an ed25519
 298  /// seed from an X25519 key. This allows recipients to sign DHT entries using
 299  /// their exchange key.
 300  fn derive_ed25519_from_x25519(x25519_key: &[u8; 32]) -> SigningKey {
 301      let mut hasher = Sha512::new();
 302      hasher.update(ED25519_DERIVATION_DOMAIN);
 303      hasher.update(x25519_key);
 304      let hash = hasher.finalize();
 305  
 306      let mut seed = [0u8; 32];
 307      seed.copy_from_slice(&hash[..32]);
 308      SigningKey::from_bytes(&seed)
 309  }
 310  
 311  /// Generate salt for a specific DHT slot.
 312  ///
 313  /// Each slot uses a unique salt: "deaddrop/v1/slot/0", "deaddrop/v1/slot/1", etc.
 314  /// This allows storing up to DHT_MAX_SLOTS * MAX_DHT_VALUE_SIZE bytes per recipient.
 315  fn generate_slot_salt(slot: usize) -> Bytes {
 316      let salt = format!("{}{}", DHT_SALT_PREFIX, slot);
 317      Bytes::from(salt.into_bytes())
 318  }
 319  
 320  /// Encode a continuation pointer payload: "CN" + u16_be(next_slot_start).
 321  fn encode_continuation(next_slot_start: usize) -> Vec<u8> {
 322      let mut buf = Vec::with_capacity(4);
 323      buf.extend_from_slice(CONTINUATION_MAGIC);
 324      buf.extend_from_slice(&(next_slot_start as u16).to_be_bytes());
 325      buf
 326  }
 327  
 328  /// Decode a continuation pointer payload. Returns the next range's starting slot.
 329  fn decode_continuation(payload: &[u8]) -> Option<usize> {
 330      if payload.len() >= 4 && &payload[0..2] == CONTINUATION_MAGIC {
 331          let start = u16::from_be_bytes([payload[2], payload[3]]) as usize;
 332          Some(start)
 333      } else {
 334          None
 335      }
 336  }
 337  
/// Check if a DhtStoredMessage is a continuation pointer.
/// Continuation pointers carry the sentinel sender key `CONTINUATION_MARKER`
/// (0xCC repeated) instead of a real sender's public key.
fn is_continuation_message(msg: &DhtStoredMessage) -> bool {
    msg.sender_key == CONTINUATION_MARKER
}
 342  
 343  /// Create a continuation pointer message pointing to the next range start.
 344  fn make_continuation_message(next_slot_start: usize) -> DhtStoredMessage {
 345      DhtStoredMessage {
 346          message_id: [0xCC; 16], // Deterministic ID for continuation pointers
 347          sender_key: CONTINUATION_MARKER,
 348          payload: encode_continuation(next_slot_start),
 349          timestamp: 0,
 350          expires_at: SystemTime::now()
 351              .duration_since(UNIX_EPOCH)
 352              .unwrap()
 353              .as_secs()
 354              + DEFAULT_MESSAGE_TTL_SECS,
 355      }
 356  }
 357  
 358  // ============================================================================
 359  // MESSAGE ENCODING (Bencode for BEP44 with compression)
 360  // ============================================================================
 361  
 362  /// Magic bytes to identify compressed DHT values.
 363  const COMPRESSED_MAGIC: &[u8] = b"ZS";
 364  
/// Encode messages for DHT storage using bencode with zstd compression.
///
/// The output is either:
/// - `COMPRESSED_MAGIC` ("ZS") + zstd-compressed bencode, when compression
///   actually shrinks the value, or
/// - the raw bencode, when compression doesn't help or fails.
///
/// `decode_messages` distinguishes the two forms via the magic prefix.
/// Note that `timestamp` is deliberately omitted from the wire format to save
/// space; `decode_messages` restores it as 0.
fn bencode_messages(messages: &[DhtStoredMessage]) -> Vec<u8> {
    // Compact format: list of [message_id, sender_key, payload, expires_at]
    let compact: Vec<(Vec<u8>, Vec<u8>, Vec<u8>, u64)> = messages
        .iter()
        .map(|m| {
            (
                m.message_id.to_vec(),
                m.sender_key.to_vec(),
                m.payload.clone(),
                m.expires_at,
            )
        })
        .collect();

    // NOTE(review): a bencode failure silently yields an empty value here;
    // serializing these plain byte tuples is not expected to fail in practice.
    let bencoded = serde_bencode::to_bytes(&compact).unwrap_or_default();

    // Try to compress with zstd (level 3 is a good balance of speed/compression)
    match zstd::encode_all(bencoded.as_slice(), 3) {
        Ok(compressed) => {
            // Only use compression if it actually saves space
            if compressed.len() + COMPRESSED_MAGIC.len() < bencoded.len() {
                let mut result = Vec::with_capacity(COMPRESSED_MAGIC.len() + compressed.len());
                result.extend_from_slice(COMPRESSED_MAGIC);
                result.extend_from_slice(&compressed);
                debug_log!(
                    "[DHT DEBUG] Compressed {} -> {} bytes ({}% reduction)",
                    bencoded.len(),
                    result.len(),
                    100 - (result.len() * 100 / bencoded.len())
                );
                result
            } else {
                bencoded
            }
        }
        // Compression errors are non-fatal: fall back to the uncompressed form.
        Err(_) => bencoded,
    }
}
 404  
/// A chunk of a large message that was split for DHT storage.
///
/// Produced by `split_message_into_chunks`, wire-encoded/decoded by
/// `encode_chunk`/`decode_chunk`, and recombined by `reassemble_chunks`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MessageChunk {
    /// Original message ID (for reassembly).
    message_id: [u8; 16],
    /// Total number of chunks (single byte, so at most 255).
    total_chunks: u8,
    /// Index of this chunk (0-based).
    chunk_index: u8,
    /// Chunk payload (at most MAX_CHUNK_PAYLOAD bytes when produced by
    /// `split_message_into_chunks`).
    payload: Vec<u8>,
}
 417  
 418  /// Encode a chunk for DHT storage.
 419  fn encode_chunk(chunk: &MessageChunk) -> Vec<u8> {
 420      let mut result = Vec::with_capacity(CHUNK_OVERHEAD + chunk.payload.len());
 421      result.extend_from_slice(CHUNKED_MAGIC);
 422      result.extend_from_slice(&chunk.message_id);
 423      result.push(chunk.total_chunks);
 424      result.push(chunk.chunk_index);
 425      result.extend_from_slice(&(chunk.payload.len() as u16).to_le_bytes());
 426      result.extend_from_slice(&chunk.payload);
 427      result
 428  }
 429  
 430  /// Decode a chunk from DHT storage.
 431  fn decode_chunk(data: &[u8]) -> Option<MessageChunk> {
 432      if data.len() < CHUNK_OVERHEAD || !data.starts_with(CHUNKED_MAGIC) {
 433          return None;
 434      }
 435      let mut message_id = [0u8; 16];
 436      message_id.copy_from_slice(&data[2..18]);
 437      let total_chunks = data[18];
 438      let chunk_index = data[19];
 439      let payload_len = u16::from_le_bytes([data[20], data[21]]) as usize;
 440      if data.len() < CHUNK_OVERHEAD + payload_len {
 441          return None;
 442      }
 443      let payload = data[22..22 + payload_len].to_vec();
 444      Some(MessageChunk {
 445          message_id,
 446          total_chunks,
 447          chunk_index,
 448          payload,
 449      })
 450  }
 451  
 452  /// Split a large message into chunks that fit in DHT slots.
 453  fn split_message_into_chunks(msg: &DhtStoredMessage) -> Vec<MessageChunk> {
 454      // Serialize the full message
 455      let full_data = msg.serialize().unwrap_or_default();
 456      let total_chunks = (full_data.len() + MAX_CHUNK_PAYLOAD - 1) / MAX_CHUNK_PAYLOAD;
 457  
 458      if total_chunks > 255 {
 459          debug_log!("[DHT WARN] Message too large to chunk (would need {} chunks)", total_chunks);
 460          return Vec::new();
 461      }
 462  
 463      let total_chunks = total_chunks as u8;
 464      let mut chunks = Vec::with_capacity(total_chunks as usize);
 465  
 466      for (i, chunk_data) in full_data.chunks(MAX_CHUNK_PAYLOAD).enumerate() {
 467          chunks.push(MessageChunk {
 468              message_id: msg.message_id,
 469              total_chunks,
 470              chunk_index: i as u8,
 471              payload: chunk_data.to_vec(),
 472          });
 473      }
 474  
 475      debug_log!(
 476          "[DHT DEBUG] Split message {} into {} chunks ({} bytes total)",
 477          hex::encode(&msg.message_id),
 478          chunks.len(),
 479          full_data.len()
 480      );
 481  
 482      chunks
 483  }
 484  
/// Reassemble chunks into a complete message.
///
/// Requires the complete chunk set: the slice length must equal the
/// `total_chunks` declared by the first chunk, every chunk must share the same
/// `message_id`, and the sorted indices must form the contiguous range
/// 0..total_chunks. Returns `None` otherwise, or when the concatenated bytes
/// fail to deserialize as a `DhtStoredMessage`.
fn reassemble_chunks(chunks: &[MessageChunk]) -> Option<DhtStoredMessage> {
    if chunks.is_empty() {
        return None;
    }

    // The first chunk establishes the expected id and count for the set.
    let message_id = chunks[0].message_id;
    let total_chunks = chunks[0].total_chunks;

    // Verify all chunks belong to same message and we have all of them
    if chunks.len() != total_chunks as usize {
        debug_log!(
            "[DHT DEBUG] Incomplete chunks for {}: have {}/{}",
            hex::encode(&message_id),
            chunks.len(),
            total_chunks
        );
        return None;
    }

    // Sort by chunk index and verify continuity
    let mut sorted: Vec<_> = chunks.iter().collect();
    sorted.sort_by_key(|c| c.chunk_index);

    // After sorting, position i must hold chunk index i; combined with the
    // length check above this also rejects duplicate indices.
    for (i, chunk) in sorted.iter().enumerate() {
        if chunk.chunk_index != i as u8 || chunk.message_id != message_id {
            debug_log!("[DHT WARN] Chunk mismatch during reassembly");
            return None;
        }
    }

    // Concatenate payloads
    let mut full_data = Vec::new();
    for chunk in sorted {
        full_data.extend_from_slice(&chunk.payload);
    }

    // Deserialize
    match DhtStoredMessage::deserialize(&full_data) {
        Ok(msg) => {
            debug_log!(
                "[DHT DEBUG] Reassembled message {} from {} chunks",
                hex::encode(&msg.message_id),
                total_chunks
            );
            Some(msg)
        }
        Err(e) => {
            debug_log!("[DHT WARN] Failed to deserialize reassembled message: {}", e);
            None
        }
    }
}
 538  
 539  /// Decode messages from DHT storage (handles both compressed and uncompressed).
 540  fn decode_messages(data: &[u8]) -> Vec<DhtStoredMessage> {
 541      // Check for compression magic bytes
 542      let decoded_data = if data.starts_with(COMPRESSED_MAGIC) {
 543          match zstd::decode_all(&data[COMPRESSED_MAGIC.len()..]) {
 544              Ok(decompressed) => decompressed,
 545              Err(e) => {
 546                  debug_log!("[DHT WARN] Failed to decompress DHT data: {}", e);
 547                  return Vec::new();
 548              }
 549          }
 550      } else {
 551          data.to_vec()
 552      };
 553  
 554      if let Ok(compact) =
 555          serde_bencode::from_bytes::<Vec<(Vec<u8>, Vec<u8>, Vec<u8>, u64)>>(&decoded_data)
 556      {
 557          compact
 558              .into_iter()
 559              .filter_map(|(id, sender, payload, expires)| {
 560                  if id.len() == 16 && sender.len() == 32 {
 561                      let mut message_id = [0u8; 16];
 562                      let mut sender_key = [0u8; 32];
 563                      message_id.copy_from_slice(&id);
 564                      sender_key.copy_from_slice(&sender);
 565  
 566                      Some(DhtStoredMessage {
 567                          message_id,
 568                          sender_key,
 569                          payload,
 570                          timestamp: 0, // Not stored in DHT
 571                          expires_at: expires,
 572                      })
 573                  } else {
 574                      None
 575                  }
 576              })
 577              .collect()
 578      } else {
 579          vec![]
 580      }
 581  }
 582  
 583  // ============================================================================
 584  // DHT TRANSPORT
 585  // ============================================================================
 586  
/// DHT transport using Mainline DHT with BEP44 mutable items.
///
/// This transport uses BitTorrent's Mainline DHT to store and retrieve
/// encrypted messages. Unlike libp2p Kademlia, Mainline DHT public nodes
/// accept arbitrary data storage via BEP44.
pub struct DhtTransport {
    /// Transport configuration.
    config: DhtConfig,
    /// Current state (starts as `DhtState::Disconnected`).
    state: Arc<RwLock<DhtState>>,
    /// Whether `initialize` has completed.
    initialized: Arc<RwLock<bool>>,
    /// Mainline DHT client (`None` until connected).
    dht: Arc<RwLock<Option<Dht>>>,
    /// Cached messages for recipients (keyed by X25519 public key).
    /// Serves as the publish source-of-truth: publishes re-send cached
    /// messages alongside new ones.
    message_cache: Arc<RwLock<HashMap<[u8; 32], Vec<DhtStoredMessage>>>>,
    /// Mutex to serialize PUT operations (prevents PutQueryIsInflight errors).
    put_mutex: Arc<Mutex<()>>,
    /// Cache for incomplete chunk sets across fetch cycles.
    /// Key: message_id, Value: (chunks by index, first_seen timestamp).
    /// Entries older than CHUNK_CACHE_TTL are eligible for eviction.
    chunk_cache: Arc<RwLock<HashMap<[u8; 16], (HashMap<u8, MessageChunk>, Instant)>>>,
}
 609  
 610  impl DhtTransport {
 611      /// Create a new DHT transport.
 612      pub fn new(config: DhtConfig) -> Self {
 613          Self {
 614              config,
 615              state: Arc::new(RwLock::new(DhtState::Disconnected)),
 616              initialized: Arc::new(RwLock::new(false)),
 617              dht: Arc::new(RwLock::new(None)),
 618              message_cache: Arc::new(RwLock::new(HashMap::new())),
 619              put_mutex: Arc::new(Mutex::new(())),
 620              chunk_cache: Arc::new(RwLock::new(HashMap::new())),
 621          }
 622      }
 623  
 624      /// Get the current DHT state.
 625      pub async fn state(&self) -> DhtState {
 626          self.state.read().await.clone()
 627      }
 628  
 629      /// Get the number of connected peers.
 630      pub async fn peer_count(&self) -> usize {
 631          match &*self.state.read().await {
 632              DhtState::Connected { peer_count } => *peer_count,
 633              _ => 0,
 634          }
 635      }
 636  
 637      /// Check if the DHT is ready (initialized and connected).
 638      pub async fn is_ready(&self) -> bool {
 639          let initialized = *self.initialized.read().await;
 640          let state = self.state.read().await;
 641          initialized && matches!(&*state, DhtState::Connected { .. })
 642      }
 643  
 644      // =========================================================================
 645      // MESSAGE STORAGE METHODS
 646      // =========================================================================
 647  
 648      /// Publish messages to the DHT for a recipient.
 649      ///
    /// Messages are distributed across multiple DHT slots to overcome the
    /// ~1000 byte BEP44 value limit; the slot count scales from DHT_MIN_SLOTS
    /// (20) up to DHT_MAX_SLOTS (100) with the number of messages.
 653      ///
 654      /// # Arguments
 655      ///
 656      /// * `recipient_key` - The recipient's X25519 public key
 657      /// * `messages` - Messages to store for this recipient
 658      ///
 659      /// # Returns
 660      ///
 661      /// `DhtPublishOutcome` containing lists of succeeded and failed message IDs.
 662      /// A message is only considered successful if ALL its slots/chunks were stored.
 663      pub async fn publish_messages(
 664          &self,
 665          recipient_key: [u8; 32],
 666          messages: Vec<DhtStoredMessage>,
 667      ) -> TransportResult<DhtPublishOutcome> {
 668          if !*self.initialized.read().await {
 669              return Err(TransportError::NotInitialized);
 670          }
 671  
 672          // Collect all message IDs being published
 673          let all_message_ids: std::collections::HashSet<[u8; 16]> =
 674              messages.iter().map(|m| m.message_id).collect();
 675  
 676          if messages.is_empty() {
 677              return Ok(DhtPublishOutcome::default());
 678          }
 679  
 680          // Cache locally first
 681          {
 682              let mut cache = self.message_cache.write().await;
 683              let entry = cache.entry(recipient_key).or_insert_with(Vec::new);
 684  
 685              for msg in &messages {
 686                  // Don't add duplicates
 687                  if !entry.iter().any(|m| m.message_id == msg.message_id) {
 688                      entry.push(msg.clone());
 689                  }
 690              }
 691  
 692              // Remove expired messages
 693              entry.retain(|m| !m.is_expired());
 694          }
 695  
 696          // Try to publish to DHT network
 697          let dht_guard = self.dht.read().await;
 698          if dht_guard.is_some() {
 699              drop(dht_guard); // Release lock before async operations
 700  
 701              // Get all cached messages for this recipient
 702              let mut all_messages = {
 703                  let cache = self.message_cache.read().await;
 704                  cache.get(&recipient_key).cloned().unwrap_or_default()
 705              };
 706  
 707              // Limit messages to prevent slot overflow
 708              // With 10 slots at ~800 bytes each, we can fit ~10 small messages or ~2 large chunked messages
 709              // Prioritize large messages (which need chunking) first, then most recent small messages
 710              const MAX_MESSAGES_PER_PUBLISH: usize = 5;
 711              if all_messages.len() > MAX_MESSAGES_PER_PUBLISH {
 712                  // Partition into large (need chunking) and small messages
 713                  let (large, mut small): (Vec<_>, Vec<_>) = all_messages
 714                      .into_iter()
 715                      .partition(|m| {
 716                          let encoded_size = bencode_messages(&[m.clone()]).len();
 717                          encoded_size >= MAX_DHT_VALUE_SIZE
 718                      });
 719                  // Sort small messages by most recent first
 720                  small.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
 721                  // Large (chunked) messages always get priority
 722                  let remaining_slots = MAX_MESSAGES_PER_PUBLISH.saturating_sub(large.len());
 723                  small.truncate(remaining_slots);
 724                  all_messages = large;
 725                  all_messages.extend(small);
 726                  debug_log!(
 727                      "[DHT INFO] Limiting publish to {} messages ({} large + {} small)",
 728                      all_messages.len(),
 729                      all_messages.len() - all_messages.iter().filter(|m| bencode_messages(&[(*m).clone()]).len() < MAX_DHT_VALUE_SIZE).count(),
 730                      all_messages.iter().filter(|m| bencode_messages(&[(*m).clone()]).len() < MAX_DHT_VALUE_SIZE).count()
 731                  );
 732              }
 733  
 734              // Distribute messages across slots
 735              let (slot_assignments, chunked_message_ids) = self.distribute_messages_to_slots(&all_messages);
 736  
 737              // Build slot -> message_ids mapping to track which messages are affected by slot failures
 738              let slot_count = slot_assignments.len();
 739              let mut slot_to_messages: Vec<std::collections::HashSet<[u8; 16]>> =
 740                  vec![std::collections::HashSet::new(); slot_count];
 741              for (slot, slot_msgs) in slot_assignments.iter().enumerate() {
 742                  for msg in slot_msgs {
 743                      slot_to_messages[slot].insert(msg.message_id);
 744                  }
 745              }
 746  
 747              // Track which slots failed
 748              let mut failed_slots: std::collections::HashSet<usize> = std::collections::HashSet::new();
 749  
 750              // Publish each slot (could parallelize, but keeping sequential for reliability)
 751              let mut errors = Vec::new();
 752              for (slot, slot_messages) in slot_assignments.into_iter().enumerate() {
 753                  if slot_messages.is_empty() {
 754                      continue;
 755                  }
 756  
 757                  let value = bencode_messages(&slot_messages);
 758                  let slot_failed;
 759                  if value.len() > MAX_DHT_VALUE_SIZE {
 760                      debug_log!(
 761                          "[DHT WARN] Slot {} value too large ({} bytes), truncating",
 762                          slot, value.len()
 763                      );
 764                      // Keep only messages that fit in this slot
 765                      let mut kept = Vec::new();
 766                      let mut size = 0;
 767                      for msg in slot_messages.iter() {
 768                          let encoded = bencode_messages(&[msg.clone()]);
 769                          if size + encoded.len() < MAX_DHT_VALUE_SIZE {
 770                              kept.push(msg.clone());
 771                              size += encoded.len();
 772                          }
 773                      }
 774                      let value = bencode_messages(&kept);
 775                      slot_failed = self.put_mutable_item_to_slot(&recipient_key, &value, slot).await.is_err();
 776                      if slot_failed {
 777                          errors.push(format!("Slot {}: PUT failed", slot));
 778                      }
 779                  } else {
 780                      slot_failed = match self.put_mutable_item_to_slot(&recipient_key, &value, slot).await {
 781                          Ok(()) => false,
 782                          Err(e) => {
 783                              errors.push(format!("Slot {}: {}", slot, e));
 784                              true
 785                          }
 786                      };
 787                  }
 788  
 789                  if slot_failed {
 790                      failed_slots.insert(slot);
 791                  }
 792              }
 793  
 794              // Verification: Try to read back what we just wrote to slot 0
 795              // This helps diagnose whether data is being stored at all
 796              tokio::time::sleep(Duration::from_millis(500)).await; // Brief delay for propagation
 797  
 798              let signing_key = derive_ed25519_from_x25519(&recipient_key);
 799              let verifying_key = signing_key.verifying_key();
 800              let pk_bytes: [u8; 32] = verifying_key.to_bytes();
 801              let salt = generate_slot_salt(0);
 802  
 803              let dht_guard = self.dht.read().await;
 804              if let Some(dht) = dht_guard.as_ref() {
 805                  let dht_clone = dht.clone();
 806                  drop(dht_guard);
 807  
 808                  match tokio::task::spawn_blocking(move || {
 809                      let mut iter = dht_clone.get_mutable(&pk_bytes, Some(salt.as_ref()), None);
 810                      iter.next()
 811                  })
 812                  .await
 813                  {
 814                      Ok(Some(item)) => {
 815                          debug_log!(
 816                              "[DHT VERIFY] Read-back successful: slot 0 has {} bytes (seq={})",
 817                              item.value().len(),
 818                              item.seq()
 819                          );
 820                      }
 821                      Ok(None) => {
 822                          debug_log!(
 823                              "[DHT VERIFY] Read-back FAILED: slot 0 returned None - data may not have propagated"
 824                          );
 825                      }
 826                      Err(e) => {
 827                          debug_log!("[DHT VERIFY] Read-back error: {:?}", e);
 828                      }
 829                  }
 830              }
 831  
 832              // Determine which messages succeeded vs failed based on slot failures
 833              // A message fails if ANY of its slots failed (including all chunks for chunked messages)
 834              let mut failed_message_ids: std::collections::HashSet<[u8; 16]> = std::collections::HashSet::new();
 835              for failed_slot in &failed_slots {
 836                  for msg_id in &slot_to_messages[*failed_slot] {
 837                      failed_message_ids.insert(*msg_id);
 838                  }
 839              }
 840  
 841              // Succeeded = messages we were trying to publish that didn't fail
 842              let succeeded: Vec<[u8; 16]> = all_message_ids
 843                  .iter()
 844                  .filter(|id| !failed_message_ids.contains(*id))
 845                  .copied()
 846                  .collect();
 847              let failed: Vec<[u8; 16]> = all_message_ids
 848                  .iter()
 849                  .filter(|id| failed_message_ids.contains(*id))
 850                  .copied()
 851                  .collect();
 852  
 853              debug_log!(
 854                  "[DHT INFO] Publish outcome: {} succeeded, {} failed",
 855                  succeeded.len(),
 856                  failed.len()
 857              );
 858  
 859              if !errors.is_empty() {
 860                  debug_log!("[DHT WARN] Slot errors: {:?}", errors);
 861              }
 862  
 863              Ok(DhtPublishOutcome {
 864                  succeeded,
 865                  failed,
 866                  errors,
 867                  chunked: chunked_message_ids,
 868              })
 869          } else {
 870              debug_log!("[DHT WARN] DHT not available, using local cache only");
 871              // When DHT not available, mark all as failed
 872              Ok(DhtPublishOutcome {
 873                  succeeded: Vec::new(),
 874                  failed: all_message_ids.into_iter().collect(),
 875                  errors: vec!["DHT not available".to_string()],
 876                  chunked: Vec::new(),
 877              })
 878          }
 879      }
 880  
 881      /// Distribute messages across DHT slots with chained overflow.
 882      ///
 883      /// Messages are distributed across slots while respecting the size limit
 884      /// per slot. Large messages are split into chunks. If messages overflow
 885      /// the base range (slots 0–99), continuation pointers chain to additional
 886      /// ranges (100–199, 200–299, etc.) up to MAX_CHAIN_DEPTH ranges.
 887      /// Returns (slot_assignments, chunked_message_ids).
 888      fn distribute_messages_to_slots(
 889          &self,
 890          messages: &[DhtStoredMessage],
 891      ) -> (Vec<Vec<DhtStoredMessage>>, Vec<[u8; 16]>) {
 892          self.distribute_messages_to_range(messages, 0, 1)
 893      }
 894  
 895      /// Internal: distribute messages into a single range starting at `range_start`.
 896      /// Returns a flat slot vec indexed from 0 (representing absolute slots starting at `range_start`).
 897      /// If overflow occurs, chains to the next range via continuation pointer.
 898      /// Returns (slot_assignments, chunked_message_ids).
 899      fn distribute_messages_to_range(
 900          &self,
 901          messages: &[DhtStoredMessage],
 902          range_start: usize,
 903          chain_depth: usize,
 904      ) -> (Vec<Vec<DhtStoredMessage>>, Vec<[u8; 16]>) {
 905          // Use dynamic slot count based on number of messages (capped at DHT_MAX_SLOTS)
 906          let slot_count = calculate_slot_count(messages.len());
 907          debug_log!(
 908              "[DHT DEBUG] Range {}: using {} slots for {} messages (chain depth {})",
 909              range_start, slot_count, messages.len(), chain_depth
 910          );
 911  
 912          let mut slots: Vec<Vec<DhtStoredMessage>> = (0..slot_count).map(|_| Vec::new()).collect();
 913          let mut slot_sizes: Vec<usize> = vec![0; slot_count];
 914          let mut pending_chunks: Vec<MessageChunk> = Vec::new();
 915          let mut small_messages: Vec<(DhtStoredMessage, usize)> = Vec::new();
 916          let mut overflow_messages: Vec<DhtStoredMessage> = Vec::new();
 917          let mut chunked_ids: Vec<[u8; 16]> = Vec::new();
 918  
 919          // Phase 1: Separate large (need chunking) from small messages
 920          for msg in messages {
 921              let encoded_size = bencode_messages(&[msg.clone()]).len();
 922  
 923              if encoded_size >= MAX_DHT_VALUE_SIZE {
 924                  debug_log!(
 925                      "[DHT INFO] Message {} too large ({} bytes), splitting into chunks",
 926                      hex::encode(&msg.message_id),
 927                      encoded_size
 928                  );
 929                  chunked_ids.push(msg.message_id);
 930                  let chunks = split_message_into_chunks(msg);
 931                  pending_chunks.extend(chunks);
 932              } else {
 933                  small_messages.push((msg.clone(), encoded_size));
 934              }
 935          }
 936  
 937          // Phase 2: Place chunks FIRST in dedicated slots (highest priority)
 938          const SLOT_BUDGET: usize = 1500;
 939  
 940          // Track overflow chunks that don't fit in this range
 941          let mut overflow_chunks: Vec<MessageChunk> = Vec::new();
 942  
 943          if !pending_chunks.is_empty() {
 944              debug_log!(
 945                  "[DHT DEBUG] Distributing {} chunks across slots (budget: {} bytes/slot)",
 946                  pending_chunks.len(),
 947                  SLOT_BUDGET
 948              );
 949  
 950              // Group chunks by message for efficient distribution
 951              let mut chunk_map: HashMap<[u8; 16], Vec<MessageChunk>> = HashMap::new();
 952              for chunk in pending_chunks {
 953                  chunk_map.entry(chunk.message_id).or_default().push(chunk);
 954              }
 955  
 956              let chunk_overhead: usize = 114; // DhtStoredMessage wrapper overhead
 957  
 958              // Reserve the last slot for a potential continuation pointer
 959              // (we'll only use it if there's actual overflow)
 960              let usable_slots = if chain_depth < MAX_CHAIN_DEPTH { slot_count - 1 } else { slot_count };
 961  
 962              for (_msg_id, chunks) in chunk_map {
 963                  for chunk in chunks {
 964                      let chunk_encoded = encode_chunk(&chunk);
 965                      let chunk_size = chunk_encoded.len() + chunk_overhead;
 966  
 967                      // Find an EMPTY slot for this chunk (dedicated, not shared)
 968                      // Only search within usable_slots (excluding reserved last slot)
 969                      let best_slot = slot_sizes[..usable_slots]
 970                          .iter()
 971                          .enumerate()
 972                          .filter(|(_, &size)| size == 0)
 973                          .next()
 974                          .map(|(idx, _)| idx)
 975                          .or_else(|| {
 976                              // Fallback: find any slot with space within usable range
 977                              slot_sizes[..usable_slots]
 978                                  .iter()
 979                                  .enumerate()
 980                                  .filter(|(_, &size)| size + chunk_size < SLOT_BUDGET)
 981                                  .min_by_key(|(_, &size)| size)
 982                                  .map(|(idx, _)| idx)
 983                          });
 984  
 985                      if let Some(slot) = best_slot {
 986                          let chunk_msg = DhtStoredMessage {
 987                              message_id: chunk.message_id,
 988                              sender_key: [0xCE; 32],
 989                              payload: chunk_encoded,
 990                              timestamp: 0,
 991                              expires_at: SystemTime::now()
 992                                  .duration_since(UNIX_EPOCH)
 993                                  .unwrap()
 994                                  .as_secs()
 995                                  + DEFAULT_MESSAGE_TTL_SECS,
 996                          };
 997                          slots[slot].push(chunk_msg);
 998                          slot_sizes[slot] = SLOT_BUDGET;
 999                          debug_log!(
1000                              "[DHT DEBUG] Chunk {}/{} assigned to slot {} (absolute {})",
1001                              chunk.chunk_index + 1,
1002                              chunk.total_chunks,
1003                              slot,
1004                              range_start + slot
1005                          );
1006                      } else {
1007                          // No space — this chunk overflows to next range
1008                          overflow_chunks.push(chunk);
1009                      }
1010                  }
1011              }
1012          }
1013  
1014          // Phase 3: Place small messages in remaining slots (after chunks have priority)
1015          let usable_slots = if chain_depth < MAX_CHAIN_DEPTH { slot_count - 1 } else { slot_count };
1016          for (msg, encoded_size) in small_messages {
1017              let best_slot = slot_sizes[..usable_slots]
1018                  .iter()
1019                  .enumerate()
1020                  .filter(|(_, &size)| size + encoded_size < MAX_DHT_VALUE_SIZE)
1021                  .min_by_key(|(_, &size)| size)
1022                  .map(|(idx, _)| idx);
1023  
1024              if let Some(slot) = best_slot {
1025                  slots[slot].push(msg);
1026                  slot_sizes[slot] += encoded_size;
1027              } else {
1028                  // Overflow small message to next range
1029                  overflow_messages.push(msg);
1030              }
1031          }
1032  
1033          // Convert overflow chunks back into DhtStoredMessages for the next range
1034          for chunk in overflow_chunks {
1035              let chunk_encoded = encode_chunk(&chunk);
1036              let chunk_msg = DhtStoredMessage {
1037                  message_id: chunk.message_id,
1038                  sender_key: [0xCE; 32],
1039                  payload: chunk_encoded,
1040                  timestamp: 0,
1041                  expires_at: SystemTime::now()
1042                      .duration_since(UNIX_EPOCH)
1043                      .unwrap()
1044                      .as_secs()
1045                      + DEFAULT_MESSAGE_TTL_SECS,
1046              };
1047              overflow_messages.push(chunk_msg);
1048          }
1049  
1050          // Phase 4: Chain overflow to next range if needed
1051          if !overflow_messages.is_empty() && chain_depth < MAX_CHAIN_DEPTH {
1052              let next_range_start = range_start + DHT_MAX_SLOTS;
1053              debug_log!(
1054                  "[DHT INFO] Overflow: {} messages/chunks don't fit in range {}–{}. Chaining to range starting at slot {}",
1055                  overflow_messages.len(),
1056                  range_start,
1057                  range_start + slot_count - 1,
1058                  next_range_start
1059              );
1060  
1061              // Place continuation pointer in the last slot of this range
1062              let continuation = make_continuation_message(next_range_start);
1063              // Ensure we have enough slots to place the continuation pointer
1064              while slots.len() < DHT_MAX_SLOTS {
1065                  slots.push(Vec::new());
1066                  slot_sizes.push(0);
1067              }
1068              slots[DHT_MAX_SLOTS - 1] = vec![continuation];
1069  
1070              // Recursively distribute overflow into next range
1071              let (overflow_slots, overflow_chunked) = self.distribute_messages_to_range(
1072                  &overflow_messages,
1073                  next_range_start,
1074                  chain_depth + 1,
1075              );
1076              chunked_ids.extend(overflow_chunked);
1077  
1078              // Pad current range to full DHT_MAX_SLOTS and append overflow
1079              while slots.len() < DHT_MAX_SLOTS {
1080                  slots.push(Vec::new());
1081              }
1082              slots.extend(overflow_slots);
1083          } else if !overflow_messages.is_empty() {
1084              debug_log!(
1085                  "[DHT WARN] Chain depth limit ({}) reached. {} messages/chunks dropped.",
1086                  MAX_CHAIN_DEPTH,
1087                  overflow_messages.len()
1088              );
1089          }
1090  
1091          (slots, chunked_ids)
1092      }
1093  
1094      /// Put a mutable item to a specific DHT slot with retry logic.
1095      async fn put_mutable_item_to_slot(
1096          &self,
1097          recipient_key: &[u8; 32],
1098          value: &[u8],
1099          slot: usize,
1100      ) -> TransportResult<()> {
1101          // Acquire mutex to serialize PUT operations (prevents PutQueryIsInflight errors)
1102          let _put_lock = self.put_mutex.lock().await;
1103  
1104          let dht_guard = self.dht.read().await;
1105          let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?;
1106  
1107          // Derive ed25519 signing key from recipient's X25519 key
1108          let signing_key = derive_ed25519_from_x25519(recipient_key);
1109          let verifying_key = signing_key.verifying_key();
1110          let ed25519_pubkey: [u8; 32] = verifying_key.to_bytes();
1111  
1112          // Use current timestamp as sequence number
1113          let seq = SystemTime::now()
1114              .duration_since(UNIX_EPOCH)
1115              .unwrap()
1116              .as_secs() as i64;
1117  
1118          debug_log!(
1119              "[DHT DEBUG] PUT slot {}: X25519={}, ed25519={}, seq={}, value_len={}",
1120              slot,
1121              hex::encode(recipient_key),
1122              hex::encode(&ed25519_pubkey),
1123              seq,
1124              value.len()
1125          );
1126  
1127          // Create mutable item with slot-specific salt
1128          let salt_bytes = generate_slot_salt(slot);
1129          let item = MutableItem::new(signing_key, value, seq, Some(salt_bytes.as_ref()));
1130  
1131          // Retry with exponential backoff
1132          let mut last_error = None;
1133          for attempt in 0..MAX_RETRY_ATTEMPTS {
1134              if attempt > 0 {
1135                  let delay = RETRY_BASE_DELAY * (1 << (attempt - 1)); // 1s, 2s, 4s
1136                  tokio::time::sleep(delay).await;
1137                  debug_log!("[DHT INFO] Retry attempt {} for slot {}", attempt + 1, slot);
1138              }
1139  
1140              let dht_clone = dht.clone();
1141              let item_clone = item.clone();
1142              let salt_for_log = generate_slot_salt(slot);
1143              match tokio::task::spawn_blocking(move || dht_clone.put_mutable(item_clone, None)).await {
1144                  Ok(Ok(stored_id)) => {
1145                      // Log what was actually stored - Id is the infohash target
1146                      debug_log!(
1147                          "[DHT INFO] Published to slot {} (attempt {}): id={:?}, salt={}, seq={}",
1148                          slot, attempt + 1,
1149                          stored_id,
1150                          String::from_utf8_lossy(salt_for_log.as_ref()),
1151                          seq
1152                      );
1153                      // Small delay to let the PUT complete before next operation
1154                      tokio::time::sleep(Duration::from_millis(100)).await;
1155                      return Ok(());
1156                  }
1157                  Ok(Err(e)) => {
1158                      debug_log!("[DHT WARN] PUT to slot {} failed (attempt {}): {:?}", slot, attempt + 1, e);
1159                      last_error = Some(format!("DHT PUT failed: {:?}", e));
1160                  }
1161                  Err(e) => {
1162                      debug_log!("[DHT WARN] Task failed for slot {} (attempt {}): {:?}", slot, attempt + 1, e);
1163                      last_error = Some(format!("Task failed: {:?}", e));
1164                  }
1165              }
1166          }
1167  
1168          Err(TransportError::Internal(last_error.unwrap_or_else(|| "Unknown error".to_string())))
1169      }
1170  
1171      /// Fetch messages from the DHT for our public key.
1172      ///
1173      /// Retrieves all unexpired messages from all DHT slots concurrently.
1174      /// This provides ~50x capacity compared to single-slot implementation.
1175      ///
1176      /// # Arguments
1177      ///
1178      /// * `our_public_key` - Our X25519 public key to fetch messages for
1179      pub async fn fetch_messages(
1180          &self,
1181          our_public_key: [u8; 32],
1182      ) -> TransportResult<Vec<DhtStoredMessage>> {
1183          if !*self.initialized.read().await {
1184              return Err(TransportError::NotInitialized);
1185          }
1186  
1187          // Always fetch from DHT network - don't use cache to short-circuit
1188          // The cache caused stale data issues where new messages were never fetched
1189          let dht_guard = self.dht.read().await;
1190          if dht_guard.is_some() {
1191              drop(dht_guard); // Release lock before async operations
1192  
1193              let all_messages = self.fetch_all_slots(our_public_key).await;
1194              return Ok(all_messages);
1195          }
1196  
1197          Ok(Vec::new())
1198      }
1199  
    /// Fetch messages from all DHT slots concurrently, following continuation chains.
    ///
    /// Queries the base range (slots 0–99) in parallel. If a continuation pointer
    /// is found, fetches the next range, and so on up to MAX_CHAIN_DEPTH ranges.
    ///
    /// Chunked messages found along the way are reassembled when complete;
    /// incomplete chunk sets are merged into `chunk_cache` so a later fetch can
    /// finish them, and cached entries older than CHUNK_CACHE_TTL are dropped.
    async fn fetch_all_slots(&self, our_public_key: [u8; 32]) -> Vec<DhtStoredMessage> {
        // Clone the DHT handle and drop the guard before any slow network work.
        let dht_guard = self.dht.read().await;
        let dht = match dht_guard.as_ref() {
            Some(d) => d.clone(),
            None => return Vec::new(),
        };
        drop(dht_guard);

        // Derive ed25519 public key from our X25519 key
        let signing_key = derive_ed25519_from_x25519(&our_public_key);
        let public_key = signing_key.verifying_key();
        let public_key_bytes: [u8; 32] = public_key.to_bytes();

        debug_log!(
            "[DHT DEBUG] FETCH: X25519={}, ed25519={}",
            hex::encode(&our_public_key),
            hex::encode(&public_key_bytes)
        );

        let mut all_messages = Vec::new();
        // Dedupe across slots/ranges: the same message may be stored redundantly.
        let mut seen_ids = std::collections::HashSet::new();
        // message_id -> (chunk_index -> chunk), accumulated during this fetch.
        let mut pending_chunks: HashMap<[u8; 16], HashMap<u8, MessageChunk>> = HashMap::new();
        let mut total_slots_fetched: usize = 0;

        // Marker for chunk messages (matches the sender_key the publish path
        // writes on chunk wrappers)
        const CHUNK_MARKER: [u8; 32] = [0xCE; 32];

        // Follow continuation chain: start with base range, follow pointers
        let mut range_start: usize = 0;
        let mut chain_depth: usize = 0;

        loop {
            let range_end = range_start + DHT_MAX_SLOTS;
            debug_log!(
                "[DHT DEBUG] Fetching range {}–{} (chain depth {})",
                range_start, range_end - 1, chain_depth
            );

            // Spawn concurrent fetch tasks for this range — one task per slot,
            // each wrapping a blocking DHT GET in spawn_blocking with a timeout.
            let mut handles = Vec::new();
            for slot in range_start..range_end {
                let dht_clone = dht.clone();
                let pk_bytes = public_key_bytes;
                let salt = generate_slot_salt(slot);

                let handle = tokio::spawn(async move {
                    tokio::time::timeout(
                        DEFAULT_OPERATION_TIMEOUT,
                        tokio::task::spawn_blocking(move || {
                            // Only the first item from the iterator is used.
                            let mut iter = dht_clone.get_mutable(&pk_bytes, Some(salt.as_ref()), None);
                            iter.next()
                        }),
                    )
                    .await
                });
                handles.push((slot, handle));
            }

            // Collect results from this range
            let mut next_range_start: Option<usize> = None;

            // Result nesting, outermost first: tokio::spawn join, timeout,
            // spawn_blocking join, then Option<item> from the DHT iterator.
            for (slot, handle) in handles {
                match handle.await {
                    Ok(Ok(Ok(Some(item)))) => {
                        let value = item.value();
                        debug_log!("[DHT DEBUG] Slot {} returned {} bytes (seq={})", slot, value.len(), item.seq());

                        let messages = decode_messages(value);
                        for msg in messages {
                            if msg.is_expired() {
                                continue;
                            }

                            // Check for continuation pointer
                            if is_continuation_message(&msg) {
                                if let Some(next_start) = decode_continuation(&msg.payload) {
                                    debug_log!(
                                        "[DHT INFO] Found continuation pointer in slot {} → next range starts at {}",
                                        slot, next_start
                                    );
                                    next_range_start = Some(next_start);
                                }
                                continue; // Don't add continuation pointers as regular messages
                            }

                            // Check if this is a chunk message
                            if msg.sender_key == CHUNK_MARKER {
                                if let Some(chunk) = decode_chunk(&msg.payload) {
                                    debug_log!(
                                        "[DHT DEBUG] Found chunk {}/{} for message {}",
                                        chunk.chunk_index + 1,
                                        chunk.total_chunks,
                                        hex::encode(&chunk.message_id)
                                    );
                                    pending_chunks
                                        .entry(chunk.message_id)
                                        .or_default()
                                        .insert(chunk.chunk_index, chunk);
                                }
                            } else if seen_ids.insert(msg.message_id) {
                                // First time we see this id — keep the message.
                                all_messages.push(msg);
                            }
                        }
                    }
                    Ok(Ok(Ok(None))) => {
                        // Only log for base range to avoid noise
                        if chain_depth == 0 {
                            debug_log!("[DHT DEBUG] Slot {} returned None (no data)", slot);
                        }
                    }
                    Ok(Ok(Err(e))) => {
                        // spawn_blocking join failure (blocking task panicked/cancelled)
                        debug_log!("[DHT WARN] Slot {} task failed: {:?}", slot, e);
                    }
                    Ok(Err(_)) => {
                        // Timeout elapsed before the GET completed
                        debug_log!("[DHT WARN] Slot {} timed out", slot);
                    }
                    Err(e) => {
                        // Outer tokio::spawn task failed to join
                        debug_log!("[DHT WARN] Slot {} join error: {:?}", slot, e);
                    }
                }
            }

            // NOTE(review): counts the full range even if many slots were empty.
            total_slots_fetched += DHT_MAX_SLOTS;
            chain_depth += 1;

            // Follow continuation chain or stop
            match next_range_start {
                Some(next_start) if chain_depth < MAX_CHAIN_DEPTH => {
                    debug_log!(
                        "[DHT INFO] Following continuation chain to range starting at slot {}",
                        next_start
                    );
                    range_start = next_start;
                }
                Some(_) => {
                    debug_log!(
                        "[DHT WARN] Continuation chain depth limit ({}) reached, stopping",
                        MAX_CHAIN_DEPTH
                    );
                    break;
                }
                None => break, // No continuation pointer found, done
            }
        }

        // Try to reassemble chunked messages, merging with cached chunks from previous fetches
        {
            let mut cache = self.chunk_cache.write().await;

            if !pending_chunks.is_empty() || !cache.is_empty() {
                debug_log!(
                    "[DHT DEBUG] Attempting to reassemble {} chunked messages ({} cached)",
                    pending_chunks.len(),
                    cache.len()
                );
            }

            // Merge fetched chunks into the cache. The Instant records when we
            // FIRST saw any chunk of this message (used for TTL expiry below).
            for (msg_id, fetched_chunks) in pending_chunks {
                let entry = cache.entry(msg_id).or_insert_with(|| (HashMap::new(), Instant::now()));
                entry.0.extend(fetched_chunks);
            }

            // Try to reassemble all cached chunk sets
            let mut completed = Vec::new();
            for (msg_id, (chunk_map, _)) in cache.iter() {
                let chunks: Vec<MessageChunk> = chunk_map.values().cloned().collect();
                if let Some(reassembled) = reassemble_chunks(&chunks) {
                    // seen_ids also guards against a duplicate of the full message
                    if seen_ids.insert(reassembled.message_id) {
                        all_messages.push(reassembled);
                    }
                    completed.push(*msg_id);
                } else {
                    let have = chunks.len();
                    let total = chunks.first().map(|c| c.total_chunks).unwrap_or(0);
                    debug_log!(
                        "[DHT DEBUG] Caching incomplete message {}, have {}/{} chunks",
                        hex::encode(msg_id),
                        have,
                        total
                    );
                }
            }

            // Remove successfully reassembled entries
            for msg_id in &completed {
                cache.remove(msg_id);
            }

            // Expire stale cache entries
            cache.retain(|msg_id, (_, first_seen)| {
                let keep = first_seen.elapsed() < CHUNK_CACHE_TTL;
                if !keep {
                    debug_log!(
                        "[DHT DEBUG] Expiring cached chunks for message {}",
                        hex::encode(msg_id)
                    );
                }
                keep
            });
        }

        debug_log!(
            "[DHT INFO] Fetched {} unique messages from {} slots ({} ranges)",
            all_messages.len(),
            total_slots_fetched,
            chain_depth
        );
        all_messages
    }
1414  
1415      /// Fetch messages from a single DHT slot.
1416      #[allow(dead_code)]
1417      async fn fetch_slot(
1418          &self,
1419          our_public_key: [u8; 32],
1420          slot: usize,
1421      ) -> TransportResult<Vec<DhtStoredMessage>> {
1422          let dht_guard = self.dht.read().await;
1423          let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?;
1424  
1425          let signing_key = derive_ed25519_from_x25519(&our_public_key);
1426          let public_key = signing_key.verifying_key();
1427          let public_key_bytes: [u8; 32] = public_key.to_bytes();
1428  
1429          let dht_clone = dht.clone();
1430          let salt = generate_slot_salt(slot);
1431  
1432          match tokio::time::timeout(
1433              DEFAULT_OPERATION_TIMEOUT,
1434              tokio::task::spawn_blocking(move || {
1435                  let mut iter = dht_clone.get_mutable(&public_key_bytes, Some(salt.as_ref()), None);
1436                  iter.next()
1437              }),
1438          )
1439          .await
1440          {
1441              Ok(Ok(Some(item))) => {
1442                  let messages = decode_messages(item.value());
1443                  Ok(messages.into_iter().filter(|m| !m.is_expired()).collect())
1444              }
1445              Ok(Ok(None)) => Ok(Vec::new()),
1446              Ok(Err(e)) => Err(TransportError::Internal(format!("Task failed: {:?}", e))),
1447              Err(_) => Err(TransportError::Timeout),
1448          }
1449      }
1450  
1451      /// Acknowledge messages (remove them from cache).
1452      ///
1453      /// After successfully receiving and storing messages locally,
1454      /// acknowledge them to prevent re-fetching. Remaining messages
1455      /// are redistributed across slots.
1456      ///
1457      /// # Arguments
1458      ///
1459      /// * `our_public_key` - Our X25519 public key
1460      /// * `message_ids` - IDs of messages to acknowledge
1461      pub async fn acknowledge_messages(
1462          &self,
1463          our_public_key: [u8; 32],
1464          message_ids: &[[u8; 16]],
1465      ) -> TransportResult<()> {
1466          if !*self.initialized.read().await {
1467              return Err(TransportError::NotInitialized);
1468          }
1469  
1470          if message_ids.is_empty() {
1471              return Ok(());
1472          }
1473  
1474          // Remove from local cache
1475          let mut cache = self.message_cache.write().await;
1476          if let Some(messages) = cache.get_mut(&our_public_key) {
1477              messages.retain(|m| !message_ids.contains(&m.message_id));
1478          }
1479  
1480          // Republish remaining messages to DHT across slots
1481          let remaining = cache.get(&our_public_key).cloned().unwrap_or_default();
1482          drop(cache);
1483  
1484          if !remaining.is_empty() {
1485              // Redistribute remaining messages across slots
1486              let (slot_assignments, _chunked) = self.distribute_messages_to_slots(&remaining);
1487              for (slot, slot_messages) in slot_assignments.into_iter().enumerate() {
1488                  if !slot_messages.is_empty() {
1489                      let value = bencode_messages(&slot_messages);
1490                      if let Err(e) = self.put_mutable_item_to_slot(&our_public_key, &value, slot).await {
1491                          debug_log!("[DHT WARN] Failed to republish slot {} after acknowledge: {:?}", slot, e);
1492                      }
1493                  }
1494              }
1495          }
1496  
1497          Ok(())
1498      }
1499  
1500      /// Get the count of cached messages for a recipient.
1501      pub async fn cached_message_count(&self, recipient_key: [u8; 32]) -> usize {
1502          let cache = self.message_cache.read().await;
1503          cache.get(&recipient_key).map(|v| v.len()).unwrap_or(0)
1504      }
1505  
1506      /// Clear expired messages from the cache.
1507      pub async fn cleanup_expired_messages(&self) {
1508          let mut cache = self.message_cache.write().await;
1509          for messages in cache.values_mut() {
1510              messages.retain(|m| !m.is_expired());
1511          }
1512          // Remove empty entries
1513          cache.retain(|_, v| !v.is_empty());
1514      }
1515  
1516      // ========================================================================
1517      // RENDEZVOUS: Publish/fetch .onion addresses via DHT
1518      // ========================================================================
1519  
    /// Publish our .onion address to the DHT for peer discovery.
    ///
    /// Stores the onion address in a BEP44 mutable item at a dedicated
    /// rendezvous salt, signed with our derived ed25519 key. Contacts who
    /// know our public key can fetch this to establish direct Tor connections.
    ///
    /// # Errors
    ///
    /// * `TransportError::NotInitialized` - transport not initialized.
    /// * `TransportError::Internal` - encoding failed, the value exceeds the
    ///   DHT size limit, or every PUT attempt failed.
    pub async fn publish_onion_address(
        &self,
        our_public_key: [u8; 32],
        onion_address: &str,
    ) -> TransportResult<()> {
        if !*self.initialized.read().await {
            return Err(TransportError::NotInitialized);
        }

        // Serialize all BEP44 PUTs; this guard is held across awaits until return.
        let _put_lock = self.put_mutex.lock().await;

        let dht_guard = self.dht.read().await;
        let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?;

        // The BEP44 item is signed with an ed25519 key derived from our X25519 key.
        let signing_key = derive_ed25519_from_x25519(&our_public_key);

        // Unix time doubles as the sequence number, so later publishes win.
        // unwrap: only fails if the system clock is before the Unix epoch.
        let seq = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64;

        // Bencode the rendezvous value: { "a": "<onion_address>", "t": <timestamp> }
        let value = serde_bencode::to_bytes(&RendezvousValue {
            address: onion_address.to_string(),
            timestamp: seq as u64,
        })
        .map_err(|e| TransportError::Internal(format!("Failed to encode rendezvous: {}", e)))?;

        // BEP44 caps item values (~1000 bytes); reject oversized payloads up front.
        if value.len() > MAX_DHT_VALUE_SIZE {
            return Err(TransportError::Internal(
                "Rendezvous value too large for DHT".to_string(),
            ));
        }

        let salt = Bytes::from(DHT_RENDEZVOUS_SALT.as_bytes().to_vec());
        let item = MutableItem::new(signing_key, &value, seq, Some(salt.as_ref()));

        debug_log!(
            "[DHT RENDEZVOUS] Publishing onion address: {} ({} bytes, seq={})",
            onion_address,
            value.len(),
            seq
        );

        // Retry with exponential backoff (same as slot publishing)
        let mut last_error = None;
        for attempt in 0..MAX_RETRY_ATTEMPTS {
            if attempt > 0 {
                let delay = RETRY_BASE_DELAY * (1 << (attempt - 1)); // 1s, 2s, 4s
                tokio::time::sleep(delay).await;
                debug_log!("[DHT RENDEZVOUS] Retry attempt {} for publish", attempt + 1);
            }

            // put_mutable blocks, so it runs on the blocking thread pool.
            let dht_clone = dht.clone();
            let item_clone = item.clone();
            match tokio::task::spawn_blocking(move || dht_clone.put_mutable(item_clone, None)).await
            {
                Ok(Ok(_)) => {
                    debug_log!(
                        "[DHT RENDEZVOUS] Published successfully (attempt {})",
                        attempt + 1
                    );
                    // Short settle delay before releasing the PUT mutex —
                    // presumably to let the write propagate; TODO confirm.
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    return Ok(());
                }
                Ok(Err(e)) => {
                    // DHT-level PUT failure: record it and retry after backoff.
                    debug_log!(
                        "[DHT RENDEZVOUS] Publish failed (attempt {}): {:?}",
                        attempt + 1,
                        e
                    );
                    last_error = Some(format!("DHT PUT rendezvous failed: {:?}", e));
                }
                Err(e) => {
                    // Join error: the blocking task panicked or was cancelled.
                    debug_log!(
                        "[DHT RENDEZVOUS] Task failed (attempt {}): {:?}",
                        attempt + 1,
                        e
                    );
                    last_error = Some(format!("Task failed: {:?}", e));
                }
            }
        }

        // All attempts exhausted — surface the last recorded failure.
        Err(TransportError::Internal(
            last_error.unwrap_or_else(|| "Unknown error".to_string()),
        ))
    }
1613  
1614      /// Fetch a contact's .onion address from the DHT.
1615      ///
1616      /// Looks up the rendezvous slot for the given contact's public key.
1617      /// Returns the onion address if found and not expired.
1618      pub async fn fetch_onion_address(
1619          &self,
1620          contact_public_key: [u8; 32],
1621      ) -> TransportResult<Option<String>> {
1622          if !*self.initialized.read().await {
1623              return Err(TransportError::NotInitialized);
1624          }
1625  
1626          let dht_guard = self.dht.read().await;
1627          let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?;
1628  
1629          let signing_key = derive_ed25519_from_x25519(&contact_public_key);
1630          let verifying_key = signing_key.verifying_key();
1631          let pk_bytes: [u8; 32] = verifying_key.to_bytes();
1632          let salt = Bytes::from(DHT_RENDEZVOUS_SALT.as_bytes().to_vec());
1633  
1634          let dht_clone = dht.clone();
1635          let salt_ref = salt.to_vec();
1636  
1637          let result = tokio::task::spawn_blocking(move || {
1638              let mut iter = dht_clone.get_mutable(&pk_bytes, Some(&salt_ref), None);
1639              iter.next()
1640          })
1641          .await
1642          .map_err(|e| TransportError::Internal(format!("Task join error: {:?}", e)))?;
1643  
1644          match result {
1645              Some(item) => {
1646                  let value_bytes = item.value().to_vec();
1647                  match serde_bencode::from_bytes::<RendezvousValue>(&value_bytes) {
1648                      Ok(rv) => {
1649                          // Check freshness: reject if older than 1 hour
1650                          let now = SystemTime::now()
1651                              .duration_since(UNIX_EPOCH)
1652                              .unwrap()
1653                              .as_secs();
1654                          if now - rv.timestamp > 3600 {
1655                              debug_log!(
1656                                  "[DHT RENDEZVOUS] Stale address for {}: age={}s",
1657                                  hex::encode(&contact_public_key[..8]),
1658                                  now - rv.timestamp
1659                              );
1660                              return Ok(None);
1661                          }
1662  
1663                          debug_log!(
1664                              "[DHT RENDEZVOUS] Found address for {}: {}",
1665                              hex::encode(&contact_public_key[..8]),
1666                              rv.address
1667                          );
1668                          Ok(Some(rv.address))
1669                      }
1670                      Err(e) => {
1671                          debug_log!("[DHT RENDEZVOUS] Failed to decode: {:?}", e);
1672                          Ok(None)
1673                      }
1674                  }
1675              }
1676              None => {
1677                  debug_log!(
1678                      "[DHT RENDEZVOUS] No address found for {}",
1679                      hex::encode(&contact_public_key[..8])
1680                  );
1681                  Ok(None)
1682              }
1683          }
1684      }
1685  
    /// Publish our iroh endpoint ID to the DHT for peer discovery.
    ///
    /// Uses a separate salt from Tor rendezvous so both can coexist.
    ///
    /// # Errors
    ///
    /// * `TransportError::NotInitialized` - transport not initialized.
    /// * `TransportError::Internal` - encoding failed, the value exceeds the
    ///   DHT size limit, or every PUT attempt failed.
    pub async fn publish_iroh_endpoint_id(
        &self,
        our_public_key: [u8; 32],
        endpoint_id: &str,
    ) -> TransportResult<()> {
        if !*self.initialized.read().await {
            return Err(TransportError::NotInitialized);
        }

        // Serialize all BEP44 PUTs; this guard is held across awaits until return.
        let _put_lock = self.put_mutex.lock().await;

        let dht_guard = self.dht.read().await;
        let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?;

        // The BEP44 item is signed with an ed25519 key derived from our X25519 key.
        let signing_key = derive_ed25519_from_x25519(&our_public_key);

        // Unix time doubles as the sequence number, so later publishes win.
        // unwrap: only fails if the system clock is before the Unix epoch.
        let seq = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64;

        // Reuse RendezvousValue format: { "a": "<endpoint_id>", "t": <timestamp> }
        let value = serde_bencode::to_bytes(&RendezvousValue {
            address: endpoint_id.to_string(),
            timestamp: seq as u64,
        })
        .map_err(|e| TransportError::Internal(format!("Failed to encode iroh rendezvous: {}", e)))?;

        // BEP44 caps item values (~1000 bytes); reject oversized payloads up front.
        if value.len() > MAX_DHT_VALUE_SIZE {
            return Err(TransportError::Internal(
                "Iroh rendezvous value too large for DHT".to_string(),
            ));
        }

        let salt = Bytes::from(DHT_IROH_RENDEZVOUS_SALT.as_bytes().to_vec());
        let item = MutableItem::new(signing_key, &value, seq, Some(salt.as_ref()));

        debug_log!(
            "[DHT IROH] Publishing endpoint ID: {} ({} bytes, seq={})",
            endpoint_id,
            value.len(),
            seq
        );

        // Retry with exponential backoff
        let mut last_error = None;
        for attempt in 0..MAX_RETRY_ATTEMPTS {
            if attempt > 0 {
                let delay = RETRY_BASE_DELAY * (1 << (attempt - 1));
                tokio::time::sleep(delay).await;
                debug_log!("[DHT IROH] Retry attempt {} for publish", attempt + 1);
            }

            // put_mutable blocks, so it runs on the blocking thread pool.
            let dht_clone = dht.clone();
            let item_clone = item.clone();
            match tokio::task::spawn_blocking(move || dht_clone.put_mutable(item_clone, None)).await
            {
                Ok(Ok(_)) => {
                    debug_log!(
                        "[DHT IROH] Published successfully (attempt {})",
                        attempt + 1
                    );
                    // Short settle delay before releasing the PUT mutex —
                    // presumably to let the write propagate; TODO confirm.
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    return Ok(());
                }
                Ok(Err(e)) => {
                    // DHT-level PUT failure: record it and retry after backoff.
                    debug_log!(
                        "[DHT IROH] Publish failed (attempt {}): {:?}",
                        attempt + 1,
                        e
                    );
                    last_error = Some(format!("DHT PUT iroh rendezvous failed: {:?}", e));
                }
                Err(e) => {
                    // Join error: the blocking task panicked or was cancelled.
                    debug_log!(
                        "[DHT IROH] Task failed (attempt {}): {:?}",
                        attempt + 1,
                        e
                    );
                    last_error = Some(format!("Task failed: {:?}", e));
                }
            }
        }

        // All attempts exhausted — surface the last recorded failure.
        Err(TransportError::Internal(
            last_error.unwrap_or_else(|| "Unknown error".to_string()),
        ))
    }
1777  
1778      /// Fetch a contact's iroh endpoint ID from the DHT.
1779      ///
1780      /// Returns the endpoint ID if found and not expired (1-hour freshness).
1781      pub async fn fetch_iroh_endpoint_id(
1782          &self,
1783          contact_public_key: [u8; 32],
1784      ) -> TransportResult<Option<String>> {
1785          if !*self.initialized.read().await {
1786              return Err(TransportError::NotInitialized);
1787          }
1788  
1789          let dht_guard = self.dht.read().await;
1790          let dht = dht_guard.as_ref().ok_or(TransportError::NotInitialized)?;
1791  
1792          let signing_key = derive_ed25519_from_x25519(&contact_public_key);
1793          let verifying_key = signing_key.verifying_key();
1794          let pk_bytes: [u8; 32] = verifying_key.to_bytes();
1795          let salt = Bytes::from(DHT_IROH_RENDEZVOUS_SALT.as_bytes().to_vec());
1796  
1797          let dht_clone = dht.clone();
1798          let salt_ref = salt.to_vec();
1799  
1800          // Take the first response from the DHT. BEP44 nodes should prefer
1801          // higher sequence numbers, so the first response is typically the latest.
1802          let result = tokio::task::spawn_blocking(move || {
1803              let mut iter = dht_clone.get_mutable(&pk_bytes, Some(&salt_ref), None);
1804              iter.next()
1805          })
1806          .await
1807          .map_err(|e| TransportError::Internal(format!("Task join error: {:?}", e)))?;
1808  
1809          match result {
1810              Some(item) => {
1811                  let value_bytes = item.value().to_vec();
1812                  match serde_bencode::from_bytes::<RendezvousValue>(&value_bytes) {
1813                      Ok(rv) => {
1814                          let now = SystemTime::now()
1815                              .duration_since(UNIX_EPOCH)
1816                              .unwrap()
1817                              .as_secs();
1818                          if now - rv.timestamp > 3600 {
1819                              debug_log!(
1820                                  "[DHT IROH] Stale endpoint ID for {}: age={}s",
1821                                  hex::encode(&contact_public_key[..8]),
1822                                  now - rv.timestamp
1823                              );
1824                              return Ok(None);
1825                          }
1826  
1827                          debug_log!(
1828                              "[DHT IROH] Found endpoint ID for {} (seq={}): {}",
1829                              hex::encode(&contact_public_key[..8]),
1830                              item.seq(),
1831                              rv.address
1832                          );
1833                          Ok(Some(rv.address))
1834                      }
1835                      Err(e) => {
1836                          debug_log!("[DHT IROH] Failed to decode: {:?}", e);
1837                          Ok(None)
1838                      }
1839                  }
1840              }
1841              None => {
1842                  debug_log!(
1843                      "[DHT IROH] No endpoint ID found for {}",
1844                      hex::encode(&contact_public_key[..8])
1845                  );
1846                  Ok(None)
1847              }
1848          }
1849      }
1850  }
1851  
1852  impl Default for DhtTransport {
1853      fn default() -> Self {
1854          Self::new(DhtConfig::default())
1855      }
1856  }
1857  
impl Clone for DhtTransport {
    // Cheap clone: `config` is duplicated; every other field is a shared
    // `Arc` handle, so clones observe the same DHT client, caches, locks,
    // and state as the original.
    // NOTE(review): all fields appear to be `Clone`, so `#[derive(Clone)]`
    // would likely suffice — confirm against the struct definition.
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            state: Arc::clone(&self.state),
            initialized: Arc::clone(&self.initialized),
            dht: Arc::clone(&self.dht),
            message_cache: Arc::clone(&self.message_cache),
            put_mutex: Arc::clone(&self.put_mutex),
            chunk_cache: Arc::clone(&self.chunk_cache),
        }
    }
}
1871  
impl std::fmt::Debug for DhtTransport {
    // Manual Debug: only `config` is printed. The DHT client handle and the
    // lock-guarded caches are deliberately omitted — presumably they don't
    // implement `Debug` and/or would require locking; confirm before changing.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DhtTransport")
            .field("config", &self.config)
            .finish()
    }
}
1879  
#[async_trait]
impl Transport for DhtTransport {
    /// Bootstrap into the Mainline DHT network.
    ///
    /// Builds a DHT client on an OS-assigned UDP port, waits (on a blocking
    /// worker) for bootstrapping to resolve, and records the outcome in the
    /// shared state machine (`Bootstrapping` → `Connected`/`Failed`).
    ///
    /// # Errors
    ///
    /// * `TransportError::NotAvailable` - no bootstrap nodes configured, or
    ///   the routing table stayed empty after the bootstrap attempt.
    /// * `TransportError::Internal` - the DHT client could not be built.
    async fn initialize(&mut self) -> TransportResult<()> {
        if self.config.bootstrap_nodes.is_empty() {
            return Err(TransportError::NotAvailable(
                "No DHT bootstrap nodes configured".to_string(),
            ));
        }

        // Update state
        {
            let mut state = self.state.write().await;
            *state = DhtState::Bootstrapping;
        }

        debug_log!(
            "[DHT INFO] Initializing Mainline DHT with {} bootstrap nodes",
            self.config.bootstrap_nodes.len()
        );

        // Create DHT client with a random port (0 = OS assigns available port)
        // This avoids conflicts with other apps using the default port 6881
        // NOTE(review): `config.bootstrap_nodes` is validated above but never
        // passed to the builder, so the crate's default bootstrap list is what
        // actually gets used — confirm whether custom nodes should be wired in.
        let dht = match Dht::builder().port(0).build() {
            Ok(dht) => dht,
            Err(e) => {
                let mut state = self.state.write().await;
                *state = DhtState::Failed {
                    error: format!("Failed to create DHT client: {:?}", e),
                };
                return Err(TransportError::Internal(format!(
                    "Failed to create DHT: {:?}",
                    e
                )));
            }
        };

        // Store DHT client
        // (stored before bootstrap completes so shutdown() can always drop it)
        {
            let mut dht_guard = self.dht.write().await;
            *dht_guard = Some(dht.clone());
        }

        // Wait for connection to complete using the proper v5 API
        // `bootstrapped()` blocks, hence the blocking thread pool.
        let dht_clone = dht.clone();
        let bootstrap_result = tokio::task::spawn_blocking(move || {
            debug_log!("[DHT INFO] Connecting to DHT network...");
            let success = dht_clone.bootstrapped();
            debug_log!("[DHT INFO] Connection result: success={}", success);
            success
        })
        .await;

        let bootstrap_success = match bootstrap_result {
            Ok(true) => {
                debug_log!("[DHT INFO] Connection complete - DHT network reachable");
                true
            }
            Ok(false) => {
                debug_log!("[DHT WARN] Connection failed - routing table may be empty");
                false
            }
            Err(e) => {
                // Join error: the blocking task panicked or was cancelled.
                debug_log!("[DHT WARN] Connection task failed: {:?}", e);
                false
            }
        };

        // Get node info for diagnostics
        let dht_clone = dht.clone();
        if let Ok(info) = tokio::task::spawn_blocking(move || dht_clone.info()).await {
            debug_log!("[DHT INFO] Node info: local_addr={:?}, firewalled={}",
                info.local_addr(), info.firewalled());
        }

        if bootstrap_success {
            // Mark as connected
            // peer_count starts at 0 — presumably updated elsewhere; TODO confirm.
            {
                let mut state = self.state.write().await;
                *state = DhtState::Connected { peer_count: 0 };
            }

            // Mark as initialized
            {
                let mut initialized = self.initialized.write().await;
                *initialized = true;
            }

            debug_log!("[DHT INFO] DHT transport initialized and ready");
            Ok(())
        } else {
            // Bootstrap failed - mark as failed and return error
            // (the stored DHT client is kept; a later initialize() replaces it)
            {
                let mut state = self.state.write().await;
                *state = DhtState::Failed {
                    error: "Connection failed - could not join DHT network".to_string(),
                };
            }

            debug_log!("[DHT WARN] DHT transport failed to connect");
            Err(TransportError::NotAvailable(
                "DHT connection failed - routing table empty".to_string(),
            ))
        }
    }

    /// Tear down the transport: drop the DHT client and reset all shared state.
    async fn shutdown(&mut self) -> TransportResult<()> {
        // Drop DHT client
        {
            let mut dht_guard = self.dht.write().await;
            *dht_guard = None;
        }

        {
            let mut initialized = self.initialized.write().await;
            *initialized = false;
        }

        {
            let mut state = self.state.write().await;
            *state = DhtState::Disconnected;
        }

        debug_log!("[DHT INFO] DHT transport shut down");
        Ok(())
    }

    /// Direct sends are unsupported; the DHT is a storage-only transport.
    async fn send(&self, _recipient: &TransportAddress, _message: &[u8]) -> TransportResult<()> {
        // DHT is not used for direct message sending
        // Use publish_messages() instead for DHT storage
        Err(TransportError::NotAvailable(
            "DHT transport is for storage only, not direct sending. Use publish_messages().".to_string(),
        ))
    }

    /// Direct receives are unsupported; poll with `fetch_messages()` instead.
    async fn receive(&self) -> TransportResult<(TransportAddress, Vec<u8>)> {
        // DHT is not used for receiving messages directly
        // Use fetch_messages() instead for DHT retrieval
        Err(TransportError::NotAvailable(
            "DHT transport is for storage only, not direct receiving. Use fetch_messages().".to_string(),
        ))
    }

    /// True once initialize() has succeeded. Uses `try_read` so it stays
    /// non-blocking in sync contexts; a contended lock reports unavailable.
    fn is_available(&self) -> bool {
        self.initialized.try_read().map(|g| *g).unwrap_or(false)
    }

    /// Relative transport priority (lower than direct transports).
    fn priority(&self) -> u8 {
        // DHT is used for storage, lower priority than direct transports
        20
    }

    /// Identifies this transport as the DHT variant.
    fn transport_type(&self) -> TransportType {
        TransportType::DHT
    }

    /// Human-readable transport name for logs and diagnostics.
    fn name(&self) -> &str {
        "DHT (Mainline BEP44)"
    }

    /// Always `None`: the DHT has no messaging-level local address.
    fn local_address(&self) -> Option<TransportAddress> {
        // DHT doesn't have a local address for messaging
        None
    }
}
2044  
2045  #[cfg(test)]
2046  mod tests {
2047      use super::*;
2048  
2049      #[test]
2050      fn test_dht_config_default() {
2051          let config = DhtConfig::default();
2052          assert!(!config.bootstrap_nodes.is_empty());
2053          assert!(config.bootstrap_nodes[0].contains("bittorrent"));
2054          assert_eq!(config.query_timeout_secs, 30);
2055      }
2056  
2057      #[test]
2058      fn test_dht_transport_new() {
2059          let transport = DhtTransport::new(DhtConfig::default());
2060          assert!(!transport.is_available());
2061          assert_eq!(transport.transport_type(), TransportType::DHT);
2062          assert_eq!(transport.priority(), 20);
2063          assert_eq!(transport.name(), "DHT (Mainline BEP44)");
2064      }
2065  
2066      #[tokio::test]
2067      async fn test_dht_transport_state() {
2068          let transport = DhtTransport::new(DhtConfig::default());
2069          let state = transport.state().await;
2070          assert_eq!(state, DhtState::Disconnected);
2071      }
2072  
2073      #[tokio::test]
2074      async fn test_dht_transport_no_bootstrap() {
2075          let config = DhtConfig {
2076              bootstrap_nodes: vec![],
2077              ..Default::default()
2078          };
2079          let mut transport = DhtTransport::new(config);
2080          let result = transport.initialize().await;
2081          assert!(matches!(result, Err(TransportError::NotAvailable(_))));
2082      }
2083  
2084      #[test]
2085      fn test_dht_stored_message_new() {
2086          let msg = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3, 4], None);
2087  
2088          assert_eq!(msg.message_id, [0x01; 16]);
2089          assert_eq!(msg.sender_key, [0xAB; 32]);
2090          assert_eq!(msg.payload, vec![1, 2, 3, 4]);
2091          assert!(!msg.is_expired());
2092          assert!(msg.expires_at > msg.timestamp);
2093          assert_eq!(msg.expires_at - msg.timestamp, DEFAULT_MESSAGE_TTL_SECS);
2094      }
2095  
2096      #[test]
2097      fn test_dht_stored_message_custom_ttl() {
2098          let msg = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3, 4], Some(3600));
2099  
2100          assert_eq!(msg.expires_at - msg.timestamp, 3600);
2101      }
2102  
2103      #[test]
2104      fn test_dht_stored_message_serialize_deserialize() {
2105          let msg = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3, 4, 5], None);
2106  
2107          let serialized = msg.serialize().unwrap();
2108          let deserialized = DhtStoredMessage::deserialize(&serialized).unwrap();
2109  
2110          assert_eq!(msg.message_id, deserialized.message_id);
2111          assert_eq!(msg.sender_key, deserialized.sender_key);
2112          assert_eq!(msg.payload, deserialized.payload);
2113          assert_eq!(msg.timestamp, deserialized.timestamp);
2114          assert_eq!(msg.expires_at, deserialized.expires_at);
2115      }
2116  
2117      #[test]
2118      fn test_derive_ed25519_deterministic() {
2119          let x25519_key = [0xAB; 32];
2120  
2121          let signing_key1 = derive_ed25519_from_x25519(&x25519_key);
2122          let signing_key2 = derive_ed25519_from_x25519(&x25519_key);
2123  
2124          assert_eq!(signing_key1.to_bytes(), signing_key2.to_bytes());
2125      }
2126  
2127      #[test]
2128      fn test_derive_ed25519_different_keys() {
2129          let key1 = [0xAB; 32];
2130          let key2 = [0xCD; 32];
2131  
2132          let signing_key1 = derive_ed25519_from_x25519(&key1);
2133          let signing_key2 = derive_ed25519_from_x25519(&key2);
2134  
2135          assert_ne!(signing_key1.to_bytes(), signing_key2.to_bytes());
2136      }
2137  
2138      #[test]
2139      fn test_bencode_roundtrip() {
2140          let messages = vec![
2141              DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3], None),
2142              DhtStoredMessage::new([0x02; 16], [0xCD; 32], vec![4, 5, 6], Some(3600)),
2143          ];
2144  
2145          let encoded = bencode_messages(&messages);
2146          let decoded = decode_messages(&encoded);
2147  
2148          assert_eq!(decoded.len(), 2);
2149          assert_eq!(decoded[0].message_id, [0x01; 16]);
2150          assert_eq!(decoded[0].sender_key, [0xAB; 32]);
2151          assert_eq!(decoded[0].payload, vec![1, 2, 3]);
2152          assert_eq!(decoded[1].message_id, [0x02; 16]);
2153          assert_eq!(decoded[1].sender_key, [0xCD; 32]);
2154          assert_eq!(decoded[1].payload, vec![4, 5, 6]);
2155      }
2156  
2157      #[test]
2158      fn test_decode_invalid_data() {
2159          let invalid = b"not valid bencode";
2160          let decoded = decode_messages(invalid);
2161          assert!(decoded.is_empty());
2162      }
2163  
2164      #[tokio::test]
2165      async fn test_dht_transport_message_not_initialized() {
2166          let transport = DhtTransport::new(DhtConfig::default());
2167  
2168          let result = transport.publish_messages([0xCD; 32], vec![]).await;
2169          assert!(matches!(result, Err(TransportError::NotInitialized)));
2170  
2171          let result = transport.fetch_messages([0xCD; 32]).await;
2172          assert!(matches!(result, Err(TransportError::NotInitialized)));
2173      }
2174  
2175      #[test]
2176      fn test_generate_slot_salt() {
2177          let salt0 = generate_slot_salt(0);
2178          let salt1 = generate_slot_salt(1);
2179          let salt9 = generate_slot_salt(9);
2180  
2181          assert_eq!(salt0.as_ref(), b"deaddrop/v1/slot/0");
2182          assert_eq!(salt1.as_ref(), b"deaddrop/v1/slot/1");
2183          assert_eq!(salt9.as_ref(), b"deaddrop/v1/slot/9");
2184      }
2185  
2186      #[test]
2187      fn test_distribute_messages_to_slots_single() {
2188          let transport = DhtTransport::new(DhtConfig::default());
2189          let messages = vec![
2190              DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3], None),
2191          ];
2192  
2193          let (slots, _) = transport.distribute_messages_to_slots(&messages);
2194  
2195          assert_eq!(slots.len(), DHT_MIN_SLOTS); // Single message uses minimum slots
2196          // Single message should go to first available slot
2197          assert_eq!(slots[0].len(), 1);
2198          for i in 1..DHT_MIN_SLOTS {
2199              assert!(slots[i].is_empty());
2200          }
2201      }
2202  
2203      #[test]
2204      fn test_distribute_messages_to_slots_multiple() {
2205          let transport = DhtTransport::new(DhtConfig::default());
2206          let messages: Vec<DhtStoredMessage> = (0..5)
2207              .map(|i| {
2208                  let mut id = [0u8; 16];
2209                  id[0] = i as u8;
2210                  DhtStoredMessage::new(id, [0xAB; 32], vec![1, 2, 3], None)
2211              })
2212              .collect();
2213  
2214          let (slots, _) = transport.distribute_messages_to_slots(&messages);
2215  
2216          // 5 small messages should fit in early slots
2217          let total: usize = slots.iter().map(|s| s.len()).sum();
2218          assert_eq!(total, 5);
2219      }
2220  
2221      #[test]
2222      fn test_distribute_messages_to_slots_across_slots() {
2223          let transport = DhtTransport::new(DhtConfig::default());
2224          // Create small messages that will fit multiple per slot
2225          // Empty payload = ~70 bytes per message (id + sender_key + expires_at + bencode overhead)
2226          let payload = vec![]; // Empty payload for small messages
2227          let messages: Vec<DhtStoredMessage> = (0..50)
2228              .map(|i| {
2229                  let mut id = [0u8; 16];
2230                  id[0] = i as u8;
2231                  DhtStoredMessage::new(id, [0xAB; 32], payload.clone(), None)
2232              })
2233              .collect();
2234  
2235          let (slots, _) = transport.distribute_messages_to_slots(&messages);
2236  
2237          // 50 small messages should spread across multiple slots
2238          let non_empty_slots = slots.iter().filter(|s| !s.is_empty()).count();
2239          assert!(non_empty_slots >= 1, "Should use at least 1 slot");
2240  
2241          // Count total messages distributed
2242          let total: usize = slots.iter().map(|s| s.len()).sum();
2243          // At least most messages should fit (10 slots * ~10 msgs per slot = ~100)
2244          assert!(total >= 40, "At least 40 of 50 small messages should fit");
2245      }
2246  
2247      #[test]
2248      fn test_bencode_message_size() {
2249          // Verify encoding sizes for capacity planning
2250          let small_msg = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![], None);
2251          let small_encoded = bencode_messages(&[small_msg]);
2252          debug_log!("Small message encoded size: {} bytes", small_encoded.len());
2253          // Small message with empty payload - verify it's reasonable
2254          assert!(small_encoded.len() < 500, "Small message should be < 500 bytes");
2255  
2256          let medium_msg = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![0u8; 100], None);
2257          let medium_encoded = bencode_messages(&[medium_msg]);
2258          debug_log!("Medium message encoded size: {} bytes", medium_encoded.len());
2259          // Medium message (100 byte payload)
2260          assert!(medium_encoded.len() < 600, "Medium message should be < 600 bytes");
2261  
2262          // With 950 byte limit and ~200 byte messages, we can fit ~4-5 per slot
2263          // With 10 slots, that's 40-50 messages total capacity
2264      }
2265  
2266      #[test]
2267      fn test_slot_count_constants() {
2268          assert_eq!(DHT_MIN_SLOTS, 20);
2269          assert_eq!(DHT_MAX_SLOTS, 100);
2270          assert_eq!(MAX_DHT_VALUE_SIZE, 950);
2271      }
2272  
2273      #[test]
2274      fn test_calculate_slot_count() {
2275          // Small message counts use minimum slots
2276          assert_eq!(calculate_slot_count(1), DHT_MIN_SLOTS);
2277          assert_eq!(calculate_slot_count(10), DHT_MIN_SLOTS);
2278          assert_eq!(calculate_slot_count(20), DHT_MIN_SLOTS);
2279  
2280          // Large message counts scale up
2281          assert_eq!(calculate_slot_count(50), 50);
2282          assert_eq!(calculate_slot_count(89), 89);
2283  
2284          // Capped at max slots
2285          assert_eq!(calculate_slot_count(100), DHT_MAX_SLOTS);
2286          assert_eq!(calculate_slot_count(200), DHT_MAX_SLOTS);
2287      }
2288  
2289      #[test]
2290      fn test_chunk_encode_decode() {
2291          let chunk = MessageChunk {
2292              message_id: [0x01; 16],
2293              total_chunks: 3,
2294              chunk_index: 1,
2295              payload: vec![1, 2, 3, 4, 5],
2296          };
2297  
2298          let encoded = encode_chunk(&chunk);
2299          assert!(encoded.starts_with(CHUNKED_MAGIC));
2300  
2301          let decoded = decode_chunk(&encoded).unwrap();
2302          assert_eq!(decoded.message_id, chunk.message_id);
2303          assert_eq!(decoded.total_chunks, chunk.total_chunks);
2304          assert_eq!(decoded.chunk_index, chunk.chunk_index);
2305          assert_eq!(decoded.payload, chunk.payload);
2306      }
2307  
2308      #[test]
2309      fn test_chunk_decode_invalid() {
2310          // Too short
2311          assert!(decode_chunk(&[0u8; 10]).is_none());
2312  
2313          // Wrong magic
2314          assert!(decode_chunk(&[0xFF; 30]).is_none());
2315      }
2316  
2317      #[test]
2318      fn test_split_and_reassemble_large_message() {
2319          // Create a large message that would exceed slot size
2320          let large_payload = vec![0xAB; 2000]; // 2KB payload
2321          let msg = DhtStoredMessage::new([0x01; 16], [0xCD; 32], large_payload.clone(), None);
2322  
2323          // Split into chunks
2324          let chunks = split_message_into_chunks(&msg);
2325          assert!(chunks.len() > 1, "Should split into multiple chunks");
2326          debug_log!("Large message split into {} chunks", chunks.len());
2327  
2328          // Verify all chunks have correct metadata
2329          for (i, chunk) in chunks.iter().enumerate() {
2330              assert_eq!(chunk.message_id, msg.message_id);
2331              assert_eq!(chunk.total_chunks, chunks.len() as u8);
2332              assert_eq!(chunk.chunk_index, i as u8);
2333          }
2334  
2335          // Reassemble
2336          let reassembled = reassemble_chunks(&chunks).unwrap();
2337          assert_eq!(reassembled.message_id, msg.message_id);
2338          assert_eq!(reassembled.sender_key, msg.sender_key);
2339          assert_eq!(reassembled.payload, large_payload);
2340      }
2341  
2342      #[test]
2343      fn test_reassemble_incomplete_chunks() {
2344          let msg = DhtStoredMessage::new([0x01; 16], [0xCD; 32], vec![0xAB; 2000], None);
2345          let mut chunks = split_message_into_chunks(&msg);
2346  
2347          // Remove one chunk
2348          chunks.pop();
2349  
2350          // Should fail to reassemble
2351          assert!(reassemble_chunks(&chunks).is_none());
2352      }
2353  
2354      #[test]
2355      fn test_distribute_large_message_to_slots() {
2356          let transport = DhtTransport::new(DhtConfig::default());
2357  
2358          // Create a message that's too large for a single slot even after compression
2359          // Use XOR-based pseudo-random bytes that resist compression better
2360          // Need ~10KB to ensure even with compression it exceeds 950 bytes
2361          let mut large_payload = vec![0u8; 10000];
2362          let mut state: u32 = 0xDEADBEEF;
2363          for byte in large_payload.iter_mut() {
2364              // Simple xorshift PRNG for incompressible data
2365              state ^= state << 13;
2366              state ^= state >> 17;
2367              state ^= state << 5;
2368              *byte = state as u8;
2369          }
2370          let msg = DhtStoredMessage::new([0x01; 16], [0xCD; 32], large_payload, None);
2371  
2372          let (slots, _) = transport.distribute_messages_to_slots(&[msg]);
2373  
2374          // Should have distributed chunks across slots
2375          let total_entries: usize = slots.iter().map(|s| s.len()).sum();
2376          assert!(total_entries > 1, "Large message should be split into multiple chunk entries");
2377  
2378          // All entries should be chunk markers
2379          for slot in slots.iter() {
2380              for entry in slot.iter() {
2381                  if entry.sender_key == [0xCE; 32] {
2382                      // This is a chunk entry
2383                      let chunk = decode_chunk(&entry.payload);
2384                      assert!(chunk.is_some(), "Chunk entry should be decodable");
2385                  }
2386              }
2387          }
2388      }
2389  
2390      #[test]
2391      fn test_compression_effectiveness() {
2392          // Test that compression works well on typical message data
2393          let repetitive_payload = vec![0xAB; 1000];
2394          let msg = DhtStoredMessage::new([0x01; 16], [0xCD; 32], repetitive_payload, None);
2395  
2396          let encoded = bencode_messages(&[msg]);
2397          // Repetitive data should compress well (< 200 bytes)
2398          assert!(encoded.len() < 200, "Repetitive data should compress well, got {} bytes", encoded.len());
2399      }
2400  
2401      // =========================================================================
2402      // CONTINUATION CHAIN TESTS
2403      // =========================================================================
2404  
2405      #[test]
2406      fn test_continuation_encode_decode() {
2407          // Basic roundtrip
2408          let encoded = encode_continuation(100);
2409          assert_eq!(encoded.len(), 4);
2410          assert_eq!(&encoded[0..2], CONTINUATION_MAGIC);
2411          assert_eq!(decode_continuation(&encoded), Some(100));
2412  
2413          // Slot 200
2414          let encoded = encode_continuation(200);
2415          assert_eq!(decode_continuation(&encoded), Some(200));
2416  
2417          // Slot 0
2418          let encoded = encode_continuation(0);
2419          assert_eq!(decode_continuation(&encoded), Some(0));
2420  
2421          // Max u16 slot
2422          let encoded = encode_continuation(65535);
2423          assert_eq!(decode_continuation(&encoded), Some(65535));
2424      }
2425  
2426      #[test]
2427      fn test_continuation_decode_invalid() {
2428          // Too short
2429          assert_eq!(decode_continuation(&[b'C', b'N']), None);
2430          assert_eq!(decode_continuation(&[b'C']), None);
2431          assert_eq!(decode_continuation(&[]), None);
2432  
2433          // Wrong magic
2434          assert_eq!(decode_continuation(&[b'X', b'Y', 0, 100]), None);
2435      }
2436  
2437      #[test]
2438      fn test_is_continuation_message() {
2439          let continuation = make_continuation_message(100);
2440          assert!(is_continuation_message(&continuation));
2441          assert_eq!(continuation.sender_key, CONTINUATION_MARKER);
2442          assert_eq!(decode_continuation(&continuation.payload), Some(100));
2443  
2444          // Regular message should NOT be a continuation
2445          let regular = DhtStoredMessage::new([0x01; 16], [0xAB; 32], vec![1, 2, 3], None);
2446          assert!(!is_continuation_message(&regular));
2447  
2448          // Chunk marker should NOT be a continuation
2449          let chunk_msg = DhtStoredMessage {
2450              message_id: [0x01; 16],
2451              sender_key: [0xCE; 32],
2452              payload: vec![],
2453              timestamp: 0,
2454              expires_at: 0,
2455          };
2456          assert!(!is_continuation_message(&chunk_msg));
2457      }
2458  
2459      #[test]
2460      fn test_distribute_no_overflow_no_continuation() {
2461          // Small number of messages should NOT produce a continuation pointer
2462          let transport = DhtTransport::new(DhtConfig::default());
2463          let messages: Vec<DhtStoredMessage> = (0..5)
2464              .map(|i| {
2465                  let mut id = [0u8; 16];
2466                  id[0] = i as u8;
2467                  DhtStoredMessage::new(id, [0xAB; 32], vec![1, 2, 3], None)
2468              })
2469              .collect();
2470  
2471          let (slots, _) = transport.distribute_messages_to_slots(&messages);
2472  
2473          // Should fit in a single range (≤ DHT_MAX_SLOTS)
2474          assert!(slots.len() <= DHT_MAX_SLOTS, "Should not overflow into next range");
2475  
2476          // No continuation markers should exist
2477          for slot in &slots {
2478              for msg in slot {
2479                  assert!(!is_continuation_message(msg), "Should not have continuation pointer");
2480              }
2481          }
2482      }
2483  
2484      #[test]
2485      fn test_distribute_overflow_creates_continuation() {
2486          // Create enough chunk messages to overflow 100 slots
2487          // Each chunk gets a dedicated slot, so >99 chunks should trigger overflow
2488          // (slot 99 reserved for continuation pointer)
2489          let transport = DhtTransport::new(DhtConfig::default());
2490  
2491          // Create a very large message that produces >99 chunks
2492          // At 400 bytes per chunk payload, we need the serialized message to be >39,600 bytes
2493          // A 40KB payload should produce ~100+ chunks after serialization overhead
2494          let mut large_payload = vec![0u8; 45000];
2495          let mut state: u32 = 0xDEADBEEF;
2496          for byte in large_payload.iter_mut() {
2497              state ^= state << 13;
2498              state ^= state >> 17;
2499              state ^= state << 5;
2500              *byte = state as u8;
2501          }
2502          let msg = DhtStoredMessage::new([0x01; 16], [0xCD; 32], large_payload, None);
2503  
2504          let (slots, _) = transport.distribute_messages_to_slots(&[msg]);
2505  
2506          // Should overflow into more than 100 slots
2507          assert!(slots.len() > DHT_MAX_SLOTS, "Should chain to overflow range, got {} slots", slots.len());
2508  
2509          // Slot 99 (last in base range) should be a continuation pointer
2510          assert_eq!(slots[DHT_MAX_SLOTS - 1].len(), 1, "Slot 99 should have exactly 1 message (continuation)");
2511          let continuation = &slots[DHT_MAX_SLOTS - 1][0];
2512          assert!(is_continuation_message(continuation), "Slot 99 should be a continuation pointer");
2513          assert_eq!(
2514              decode_continuation(&continuation.payload),
2515              Some(DHT_MAX_SLOTS),
2516              "Continuation should point to slot 100"
2517          );
2518  
2519          // Overflow range should have content
2520          let overflow_content: usize = slots[DHT_MAX_SLOTS..].iter().map(|s| s.len()).sum();
2521          assert!(overflow_content > 0, "Overflow range should have chunks");
2522      }
2523  
2524      #[test]
2525      fn test_chain_depth_limit() {
2526          // MAX_CHAIN_DEPTH is 5, so max 500 slots
2527          assert_eq!(MAX_CHAIN_DEPTH, 5);
2528          // Each range is DHT_MAX_SLOTS (100), so max absolute slot is 499
2529      }
2530  }