/ node / src / p2p.rs
p2p.rs
   1  // Copyright (c) 2025 ADnet Contributors
   2  // SPDX-License-Identifier: Apache-2.0
   3  
   4  //! P2P Networking module for DELTA chain
   5  //!
   6  //! This module provides peer-to-peer networking functionality for the DELTA chain,
   7  //! integrating with the adnet-core TCP stack and implementing message handling
   8  //! for consensus, synchronization, and transaction propagation.
   9  
  10  use crate::config::P2PConfig;
  11  use crate::consensus::{
  12      BftConsensus, BftError, BftVote, CrossChainAttestation, DeltaBlock, DeltaTransaction,
  13      ViewChangeRequest,
  14  };
  15  use crate::storage::Storage;
  16  
  17  use adnet_core_tcp::protocols::{Reading, Writing};
  18  use adnet_core_tcp::{Config as TcpConfig, ConnectionSide, P2P, Tcp};
  19  
  20  use async_trait::async_trait;
  21  use bytes::{Buf, BufMut, BytesMut};
  22  use parking_lot::RwLock;
  23  use serde::{Deserialize, Serialize};
  24  use std::collections::{HashMap, HashSet};
  25  use std::io;
  26  use std::net::SocketAddr;
  27  use std::sync::Arc;
  28  use thiserror::Error;
  29  use tokio::sync::{mpsc, oneshot};
  30  use tokio_util::codec::{Decoder, Encoder};
  31  use tracing::{debug, error, info, trace, warn};
  32  
  33  // ============================================================================
  34  // P2P Message Types
  35  // ============================================================================
  36  
  37  /// P2P message types for DELTA chain communication
  38  #[derive(Clone, Debug, Serialize, Deserialize)]
  39  pub enum DeltaMessage {
  40      // Consensus messages
  41      /// Block proposal from the current round leader
  42      BlockProposal(DeltaBlock),
  43      /// Pre-vote for a proposed block
  44      PreVote(BftVote),
  45      /// Pre-commit for a voted block
  46      PreCommit(BftVote),
  47      /// View change request when leader fails
  48      ViewChange(ViewChangeRequest),
  49  
  50      // Sync messages
  51      /// Request blocks starting from a height
  52      GetBlocks {
  53          /// Starting block height
  54          start_height: u32,
  55          /// Number of blocks to retrieve
  56          count: u32,
  57      },
  58      /// Response containing requested blocks
  59      Blocks(Vec<DeltaBlock>),
  60  
  61      // Transaction propagation
  62      /// A new transaction to be propagated
  63      Transaction(DeltaTransaction),
  64  
  65      // Attestation relay
  66      /// Cross-chain attestation from ALPHA
  67      Attestation(CrossChainAttestation),
  68  
  69      // Peer discovery
  70      /// Request for peer addresses
  71      GetPeers,
  72      /// Response containing peer addresses
  73      Peers(Vec<SocketAddr>),
  74  
  75      // Handshake
  76      /// Initial handshake message
  77      Handshake(HandshakeMessage),
  78      /// Handshake acknowledgment
  79      HandshakeAck(HandshakeAck),
  80  
  81      // Heartbeat
  82      /// Ping message for liveness check
  83      Ping(u64),
  84      /// Pong response to ping
  85      Pong(u64),
  86  }
  87  
  88  /// Handshake message for initial connection setup
  89  #[derive(Clone, Debug, Serialize, Deserialize)]
  90  pub struct HandshakeMessage {
  91      /// Protocol version
  92      pub version: u32,
  93      /// Node's listening address
  94      pub listen_addr: SocketAddr,
  95      /// Node's public key (for identity)
  96      pub node_id: Vec<u8>,
  97      /// Current block height
  98      pub block_height: u32,
  99      /// Genesis block hash (for chain validation)
 100      pub genesis_hash: [u8; 32],
 101  }
 102  
 103  /// Handshake acknowledgment
 104  #[derive(Clone, Debug, Serialize, Deserialize)]
 105  pub struct HandshakeAck {
 106      /// Whether handshake was accepted
 107      pub accepted: bool,
 108      /// Reason if rejected
 109      pub reason: Option<String>,
 110      /// Responder's listening address
 111      pub listen_addr: SocketAddr,
 112      /// Responder's block height
 113      pub block_height: u32,
 114  }
 115  
 116  // ============================================================================
 117  // Message Codec
 118  // ============================================================================
 119  
 120  /// Protocol version for message compatibility
 121  pub const PROTOCOL_VERSION: u32 = 1;
 122  
 123  /// Maximum message size (16 MB)
 124  pub const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
 125  
 126  /// Codec for encoding/decoding DeltaMessages
 127  #[derive(Clone)]
 128  pub struct DeltaCodec {
 129      /// Maximum allowed message size
 130      max_message_size: usize,
 131  }
 132  
 133  impl Default for DeltaCodec {
 134      fn default() -> Self {
 135          Self {
 136              max_message_size: MAX_MESSAGE_SIZE,
 137          }
 138      }
 139  }
 140  
 141  impl DeltaCodec {
 142      /// Create a new codec with custom max message size
 143      pub fn new(max_message_size: usize) -> Self {
 144          Self { max_message_size }
 145      }
 146  }
 147  
 148  impl Decoder for DeltaCodec {
 149      type Item = DeltaMessage;
 150      type Error = io::Error;
 151  
 152      fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
 153          // Need at least 4 bytes for length prefix
 154          if src.len() < 4 {
 155              return Ok(None);
 156          }
 157  
 158          // Read length prefix (big-endian u32)
 159          let len = u32::from_be_bytes([src[0], src[1], src[2], src[3]]) as usize;
 160  
 161          // Validate message size
 162          if len > self.max_message_size {
 163              return Err(io::Error::new(
 164                  io::ErrorKind::InvalidData,
 165                  format!(
 166                      "Message size {} exceeds maximum {}",
 167                      len, self.max_message_size
 168                  ),
 169              ));
 170          }
 171  
 172          // Check if we have the full message
 173          if src.len() < 4 + len {
 174              // Reserve space for the full message
 175              src.reserve(4 + len - src.len());
 176              return Ok(None);
 177          }
 178  
 179          // Consume length prefix
 180          src.advance(4);
 181  
 182          // Extract message bytes
 183          let msg_bytes = src.split_to(len);
 184  
 185          // Deserialize message
 186          let message: DeltaMessage = bincode::deserialize(&msg_bytes).map_err(|e| {
 187              io::Error::new(
 188                  io::ErrorKind::InvalidData,
 189                  format!("Failed to deserialize message: {}", e),
 190              )
 191          })?;
 192  
 193          Ok(Some(message))
 194      }
 195  }
 196  
 197  impl Encoder<DeltaMessage> for DeltaCodec {
 198      type Error = io::Error;
 199  
 200      fn encode(&mut self, item: DeltaMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
 201          // Serialize message
 202          let msg_bytes = bincode::serialize(&item).map_err(|e| {
 203              io::Error::new(
 204                  io::ErrorKind::InvalidData,
 205                  format!("Failed to serialize message: {}", e),
 206              )
 207          })?;
 208  
 209          // Validate size
 210          if msg_bytes.len() > self.max_message_size {
 211              return Err(io::Error::new(
 212                  io::ErrorKind::InvalidData,
 213                  format!(
 214                      "Serialized message size {} exceeds maximum {}",
 215                      msg_bytes.len(),
 216                      self.max_message_size
 217                  ),
 218              ));
 219          }
 220  
 221          // Write length prefix (big-endian u32)
 222          dst.reserve(4 + msg_bytes.len());
 223          dst.put_u32(msg_bytes.len() as u32);
 224          dst.extend_from_slice(&msg_bytes);
 225  
 226          Ok(())
 227      }
 228  }
 229  
 230  // ============================================================================
 231  // P2P Error Types
 232  // ============================================================================
 233  
 234  /// Errors that can occur in P2P operations
 235  #[derive(Error, Debug)]
 236  pub enum P2PError {
 237      /// IO error
 238      #[error("IO error: {0}")]
 239      Io(#[from] io::Error),
 240  
 241      /// Connection error
 242      #[error("Connection error: {0}")]
 243      Connection(String),
 244  
 245      /// Peer not found
 246      #[error("Peer not found: {0}")]
 247      PeerNotFound(SocketAddr),
 248  
 249      /// Invalid message
 250      #[error("Invalid message: {0}")]
 251      InvalidMessage(String),
 252  
 253      /// Handshake failed
 254      #[error("Handshake failed: {0}")]
 255      HandshakeFailed(String),
 256  
 257      /// Protocol version mismatch
 258      #[error("Protocol version mismatch: expected {expected}, got {got}")]
 259      VersionMismatch { expected: u32, got: u32 },
 260  
 261      /// Genesis hash mismatch
 262      #[error("Genesis hash mismatch")]
 263      GenesisMismatch,
 264  
 265      /// Maximum peers reached
 266      #[error("Maximum peers reached: {0}")]
 267      MaxPeersReached(usize),
 268  
 269      /// BFT consensus error
 270      #[error("BFT error: {0}")]
 271      Bft(#[from] BftError),
 272  
 273      /// Peer already connected
 274      #[error("Already connected to peer: {0}")]
 275      AlreadyConnected(SocketAddr),
 276  
 277      /// Send error
 278      #[error("Failed to send message: {0}")]
 279      SendError(String),
 280  }
 281  
 282  // ============================================================================
 283  // Peer State
 284  // ============================================================================
 285  
 286  /// State of a connected peer
 287  #[derive(Clone, Debug)]
 288  pub struct PeerState {
 289      /// Peer's listening address
 290      pub listen_addr: SocketAddr,
 291      /// Peer's connected address (may differ from listen)
 292      pub connected_addr: SocketAddr,
 293      /// Peer's node ID
 294      pub node_id: Vec<u8>,
 295      /// Peer's current block height
 296      pub block_height: u32,
 297      /// Connection side (initiator or responder)
 298      pub side: ConnectionSide,
 299      /// Last seen timestamp (unix millis)
 300      pub last_seen: u64,
 301      /// Number of messages received from this peer
 302      pub messages_received: u64,
 303      /// Number of messages sent to this peer
 304      pub messages_sent: u64,
 305      /// Whether handshake is complete
 306      pub handshake_complete: bool,
 307  }
 308  
 309  impl PeerState {
 310      /// Create a new peer state
 311      pub fn new(connected_addr: SocketAddr, side: ConnectionSide) -> Self {
 312          Self {
 313              listen_addr: connected_addr,
 314              connected_addr,
 315              node_id: Vec::new(),
 316              block_height: 0,
 317              side,
 318              last_seen: current_time_millis(),
 319              messages_received: 0,
 320              messages_sent: 0,
 321              handshake_complete: false,
 322          }
 323      }
 324  
 325      /// Update last seen timestamp
 326      pub fn touch(&mut self) {
 327          self.last_seen = current_time_millis();
 328      }
 329  
 330      /// Update from handshake
 331      pub fn update_from_handshake(&mut self, msg: &HandshakeMessage) {
 332          self.listen_addr = msg.listen_addr;
 333          self.node_id = msg.node_id.clone();
 334          self.block_height = msg.block_height;
 335          self.handshake_complete = true;
 336          self.touch();
 337      }
 338  }
 339  
 340  /// Get current time in milliseconds
 341  fn current_time_millis() -> u64 {
 342      std::time::SystemTime::now()
 343          .duration_since(std::time::UNIX_EPOCH)
 344          .unwrap_or_default()
 345          .as_millis() as u64
 346  }
 347  
 348  // ============================================================================
 349  // DeltaP2P Implementation
 350  // ============================================================================
 351  
 352  /// P2P network handler for DELTA chain
 353  #[derive(Clone)]
 354  pub struct DeltaP2P {
 355      inner: Arc<InnerDeltaP2P>,
 356  }
 357  
 358  struct InnerDeltaP2P {
 359      /// TCP stack from adnet-core
 360      tcp: Tcp,
 361      /// P2P configuration
 362      config: P2PConfig,
 363      /// Connected peers state
 364      peers: RwLock<HashMap<SocketAddr, PeerState>>,
 365      /// Known peer addresses (for discovery)
 366      known_peers: RwLock<HashSet<SocketAddr>>,
 367      /// BFT consensus handler (if validator)
 368      consensus: RwLock<Option<BftConsensus>>,
 369      /// Channel for incoming blocks (from consensus)
 370      block_sender: mpsc::Sender<DeltaBlock>,
 371      /// Channel for incoming transactions
 372      tx_sender: mpsc::Sender<DeltaTransaction>,
 373      /// Channel for incoming attestations
 374      attestation_sender: mpsc::Sender<CrossChainAttestation>,
 375      /// Node's listening address
 376      listen_addr: RwLock<Option<SocketAddr>>,
 377      /// Node's identity (public key)
 378      node_id: Vec<u8>,
 379      /// Current block height
 380      block_height: RwLock<u32>,
 381      /// Genesis block hash
 382      genesis_hash: [u8; 32],
 383      /// Storage reference for block sync
 384      storage: RwLock<Option<Arc<Storage>>>,
 385  }
 386  
 387  impl DeltaP2P {
 388      /// Create a new DeltaP2P instance
 389      pub fn new(
 390          config: P2PConfig,
 391          block_sender: mpsc::Sender<DeltaBlock>,
 392          tx_sender: mpsc::Sender<DeltaTransaction>,
 393          attestation_sender: mpsc::Sender<CrossChainAttestation>,
 394      ) -> Self {
 395          // Create TCP configuration
 396          let tcp_config = TcpConfig {
 397              name: Some("delta-p2p".to_string()),
 398              listener_ip: Some(config.listen_addr.ip()),
 399              desired_listening_port: Some(config.listen_addr.port()),
 400              allow_random_port: false,
 401              max_connections: config.max_connections,
 402              connection_timeout_ms: 5000,
 403              ..Default::default()
 404          };
 405  
 406          let tcp = Tcp::new(tcp_config);
 407  
 408          // Generate a simple node ID (in production, use proper key generation)
 409          let node_id = {
 410              let mut hasher = blake3::Hasher::new();
 411              hasher.update(config.listen_addr.ip().to_string().as_bytes());
 412              hasher.update(&config.listen_addr.port().to_le_bytes());
 413              hasher.update(&current_time_millis().to_le_bytes());
 414              hasher.finalize().as_bytes().to_vec()
 415          };
 416  
 417          Self {
 418              inner: Arc::new(InnerDeltaP2P {
 419                  tcp,
 420                  config,
 421                  peers: RwLock::new(HashMap::new()),
 422                  known_peers: RwLock::new(HashSet::new()),
 423                  consensus: RwLock::new(None),
 424                  block_sender,
 425                  tx_sender,
 426                  attestation_sender,
 427                  listen_addr: RwLock::new(None),
 428                  node_id,
 429                  block_height: RwLock::new(0),
 430                  genesis_hash: [0u8; 32], // Default genesis hash
 431                  storage: RwLock::new(None),
 432              }),
 433          }
 434      }
 435  
 436      /// Set the storage reference for block sync
 437      pub fn set_storage(&self, storage: Arc<Storage>) {
 438          *self.inner.storage.write() = Some(storage);
 439      }
 440  
 441      /// Start the P2P network
 442      pub async fn start(&self) -> Result<SocketAddr, P2PError> {
 443          info!("Starting DELTA P2P network...");
 444  
 445          // Enable TCP listener
 446          let listen_addr = self.inner.tcp.enable_listener().await?;
 447          *self.inner.listen_addr.write() = Some(listen_addr);
 448          info!("P2P listening on {}", listen_addr);
 449  
 450          // Add bootstrap peers to known peers
 451          {
 452              let mut known = self.inner.known_peers.write();
 453              for peer in &self.inner.config.bootstrap_peers {
 454                  known.insert(*peer);
 455              }
 456          }
 457  
 458          // Connect to bootstrap peers
 459          self.connect_to_bootstrap_peers().await;
 460  
 461          Ok(listen_addr)
 462      }
 463  
 464      /// Connect to bootstrap peers
 465      async fn connect_to_bootstrap_peers(&self) {
 466          let bootstrap_peers: Vec<SocketAddr> = self.inner.config.bootstrap_peers.to_vec();
 467  
 468          for peer_addr in bootstrap_peers {
 469              if let Err(e) = self.connect_peer(peer_addr).await {
 470                  warn!("Failed to connect to bootstrap peer {}: {}", peer_addr, e);
 471              }
 472          }
 473      }
 474  
 475      /// Connect to a peer
 476      pub async fn connect_peer(&self, addr: SocketAddr) -> Result<(), P2PError> {
 477          // Check if already connected
 478          if self.inner.peers.read().contains_key(&addr) {
 479              return Err(P2PError::AlreadyConnected(addr));
 480          }
 481  
 482          // Check max peers
 483          let peer_count = self.inner.peers.read().len();
 484          if peer_count >= self.inner.config.max_connections as usize {
 485              return Err(P2PError::MaxPeersReached(peer_count));
 486          }
 487  
 488          info!("Connecting to peer: {}", addr);
 489  
 490          // Attempt TCP connection
 491          self.inner
 492              .tcp
 493              .connect(addr)
 494              .await
 495              .map_err(|e| P2PError::Connection(format!("TCP connect failed: {}", e)))?;
 496  
 497          // Add to peers with initial state
 498          {
 499              let mut peers = self.inner.peers.write();
 500              peers.insert(addr, PeerState::new(addr, ConnectionSide::Initiator));
 501          }
 502  
 503          // Add to known peers
 504          self.inner.known_peers.write().insert(addr);
 505  
 506          debug!("Connected to peer: {}", addr);
 507          Ok(())
 508      }
 509  
 510      /// Disconnect from a peer
 511      pub async fn disconnect_peer(&self, addr: SocketAddr) -> bool {
 512          let disconnected = self.inner.tcp.disconnect(addr).await;
 513          if disconnected {
 514              self.inner.peers.write().remove(&addr);
 515              info!("Disconnected from peer: {}", addr);
 516          }
 517          disconnected
 518      }
 519  
 520      /// Add a peer (called when accepting inbound connection)
 521      pub fn add_peer(&self, addr: SocketAddr, side: ConnectionSide) {
 522          let mut peers = self.inner.peers.write();
 523          peers.insert(addr, PeerState::new(addr, side));
 524          self.inner.known_peers.write().insert(addr);
 525          debug!("Added peer: {} (side: {:?})", addr, side);
 526      }
 527  
 528      /// Remove a peer
 529      pub fn remove_peer(&self, addr: SocketAddr) {
 530          self.inner.peers.write().remove(&addr);
 531          debug!("Removed peer: {}", addr);
 532      }
 533  
 534      /// Get all connected peers
 535      pub fn get_peers(&self) -> Vec<SocketAddr> {
 536          self.inner.peers.read().keys().copied().collect()
 537      }
 538  
 539      /// Get peer state
 540      pub fn get_peer_state(&self, addr: &SocketAddr) -> Option<PeerState> {
 541          self.inner.peers.read().get(addr).cloned()
 542      }
 543  
 544      /// Get number of connected peers
 545      pub fn peer_count(&self) -> usize {
 546          self.inner.peers.read().len()
 547      }
 548  
 549      /// Set the BFT consensus handler
 550      pub fn set_consensus(&self, consensus: BftConsensus) {
 551          *self.inner.consensus.write() = Some(consensus);
 552      }
 553  
 554      /// Update current block height
 555      pub fn set_block_height(&self, height: u32) {
 556          *self.inner.block_height.write() = height;
 557      }
 558  
 559      /// Get current block height
 560      pub fn block_height(&self) -> u32 {
 561          *self.inner.block_height.read()
 562      }
 563  
 564      // ========================================================================
 565      // Broadcasting Methods
 566      // ========================================================================
 567  
 568      /// Broadcast a block proposal to all peers
 569      pub fn broadcast_block(&self, block: DeltaBlock) -> Result<(), P2PError> {
 570          let message = DeltaMessage::BlockProposal(block);
 571          self.broadcast_message(message)
 572      }
 573  
 574      /// Broadcast a vote (pre-vote or pre-commit) to all peers
 575      pub fn broadcast_vote(&self, vote: BftVote) -> Result<(), P2PError> {
 576          let message = match vote.vote_type {
 577              crate::consensus::VoteType::PreVote => DeltaMessage::PreVote(vote),
 578              crate::consensus::VoteType::PreCommit => DeltaMessage::PreCommit(vote),
 579          };
 580          self.broadcast_message(message)
 581      }
 582  
 583      /// Broadcast a transaction to all peers
 584      pub fn broadcast_transaction(&self, tx: DeltaTransaction) -> Result<(), P2PError> {
 585          let message = DeltaMessage::Transaction(tx);
 586          self.broadcast_message(message)
 587      }
 588  
 589      /// Broadcast an attestation to all peers
 590      pub fn broadcast_attestation(
 591          &self,
 592          attestation: CrossChainAttestation,
 593      ) -> Result<(), P2PError> {
 594          let message = DeltaMessage::Attestation(attestation);
 595          self.broadcast_message(message)
 596      }
 597  
 598      /// Broadcast a view change request
 599      pub fn broadcast_view_change(&self, request: ViewChangeRequest) -> Result<(), P2PError> {
 600          let message = DeltaMessage::ViewChange(request);
 601          self.broadcast_message(message)
 602      }
 603  
 604      /// Send a message to a specific peer
 605      pub fn send_to_peer(
 606          &self,
 607          addr: SocketAddr,
 608          message: DeltaMessage,
 609      ) -> Result<oneshot::Receiver<io::Result<()>>, P2PError> {
 610          if !self.inner.peers.read().contains_key(&addr) {
 611              return Err(P2PError::PeerNotFound(addr));
 612          }
 613  
 614          self.unicast(addr, message)
 615              .map_err(|e| P2PError::SendError(format!("Failed to send to {}: {}", addr, e)))
 616      }
 617  
 618      /// Request blocks from a peer
 619      pub fn request_blocks(
 620          &self,
 621          peer: SocketAddr,
 622          start_height: u32,
 623          count: u32,
 624      ) -> Result<oneshot::Receiver<io::Result<()>>, P2PError> {
 625          let message = DeltaMessage::GetBlocks {
 626              start_height,
 627              count,
 628          };
 629          self.send_to_peer(peer, message)
 630      }
 631  
 632      /// Initiate block sync with a peer
 633      /// Requests blocks from our current height+1 to peer's height
 634      pub fn sync_with_peer(&self, peer: SocketAddr, peer_height: u32) -> Result<(), P2PError> {
 635          let our_height = *self.inner.block_height.read();
 636  
 637          if peer_height <= our_height {
 638              debug!(
 639                  "No sync needed with {} (peer height {} <= our height {})",
 640                  peer, peer_height, our_height
 641              );
 642              return Ok(());
 643          }
 644  
 645          let blocks_behind = peer_height - our_height;
 646          info!(
 647              "Syncing with {}: {} blocks behind (our height {}, peer height {})",
 648              peer, blocks_behind, our_height, peer_height
 649          );
 650  
 651          // Request blocks in batches of 100
 652          let batch_size = 100u32;
 653          let start_height = our_height + 1;
 654          let count = blocks_behind.min(batch_size);
 655  
 656          self.request_blocks(peer, start_height, count)?;
 657          Ok(())
 658      }
 659  
 660      /// Request peers from a connected peer
 661      pub fn request_peers(
 662          &self,
 663          peer: SocketAddr,
 664      ) -> Result<oneshot::Receiver<io::Result<()>>, P2PError> {
 665          self.send_to_peer(peer, DeltaMessage::GetPeers)
 666      }
 667  
 668      /// Broadcast a message to all connected peers
 669      fn broadcast_message(&self, message: DeltaMessage) -> Result<(), P2PError> {
 670          self.broadcast(message)
 671              .map_err(|e| P2PError::SendError(format!("Broadcast failed: {}", e)))
 672      }
 673  
 674      // ========================================================================
 675      // Message Handling
 676      // ========================================================================
 677  
 678      /// Handle an incoming message
 679      pub async fn handle_message(
 680          &self,
 681          source: SocketAddr,
 682          message: DeltaMessage,
 683      ) -> Result<(), P2PError> {
 684          // Update peer last seen
 685          if let Some(peer) = self.inner.peers.write().get_mut(&source) {
 686              peer.touch();
 687              peer.messages_received += 1;
 688          }
 689  
 690          match message {
 691              DeltaMessage::BlockProposal(block) => self.handle_block_proposal(source, block).await,
 692              DeltaMessage::PreVote(vote) => self.handle_prevote(source, vote).await,
 693              DeltaMessage::PreCommit(vote) => self.handle_precommit(source, vote).await,
 694              DeltaMessage::ViewChange(request) => self.handle_view_change(source, request).await,
 695              DeltaMessage::GetBlocks {
 696                  start_height,
 697                  count,
 698              } => self.handle_get_blocks(source, start_height, count).await,
 699              DeltaMessage::Blocks(blocks) => self.handle_blocks(source, blocks).await,
 700              DeltaMessage::Transaction(tx) => self.handle_transaction(source, tx).await,
 701              DeltaMessage::Attestation(attestation) => {
 702                  self.handle_attestation(source, attestation).await
 703              }
 704              DeltaMessage::GetPeers => self.handle_get_peers(source).await,
 705              DeltaMessage::Peers(peers) => self.handle_peers(source, peers).await,
 706              DeltaMessage::Handshake(msg) => self.handle_handshake(source, msg).await,
 707              DeltaMessage::HandshakeAck(ack) => self.handle_handshake_ack(source, ack).await,
 708              DeltaMessage::Ping(nonce) => self.handle_ping(source, nonce).await,
 709              DeltaMessage::Pong(nonce) => self.handle_pong(source, nonce).await,
 710          }
 711      }
 712  
 713      async fn handle_block_proposal(
 714          &self,
 715          source: SocketAddr,
 716          block: DeltaBlock,
 717      ) -> Result<(), P2PError> {
 718          debug!(
 719              "Received block proposal height={} from {}",
 720              block.height, source
 721          );
 722  
 723          // Forward to consensus if available
 724          if let Some(ref mut consensus) = *self.inner.consensus.write() {
 725              let proposer = block.producer.clone();
 726              match consensus.handle_proposal(block.clone(), &proposer) {
 727                  Ok(()) => {
 728                      // Proposal accepted, send to block channel
 729                      if let Err(e) = self.inner.block_sender.try_send(block) {
 730                          error!("Failed to forward block: {}", e);
 731                      }
 732                  }
 733                  Err(e) => {
 734                      warn!("Block proposal rejected: {}", e);
 735                      return Err(P2PError::Bft(e));
 736                  }
 737              }
 738          } else {
 739              // No consensus, just forward the block
 740              if let Err(e) = self.inner.block_sender.try_send(block) {
 741                  error!("Failed to forward block: {}", e);
 742              }
 743          }
 744  
 745          Ok(())
 746      }
 747  
 748      async fn handle_prevote(&self, source: SocketAddr, vote: BftVote) -> Result<(), P2PError> {
 749          debug!(
 750              "Received pre-vote height={} round={} from {}",
 751              vote.height, vote.round, source
 752          );
 753  
 754          if let Some(ref mut consensus) = *self.inner.consensus.write() {
 755              match consensus.handle_prevote(vote) {
 756                  Ok(quorum_reached) => {
 757                      if quorum_reached {
 758                          info!("Pre-vote quorum reached");
 759                          // Create and broadcast pre-commit
 760                          if let Some(precommit) = consensus.create_precommit()
 761                              && let Err(e) = self.broadcast_vote(precommit)
 762                          {
 763                              error!("Failed to broadcast pre-commit: {}", e);
 764                          }
 765                      }
 766                  }
 767                  Err(e) => {
 768                      warn!("Pre-vote handling error: {}", e);
 769                  }
 770              }
 771          }
 772  
 773          Ok(())
 774      }
 775  
 776      async fn handle_precommit(&self, source: SocketAddr, vote: BftVote) -> Result<(), P2PError> {
 777          debug!(
 778              "Received pre-commit height={} round={} from {}",
 779              vote.height, vote.round, source
 780          );
 781  
 782          if let Some(ref mut consensus) = *self.inner.consensus.write() {
 783              match consensus.handle_precommit(vote) {
 784                  Ok(Some(committed_block)) => {
 785                      info!("Block committed at height {}", committed_block.height);
 786                      // Forward committed block
 787                      if let Err(e) = self.inner.block_sender.try_send(committed_block) {
 788                          error!("Failed to forward committed block: {}", e);
 789                      }
 790                  }
 791                  Ok(None) => {
 792                      // Waiting for more pre-commits
 793                  }
 794                  Err(e) => {
 795                      warn!("Pre-commit handling error: {}", e);
 796                  }
 797              }
 798          }
 799  
 800          Ok(())
 801      }
 802  
 803      async fn handle_view_change(
 804          &self,
 805          source: SocketAddr,
 806          request: ViewChangeRequest,
 807      ) -> Result<(), P2PError> {
 808          debug!(
 809              "Received view change request height={} new_round={} from {}",
 810              request.height, request.new_round, source
 811          );
 812  
 813          if let Some(ref mut consensus) = *self.inner.consensus.write() {
 814              match consensus.handle_view_change(request) {
 815                  Ok(triggered) => {
 816                      if triggered {
 817                          info!("View change triggered");
 818                      }
 819                  }
 820                  Err(e) => {
 821                      warn!("View change handling error: {}", e);
 822                  }
 823              }
 824          }
 825  
 826          Ok(())
 827      }
 828  
 829      async fn handle_get_blocks(
 830          &self,
 831          source: SocketAddr,
 832          start_height: u32,
 833          count: u32,
 834      ) -> Result<(), P2PError> {
 835          debug!(
 836              "Received GetBlocks request start={} count={} from {}",
 837              start_height, count, source
 838          );
 839  
 840          // Retrieve blocks from storage
 841          let blocks = {
 842              let storage_guard = self.inner.storage.read();
 843              if let Some(ref storage) = *storage_guard {
 844                  let mut blocks = Vec::new();
 845                  let max_blocks = count.min(100); // Cap at 100 blocks per request
 846  
 847                  for height in start_height..(start_height + max_blocks) {
 848                      match storage.get_block(height) {
 849                          Ok(block) => blocks.push(block),
 850                          Err(_) => break, // Stop at first missing block
 851                      }
 852                  }
 853  
 854                  debug!(
 855                      "Retrieved {} blocks (heights {}-{}) for {}",
 856                      blocks.len(),
 857                      start_height,
 858                      start_height + blocks.len() as u32 - 1,
 859                      source
 860                  );
 861                  blocks
 862              } else {
 863                  warn!("GetBlocks request but storage not available");
 864                  Vec::new()
 865              }
 866          };
 867  
 868          let _ = self.send_to_peer(source, DeltaMessage::Blocks(blocks));
 869          Ok(())
 870      }
 871  
 872      async fn handle_blocks(
 873          &self,
 874          source: SocketAddr,
 875          blocks: Vec<DeltaBlock>,
 876      ) -> Result<(), P2PError> {
 877          debug!("Received {} blocks from {}", blocks.len(), source);
 878  
 879          // Forward blocks to consensus/sync
 880          for block in blocks {
 881              if let Err(e) = self.inner.block_sender.try_send(block) {
 882                  error!("Failed to forward synced block: {}", e);
 883              }
 884          }
 885  
 886          Ok(())
 887      }
 888  
 889      async fn handle_transaction(
 890          &self,
 891          source: SocketAddr,
 892          tx: DeltaTransaction,
 893      ) -> Result<(), P2PError> {
 894          trace!("Received transaction from {}", source);
 895  
 896          // Forward to transaction handler
 897          if let Err(e) = self.inner.tx_sender.try_send(tx) {
 898              error!("Failed to forward transaction: {}", e);
 899          }
 900  
 901          Ok(())
 902      }
 903  
 904      async fn handle_attestation(
 905          &self,
 906          source: SocketAddr,
 907          attestation: CrossChainAttestation,
 908      ) -> Result<(), P2PError> {
 909          debug!(
 910              "Received attestation alpha_height={} from {}",
 911              attestation.alpha_height, source
 912          );
 913  
 914          // Forward to attestation handler
 915          if let Err(e) = self.inner.attestation_sender.try_send(attestation) {
 916              error!("Failed to forward attestation: {}", e);
 917          }
 918  
 919          Ok(())
 920      }
 921  
 922      async fn handle_get_peers(&self, source: SocketAddr) -> Result<(), P2PError> {
 923          debug!("Received GetPeers request from {}", source);
 924  
 925          // Respond with known peers (excluding the requester)
 926          let peers: Vec<SocketAddr> = self
 927              .inner
 928              .peers
 929              .read()
 930              .keys()
 931              .filter(|addr| **addr != source)
 932              .copied()
 933              .take(20) // Limit response size
 934              .collect();
 935  
 936          let _ = self.send_to_peer(source, DeltaMessage::Peers(peers));
 937  
 938          Ok(())
 939      }
 940  
 941      async fn handle_peers(
 942          &self,
 943          source: SocketAddr,
 944          peers: Vec<SocketAddr>,
 945      ) -> Result<(), P2PError> {
 946          debug!("Received {} peer addresses from {}", peers.len(), source);
 947  
 948          // Add to known peers
 949          let mut known = self.inner.known_peers.write();
 950          for peer in peers {
 951              known.insert(peer);
 952          }
 953  
 954          Ok(())
 955      }
 956  
 957      async fn handle_handshake(
 958          &self,
 959          source: SocketAddr,
 960          msg: HandshakeMessage,
 961      ) -> Result<(), P2PError> {
 962          debug!(
 963              "Received handshake from {} (version={})",
 964              source, msg.version
 965          );
 966  
 967          // Validate protocol version
 968          if msg.version != PROTOCOL_VERSION {
 969              let ack = HandshakeAck {
 970                  accepted: false,
 971                  reason: Some(format!(
 972                      "Protocol version mismatch: expected {}, got {}",
 973                      PROTOCOL_VERSION, msg.version
 974                  )),
 975                  listen_addr: self.inner.listen_addr.read().unwrap_or(source),
 976                  block_height: *self.inner.block_height.read(),
 977              };
 978              let _ = self.send_to_peer(source, DeltaMessage::HandshakeAck(ack));
 979              return Err(P2PError::VersionMismatch {
 980                  expected: PROTOCOL_VERSION,
 981                  got: msg.version,
 982              });
 983          }
 984  
 985          // Validate genesis hash
 986          if msg.genesis_hash != self.inner.genesis_hash {
 987              let ack = HandshakeAck {
 988                  accepted: false,
 989                  reason: Some("Genesis hash mismatch".to_string()),
 990                  listen_addr: self.inner.listen_addr.read().unwrap_or(source),
 991                  block_height: *self.inner.block_height.read(),
 992              };
 993              let _ = self.send_to_peer(source, DeltaMessage::HandshakeAck(ack));
 994              return Err(P2PError::GenesisMismatch);
 995          }
 996  
 997          // Update peer state
 998          if let Some(peer) = self.inner.peers.write().get_mut(&source) {
 999              peer.update_from_handshake(&msg);
1000          }
1001  
1002          // Send acknowledgment
1003          let ack = HandshakeAck {
1004              accepted: true,
1005              reason: None,
1006              listen_addr: self.inner.listen_addr.read().unwrap_or(source),
1007              block_height: *self.inner.block_height.read(),
1008          };
1009          let _ = self.send_to_peer(source, DeltaMessage::HandshakeAck(ack));
1010  
1011          info!("Handshake complete with {}", source);
1012          Ok(())
1013      }
1014  
1015      async fn handle_handshake_ack(
1016          &self,
1017          source: SocketAddr,
1018          ack: HandshakeAck,
1019      ) -> Result<(), P2PError> {
1020          if ack.accepted {
1021              debug!("Handshake accepted by {}", source);
1022  
1023              // Update peer state
1024              if let Some(peer) = self.inner.peers.write().get_mut(&source) {
1025                  peer.listen_addr = ack.listen_addr;
1026                  peer.block_height = ack.block_height;
1027                  peer.handshake_complete = true;
1028              }
1029  
1030              Ok(())
1031          } else {
1032              let reason = ack.reason.unwrap_or_else(|| "Unknown".to_string());
1033              warn!("Handshake rejected by {}: {}", source, reason);
1034              Err(P2PError::HandshakeFailed(reason))
1035          }
1036      }
1037  
1038      async fn handle_ping(&self, source: SocketAddr, nonce: u64) -> Result<(), P2PError> {
1039          trace!("Received ping from {} with nonce {}", source, nonce);
1040          let _ = self.send_to_peer(source, DeltaMessage::Pong(nonce));
1041          Ok(())
1042      }
1043  
1044      async fn handle_pong(&self, source: SocketAddr, nonce: u64) -> Result<(), P2PError> {
1045          trace!("Received pong from {} with nonce {}", source, nonce);
1046          // Could be used for latency measurement
1047          Ok(())
1048      }
1049  
1050      /// Initiate handshake with a peer
1051      pub async fn send_handshake(&self, peer: SocketAddr) -> Result<(), P2PError> {
1052          let handshake = HandshakeMessage {
1053              version: PROTOCOL_VERSION,
1054              listen_addr: self.inner.listen_addr.read().unwrap_or(peer),
1055              node_id: self.inner.node_id.clone(),
1056              block_height: *self.inner.block_height.read(),
1057              genesis_hash: self.inner.genesis_hash,
1058          };
1059          self.send_to_peer(peer, DeltaMessage::Handshake(handshake))?
1060              .await
1061              .map_err(|e| P2PError::Connection(format!("Handshake recv failed: {}", e)))?
1062              .map_err(|e| P2PError::Connection(format!("Handshake IO failed: {}", e)))?;
1063          Ok(())
1064      }
1065  
1066      /// Shutdown the P2P network
1067      pub async fn shutdown(&self) {
1068          info!("Shutting down DELTA P2P network...");
1069          self.inner.tcp.shut_down().await;
1070      }
1071  }
1072  
1073  // ============================================================================
1074  // P2P Trait Implementation
1075  // ============================================================================
1076  
1077  impl P2P for DeltaP2P {
1078      fn tcp(&self) -> &Tcp {
1079          &self.inner.tcp
1080      }
1081  }
1082  
1083  // ============================================================================
1084  // Reading Protocol Implementation
1085  // ============================================================================
1086  
1087  #[async_trait]
1088  impl Reading for DeltaP2P {
1089      type Message = DeltaMessage;
1090      type Codec = DeltaCodec;
1091  
1092      fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1093          DeltaCodec::default()
1094      }
1095  
1096      async fn process_message(&self, source: SocketAddr, message: Self::Message) -> io::Result<()> {
1097          self.handle_message(source, message)
1098              .await
1099              .map_err(|e| io::Error::other(e.to_string()))
1100      }
1101  }
1102  
1103  // ============================================================================
1104  // Writing Protocol Implementation
1105  // ============================================================================
1106  
1107  #[async_trait]
1108  impl Writing for DeltaP2P {
1109      type Message = DeltaMessage;
1110      type Codec = DeltaCodec;
1111  
1112      fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1113          DeltaCodec::default()
1114      }
1115  }
1116  
1117  // ============================================================================
1118  // Unit Tests
1119  // ============================================================================
1120  
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Minimal configuration: loopback address with port 0, no bootstrap peers and a
    /// limit of 10 connections.
    fn test_config() -> P2PConfig {
        P2PConfig {
            listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
            bootstrap_peers: vec![],
            max_connections: 10,
        }
    }

    /// `GetBlocks` survives a bincode round trip with both fields intact.
    #[test]
    fn test_delta_message_serialization() {
        let msg = DeltaMessage::GetBlocks {
            start_height: 100,
            count: 10,
        };
        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: DeltaMessage = bincode::deserialize(&bytes).unwrap();

        match decoded {
            DeltaMessage::GetBlocks {
                start_height,
                count,
            } => {
                assert_eq!(start_height, 100);
                assert_eq!(count, 10);
            }
            _ => panic!("Wrong message type"),
        }
    }

    /// A message encoded by the codec decodes back to the same variant and payload.
    #[test]
    fn test_delta_codec_encode_decode() {
        let mut codec = DeltaCodec::default();
        let msg = DeltaMessage::Ping(12345);

        // Encode
        let mut buf = BytesMut::new();
        codec.encode(msg, &mut buf).unwrap();

        // Decode
        let decoded = codec.decode(&mut buf).unwrap().unwrap();

        match decoded {
            DeltaMessage::Ping(nonce) => assert_eq!(nonce, 12345),
            _ => panic!("Wrong message type"),
        }
    }

    /// Decoding a buffer that holds only the first four bytes of a frame yields
    /// `None` instead of an error.
    #[test]
    fn test_delta_codec_partial_read() {
        let mut codec = DeltaCodec::default();
        let msg = DeltaMessage::GetPeers;

        // Encode
        let mut buf = BytesMut::new();
        codec.encode(msg, &mut buf).unwrap();

        // Simulate partial read (only length prefix)
        let mut partial = buf.split_to(4);

        // Should return None (waiting for more data)
        assert!(codec.decode(&mut partial).unwrap().is_none());
    }

    /// Encoding fails when the message exceeds the codec's configured maximum size.
    #[test]
    fn test_delta_codec_oversized_message() {
        let mut codec = DeltaCodec::new(100); // Small max size

        // Create a message that will serialize to more than 100 bytes
        let large_data = vec![0u8; 200];
        let tx = DeltaTransaction {
            id: [0u8; 32],
            tx_type: crate::consensus::TransactionType::Trading,
            sender: Some("test".to_string()),
            sender_pubkey: Some(large_data),
            nonce: 0,
            amount: 0,
            fee_limit: 1000,
            data: vec![],
            signature: vec![],
        };
        let msg = DeltaMessage::Transaction(tx);

        let mut buf = BytesMut::new();
        let result = codec.encode(msg, &mut buf);
        assert!(result.is_err());
    }

    /// A fresh `PeerState` starts without a completed handshake and picks up the
    /// handshake's height and node id once updated.
    #[test]
    fn test_peer_state() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4130);
        let mut peer = PeerState::new(addr, ConnectionSide::Initiator);

        assert_eq!(peer.connected_addr, addr);
        assert!(!peer.handshake_complete);
        assert_eq!(peer.messages_received, 0);

        // Test handshake update
        let handshake = HandshakeMessage {
            version: PROTOCOL_VERSION,
            listen_addr: addr,
            node_id: vec![1, 2, 3],
            block_height: 100,
            genesis_hash: [0u8; 32],
        };
        peer.update_from_handshake(&handshake);

        assert!(peer.handshake_complete);
        assert_eq!(peer.block_height, 100);
        assert_eq!(peer.node_id, vec![1, 2, 3]);
    }

    /// Every field of `HandshakeMessage` survives a bincode round trip.
    #[test]
    fn test_handshake_message() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4130);
        let msg = HandshakeMessage {
            version: PROTOCOL_VERSION,
            listen_addr: addr,
            node_id: vec![1, 2, 3, 4],
            block_height: 50,
            genesis_hash: [42u8; 32],
        };

        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: HandshakeMessage = bincode::deserialize(&bytes).unwrap();

        assert_eq!(decoded.version, PROTOCOL_VERSION);
        assert_eq!(decoded.listen_addr, addr);
        assert_eq!(decoded.node_id, vec![1, 2, 3, 4]);
        assert_eq!(decoded.block_height, 50);
        assert_eq!(decoded.genesis_hash, [42u8; 32]);
    }

    /// A newly created node reports no peers and block height zero.
    #[tokio::test]
    async fn test_deltap2p_creation() {
        let (block_tx, _) = mpsc::channel(100);
        let (tx_tx, _) = mpsc::channel(100);
        let (att_tx, _) = mpsc::channel(100);

        let p2p = DeltaP2P::new(test_config(), block_tx, tx_tx, att_tx);

        assert_eq!(p2p.peer_count(), 0);
        assert_eq!(p2p.block_height(), 0);
    }

    /// Pins the protocol version so that a change to it is a deliberate decision.
    #[test]
    fn test_protocol_version() {
        assert_eq!(PROTOCOL_VERSION, 1);
    }

    /// Pins the maximum message size at 16 MiB.
    #[test]
    fn test_max_message_size() {
        assert_eq!(MAX_MESSAGE_SIZE, 16 * 1024 * 1024);
    }

    /// A `PreVote` message survives a bincode round trip.
    #[test]
    fn test_vote_message_types() {
        let vote = BftVote {
            height: 10,
            round: 0,
            block_hash: Some([1u8; 32]),
            validator: "validator1".to_string(),
            signature: vec![0u8; 64],
            vote_type: crate::consensus::VoteType::PreVote,
        };

        let msg = DeltaMessage::PreVote(vote);
        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: DeltaMessage = bincode::deserialize(&bytes).unwrap();

        match decoded {
            DeltaMessage::PreVote(v) => {
                assert_eq!(v.height, 10);
                assert_eq!(v.round, 0);
                assert_eq!(v.block_hash, Some([1u8; 32]));
                assert_eq!(v.validator, "validator1");
                assert_eq!(v.signature, vec![0u8; 64]);
            }
            _ => panic!("Wrong message type"),
        }
    }

    /// A `BlockProposal` message survives a bincode round trip.
    #[test]
    fn test_block_proposal_message() {
        let block = DeltaBlock {
            height: 100,
            previous_hash: [0u8; 32],
            timestamp: 1234567890,
            producer: "producer1".to_string(),
            transactions: vec![],
            attestations: vec![],
            hash: [1u8; 32],
        };

        let msg = DeltaMessage::BlockProposal(block);
        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: DeltaMessage = bincode::deserialize(&bytes).unwrap();

        match decoded {
            DeltaMessage::BlockProposal(b) => {
                assert_eq!(b.height, 100);
                assert_eq!(b.previous_hash, [0u8; 32]);
                assert_eq!(b.timestamp, 1234567890);
                assert_eq!(b.producer, "producer1");
                assert_eq!(b.hash, [1u8; 32]);
            }
            _ => panic!("Wrong message type"),
        }
    }

    /// An `Attestation` message survives a bincode round trip.
    #[test]
    fn test_attestation_message() {
        let attestation = CrossChainAttestation {
            alpha_height: 500,
            state_root: [42u8; 32],
            signatures: vec![vec![1, 2, 3]],
        };

        let msg = DeltaMessage::Attestation(attestation);
        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: DeltaMessage = bincode::deserialize(&bytes).unwrap();

        match decoded {
            DeltaMessage::Attestation(a) => {
                assert_eq!(a.alpha_height, 500);
                assert_eq!(a.state_root, [42u8; 32]);
                assert_eq!(a.signatures, vec![vec![1, 2, 3]]);
            }
            _ => panic!("Wrong message type"),
        }
    }

    /// A `Peers` message survives a bincode round trip with the same addresses in the
    /// same order.
    #[test]
    fn test_peers_message() {
        let peers = vec![
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 4130),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 4130),
        ];

        let msg = DeltaMessage::Peers(peers.clone());
        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: DeltaMessage = bincode::deserialize(&bytes).unwrap();

        match decoded {
            DeltaMessage::Peers(p) => {
                assert_eq!(p.len(), 2);
                // Compare the full list, not just its length.
                assert_eq!(p, peers);
            }
            _ => panic!("Wrong message type"),
        }
    }
}