p2p.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // SPDX-License-Identifier: Apache-2.0 3 4 //! P2P Networking module for DELTA chain 5 //! 6 //! This module provides peer-to-peer networking functionality for the DELTA chain, 7 //! integrating with the adnet-core TCP stack and implementing message handling 8 //! for consensus, synchronization, and transaction propagation. 9 10 use crate::config::P2PConfig; 11 use crate::consensus::{ 12 BftConsensus, BftError, BftVote, CrossChainAttestation, DeltaBlock, DeltaTransaction, 13 ViewChangeRequest, 14 }; 15 use crate::storage::Storage; 16 17 use adnet_core_tcp::protocols::{Reading, Writing}; 18 use adnet_core_tcp::{Config as TcpConfig, ConnectionSide, P2P, Tcp}; 19 20 use async_trait::async_trait; 21 use bytes::{Buf, BufMut, BytesMut}; 22 use parking_lot::RwLock; 23 use serde::{Deserialize, Serialize}; 24 use std::collections::{HashMap, HashSet}; 25 use std::io; 26 use std::net::SocketAddr; 27 use std::sync::Arc; 28 use thiserror::Error; 29 use tokio::sync::{mpsc, oneshot}; 30 use tokio_util::codec::{Decoder, Encoder}; 31 use tracing::{debug, error, info, trace, warn}; 32 33 // ============================================================================ 34 // P2P Message Types 35 // ============================================================================ 36 37 /// P2P message types for DELTA chain communication 38 #[derive(Clone, Debug, Serialize, Deserialize)] 39 pub enum DeltaMessage { 40 // Consensus messages 41 /// Block proposal from the current round leader 42 BlockProposal(DeltaBlock), 43 /// Pre-vote for a proposed block 44 PreVote(BftVote), 45 /// Pre-commit for a voted block 46 PreCommit(BftVote), 47 /// View change request when leader fails 48 ViewChange(ViewChangeRequest), 49 50 // Sync messages 51 /// Request blocks starting from a height 52 GetBlocks { 53 /// Starting block height 54 start_height: u32, 55 /// Number of blocks to retrieve 56 count: u32, 57 }, 58 /// Response containing requested blocks 59 
Blocks(Vec<DeltaBlock>), 60 61 // Transaction propagation 62 /// A new transaction to be propagated 63 Transaction(DeltaTransaction), 64 65 // Attestation relay 66 /// Cross-chain attestation from ALPHA 67 Attestation(CrossChainAttestation), 68 69 // Peer discovery 70 /// Request for peer addresses 71 GetPeers, 72 /// Response containing peer addresses 73 Peers(Vec<SocketAddr>), 74 75 // Handshake 76 /// Initial handshake message 77 Handshake(HandshakeMessage), 78 /// Handshake acknowledgment 79 HandshakeAck(HandshakeAck), 80 81 // Heartbeat 82 /// Ping message for liveness check 83 Ping(u64), 84 /// Pong response to ping 85 Pong(u64), 86 } 87 88 /// Handshake message for initial connection setup 89 #[derive(Clone, Debug, Serialize, Deserialize)] 90 pub struct HandshakeMessage { 91 /// Protocol version 92 pub version: u32, 93 /// Node's listening address 94 pub listen_addr: SocketAddr, 95 /// Node's public key (for identity) 96 pub node_id: Vec<u8>, 97 /// Current block height 98 pub block_height: u32, 99 /// Genesis block hash (for chain validation) 100 pub genesis_hash: [u8; 32], 101 } 102 103 /// Handshake acknowledgment 104 #[derive(Clone, Debug, Serialize, Deserialize)] 105 pub struct HandshakeAck { 106 /// Whether handshake was accepted 107 pub accepted: bool, 108 /// Reason if rejected 109 pub reason: Option<String>, 110 /// Responder's listening address 111 pub listen_addr: SocketAddr, 112 /// Responder's block height 113 pub block_height: u32, 114 } 115 116 // ============================================================================ 117 // Message Codec 118 // ============================================================================ 119 120 /// Protocol version for message compatibility 121 pub const PROTOCOL_VERSION: u32 = 1; 122 123 /// Maximum message size (16 MB) 124 pub const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; 125 126 /// Codec for encoding/decoding DeltaMessages 127 #[derive(Clone)] 128 pub struct DeltaCodec { 129 /// Maximum allowed 
message size 130 max_message_size: usize, 131 } 132 133 impl Default for DeltaCodec { 134 fn default() -> Self { 135 Self { 136 max_message_size: MAX_MESSAGE_SIZE, 137 } 138 } 139 } 140 141 impl DeltaCodec { 142 /// Create a new codec with custom max message size 143 pub fn new(max_message_size: usize) -> Self { 144 Self { max_message_size } 145 } 146 } 147 148 impl Decoder for DeltaCodec { 149 type Item = DeltaMessage; 150 type Error = io::Error; 151 152 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { 153 // Need at least 4 bytes for length prefix 154 if src.len() < 4 { 155 return Ok(None); 156 } 157 158 // Read length prefix (big-endian u32) 159 let len = u32::from_be_bytes([src[0], src[1], src[2], src[3]]) as usize; 160 161 // Validate message size 162 if len > self.max_message_size { 163 return Err(io::Error::new( 164 io::ErrorKind::InvalidData, 165 format!( 166 "Message size {} exceeds maximum {}", 167 len, self.max_message_size 168 ), 169 )); 170 } 171 172 // Check if we have the full message 173 if src.len() < 4 + len { 174 // Reserve space for the full message 175 src.reserve(4 + len - src.len()); 176 return Ok(None); 177 } 178 179 // Consume length prefix 180 src.advance(4); 181 182 // Extract message bytes 183 let msg_bytes = src.split_to(len); 184 185 // Deserialize message 186 let message: DeltaMessage = bincode::deserialize(&msg_bytes).map_err(|e| { 187 io::Error::new( 188 io::ErrorKind::InvalidData, 189 format!("Failed to deserialize message: {}", e), 190 ) 191 })?; 192 193 Ok(Some(message)) 194 } 195 } 196 197 impl Encoder<DeltaMessage> for DeltaCodec { 198 type Error = io::Error; 199 200 fn encode(&mut self, item: DeltaMessage, dst: &mut BytesMut) -> Result<(), Self::Error> { 201 // Serialize message 202 let msg_bytes = bincode::serialize(&item).map_err(|e| { 203 io::Error::new( 204 io::ErrorKind::InvalidData, 205 format!("Failed to serialize message: {}", e), 206 ) 207 })?; 208 209 // Validate size 210 if 
msg_bytes.len() > self.max_message_size { 211 return Err(io::Error::new( 212 io::ErrorKind::InvalidData, 213 format!( 214 "Serialized message size {} exceeds maximum {}", 215 msg_bytes.len(), 216 self.max_message_size 217 ), 218 )); 219 } 220 221 // Write length prefix (big-endian u32) 222 dst.reserve(4 + msg_bytes.len()); 223 dst.put_u32(msg_bytes.len() as u32); 224 dst.extend_from_slice(&msg_bytes); 225 226 Ok(()) 227 } 228 } 229 230 // ============================================================================ 231 // P2P Error Types 232 // ============================================================================ 233 234 /// Errors that can occur in P2P operations 235 #[derive(Error, Debug)] 236 pub enum P2PError { 237 /// IO error 238 #[error("IO error: {0}")] 239 Io(#[from] io::Error), 240 241 /// Connection error 242 #[error("Connection error: {0}")] 243 Connection(String), 244 245 /// Peer not found 246 #[error("Peer not found: {0}")] 247 PeerNotFound(SocketAddr), 248 249 /// Invalid message 250 #[error("Invalid message: {0}")] 251 InvalidMessage(String), 252 253 /// Handshake failed 254 #[error("Handshake failed: {0}")] 255 HandshakeFailed(String), 256 257 /// Protocol version mismatch 258 #[error("Protocol version mismatch: expected {expected}, got {got}")] 259 VersionMismatch { expected: u32, got: u32 }, 260 261 /// Genesis hash mismatch 262 #[error("Genesis hash mismatch")] 263 GenesisMismatch, 264 265 /// Maximum peers reached 266 #[error("Maximum peers reached: {0}")] 267 MaxPeersReached(usize), 268 269 /// BFT consensus error 270 #[error("BFT error: {0}")] 271 Bft(#[from] BftError), 272 273 /// Peer already connected 274 #[error("Already connected to peer: {0}")] 275 AlreadyConnected(SocketAddr), 276 277 /// Send error 278 #[error("Failed to send message: {0}")] 279 SendError(String), 280 } 281 282 // ============================================================================ 283 // Peer State 284 // 
============================================================================ 285 286 /// State of a connected peer 287 #[derive(Clone, Debug)] 288 pub struct PeerState { 289 /// Peer's listening address 290 pub listen_addr: SocketAddr, 291 /// Peer's connected address (may differ from listen) 292 pub connected_addr: SocketAddr, 293 /// Peer's node ID 294 pub node_id: Vec<u8>, 295 /// Peer's current block height 296 pub block_height: u32, 297 /// Connection side (initiator or responder) 298 pub side: ConnectionSide, 299 /// Last seen timestamp (unix millis) 300 pub last_seen: u64, 301 /// Number of messages received from this peer 302 pub messages_received: u64, 303 /// Number of messages sent to this peer 304 pub messages_sent: u64, 305 /// Whether handshake is complete 306 pub handshake_complete: bool, 307 } 308 309 impl PeerState { 310 /// Create a new peer state 311 pub fn new(connected_addr: SocketAddr, side: ConnectionSide) -> Self { 312 Self { 313 listen_addr: connected_addr, 314 connected_addr, 315 node_id: Vec::new(), 316 block_height: 0, 317 side, 318 last_seen: current_time_millis(), 319 messages_received: 0, 320 messages_sent: 0, 321 handshake_complete: false, 322 } 323 } 324 325 /// Update last seen timestamp 326 pub fn touch(&mut self) { 327 self.last_seen = current_time_millis(); 328 } 329 330 /// Update from handshake 331 pub fn update_from_handshake(&mut self, msg: &HandshakeMessage) { 332 self.listen_addr = msg.listen_addr; 333 self.node_id = msg.node_id.clone(); 334 self.block_height = msg.block_height; 335 self.handshake_complete = true; 336 self.touch(); 337 } 338 } 339 340 /// Get current time in milliseconds 341 fn current_time_millis() -> u64 { 342 std::time::SystemTime::now() 343 .duration_since(std::time::UNIX_EPOCH) 344 .unwrap_or_default() 345 .as_millis() as u64 346 } 347 348 // ============================================================================ 349 // DeltaP2P Implementation 350 // 
============================================================================ 351 352 /// P2P network handler for DELTA chain 353 #[derive(Clone)] 354 pub struct DeltaP2P { 355 inner: Arc<InnerDeltaP2P>, 356 } 357 358 struct InnerDeltaP2P { 359 /// TCP stack from adnet-core 360 tcp: Tcp, 361 /// P2P configuration 362 config: P2PConfig, 363 /// Connected peers state 364 peers: RwLock<HashMap<SocketAddr, PeerState>>, 365 /// Known peer addresses (for discovery) 366 known_peers: RwLock<HashSet<SocketAddr>>, 367 /// BFT consensus handler (if validator) 368 consensus: RwLock<Option<BftConsensus>>, 369 /// Channel for incoming blocks (from consensus) 370 block_sender: mpsc::Sender<DeltaBlock>, 371 /// Channel for incoming transactions 372 tx_sender: mpsc::Sender<DeltaTransaction>, 373 /// Channel for incoming attestations 374 attestation_sender: mpsc::Sender<CrossChainAttestation>, 375 /// Node's listening address 376 listen_addr: RwLock<Option<SocketAddr>>, 377 /// Node's identity (public key) 378 node_id: Vec<u8>, 379 /// Current block height 380 block_height: RwLock<u32>, 381 /// Genesis block hash 382 genesis_hash: [u8; 32], 383 /// Storage reference for block sync 384 storage: RwLock<Option<Arc<Storage>>>, 385 } 386 387 impl DeltaP2P { 388 /// Create a new DeltaP2P instance 389 pub fn new( 390 config: P2PConfig, 391 block_sender: mpsc::Sender<DeltaBlock>, 392 tx_sender: mpsc::Sender<DeltaTransaction>, 393 attestation_sender: mpsc::Sender<CrossChainAttestation>, 394 ) -> Self { 395 // Create TCP configuration 396 let tcp_config = TcpConfig { 397 name: Some("delta-p2p".to_string()), 398 listener_ip: Some(config.listen_addr.ip()), 399 desired_listening_port: Some(config.listen_addr.port()), 400 allow_random_port: false, 401 max_connections: config.max_connections, 402 connection_timeout_ms: 5000, 403 ..Default::default() 404 }; 405 406 let tcp = Tcp::new(tcp_config); 407 408 // Generate a simple node ID (in production, use proper key generation) 409 let node_id = { 410 
let mut hasher = blake3::Hasher::new(); 411 hasher.update(config.listen_addr.ip().to_string().as_bytes()); 412 hasher.update(&config.listen_addr.port().to_le_bytes()); 413 hasher.update(¤t_time_millis().to_le_bytes()); 414 hasher.finalize().as_bytes().to_vec() 415 }; 416 417 Self { 418 inner: Arc::new(InnerDeltaP2P { 419 tcp, 420 config, 421 peers: RwLock::new(HashMap::new()), 422 known_peers: RwLock::new(HashSet::new()), 423 consensus: RwLock::new(None), 424 block_sender, 425 tx_sender, 426 attestation_sender, 427 listen_addr: RwLock::new(None), 428 node_id, 429 block_height: RwLock::new(0), 430 genesis_hash: [0u8; 32], // Default genesis hash 431 storage: RwLock::new(None), 432 }), 433 } 434 } 435 436 /// Set the storage reference for block sync 437 pub fn set_storage(&self, storage: Arc<Storage>) { 438 *self.inner.storage.write() = Some(storage); 439 } 440 441 /// Start the P2P network 442 pub async fn start(&self) -> Result<SocketAddr, P2PError> { 443 info!("Starting DELTA P2P network..."); 444 445 // Enable TCP listener 446 let listen_addr = self.inner.tcp.enable_listener().await?; 447 *self.inner.listen_addr.write() = Some(listen_addr); 448 info!("P2P listening on {}", listen_addr); 449 450 // Add bootstrap peers to known peers 451 { 452 let mut known = self.inner.known_peers.write(); 453 for peer in &self.inner.config.bootstrap_peers { 454 known.insert(*peer); 455 } 456 } 457 458 // Connect to bootstrap peers 459 self.connect_to_bootstrap_peers().await; 460 461 Ok(listen_addr) 462 } 463 464 /// Connect to bootstrap peers 465 async fn connect_to_bootstrap_peers(&self) { 466 let bootstrap_peers: Vec<SocketAddr> = self.inner.config.bootstrap_peers.to_vec(); 467 468 for peer_addr in bootstrap_peers { 469 if let Err(e) = self.connect_peer(peer_addr).await { 470 warn!("Failed to connect to bootstrap peer {}: {}", peer_addr, e); 471 } 472 } 473 } 474 475 /// Connect to a peer 476 pub async fn connect_peer(&self, addr: SocketAddr) -> Result<(), P2PError> { 477 // 
Check if already connected 478 if self.inner.peers.read().contains_key(&addr) { 479 return Err(P2PError::AlreadyConnected(addr)); 480 } 481 482 // Check max peers 483 let peer_count = self.inner.peers.read().len(); 484 if peer_count >= self.inner.config.max_connections as usize { 485 return Err(P2PError::MaxPeersReached(peer_count)); 486 } 487 488 info!("Connecting to peer: {}", addr); 489 490 // Attempt TCP connection 491 self.inner 492 .tcp 493 .connect(addr) 494 .await 495 .map_err(|e| P2PError::Connection(format!("TCP connect failed: {}", e)))?; 496 497 // Add to peers with initial state 498 { 499 let mut peers = self.inner.peers.write(); 500 peers.insert(addr, PeerState::new(addr, ConnectionSide::Initiator)); 501 } 502 503 // Add to known peers 504 self.inner.known_peers.write().insert(addr); 505 506 debug!("Connected to peer: {}", addr); 507 Ok(()) 508 } 509 510 /// Disconnect from a peer 511 pub async fn disconnect_peer(&self, addr: SocketAddr) -> bool { 512 let disconnected = self.inner.tcp.disconnect(addr).await; 513 if disconnected { 514 self.inner.peers.write().remove(&addr); 515 info!("Disconnected from peer: {}", addr); 516 } 517 disconnected 518 } 519 520 /// Add a peer (called when accepting inbound connection) 521 pub fn add_peer(&self, addr: SocketAddr, side: ConnectionSide) { 522 let mut peers = self.inner.peers.write(); 523 peers.insert(addr, PeerState::new(addr, side)); 524 self.inner.known_peers.write().insert(addr); 525 debug!("Added peer: {} (side: {:?})", addr, side); 526 } 527 528 /// Remove a peer 529 pub fn remove_peer(&self, addr: SocketAddr) { 530 self.inner.peers.write().remove(&addr); 531 debug!("Removed peer: {}", addr); 532 } 533 534 /// Get all connected peers 535 pub fn get_peers(&self) -> Vec<SocketAddr> { 536 self.inner.peers.read().keys().copied().collect() 537 } 538 539 /// Get peer state 540 pub fn get_peer_state(&self, addr: &SocketAddr) -> Option<PeerState> { 541 self.inner.peers.read().get(addr).cloned() 542 } 543 544 /// 
Get number of connected peers 545 pub fn peer_count(&self) -> usize { 546 self.inner.peers.read().len() 547 } 548 549 /// Set the BFT consensus handler 550 pub fn set_consensus(&self, consensus: BftConsensus) { 551 *self.inner.consensus.write() = Some(consensus); 552 } 553 554 /// Update current block height 555 pub fn set_block_height(&self, height: u32) { 556 *self.inner.block_height.write() = height; 557 } 558 559 /// Get current block height 560 pub fn block_height(&self) -> u32 { 561 *self.inner.block_height.read() 562 } 563 564 // ======================================================================== 565 // Broadcasting Methods 566 // ======================================================================== 567 568 /// Broadcast a block proposal to all peers 569 pub fn broadcast_block(&self, block: DeltaBlock) -> Result<(), P2PError> { 570 let message = DeltaMessage::BlockProposal(block); 571 self.broadcast_message(message) 572 } 573 574 /// Broadcast a vote (pre-vote or pre-commit) to all peers 575 pub fn broadcast_vote(&self, vote: BftVote) -> Result<(), P2PError> { 576 let message = match vote.vote_type { 577 crate::consensus::VoteType::PreVote => DeltaMessage::PreVote(vote), 578 crate::consensus::VoteType::PreCommit => DeltaMessage::PreCommit(vote), 579 }; 580 self.broadcast_message(message) 581 } 582 583 /// Broadcast a transaction to all peers 584 pub fn broadcast_transaction(&self, tx: DeltaTransaction) -> Result<(), P2PError> { 585 let message = DeltaMessage::Transaction(tx); 586 self.broadcast_message(message) 587 } 588 589 /// Broadcast an attestation to all peers 590 pub fn broadcast_attestation( 591 &self, 592 attestation: CrossChainAttestation, 593 ) -> Result<(), P2PError> { 594 let message = DeltaMessage::Attestation(attestation); 595 self.broadcast_message(message) 596 } 597 598 /// Broadcast a view change request 599 pub fn broadcast_view_change(&self, request: ViewChangeRequest) -> Result<(), P2PError> { 600 let message = 
DeltaMessage::ViewChange(request); 601 self.broadcast_message(message) 602 } 603 604 /// Send a message to a specific peer 605 pub fn send_to_peer( 606 &self, 607 addr: SocketAddr, 608 message: DeltaMessage, 609 ) -> Result<oneshot::Receiver<io::Result<()>>, P2PError> { 610 if !self.inner.peers.read().contains_key(&addr) { 611 return Err(P2PError::PeerNotFound(addr)); 612 } 613 614 self.unicast(addr, message) 615 .map_err(|e| P2PError::SendError(format!("Failed to send to {}: {}", addr, e))) 616 } 617 618 /// Request blocks from a peer 619 pub fn request_blocks( 620 &self, 621 peer: SocketAddr, 622 start_height: u32, 623 count: u32, 624 ) -> Result<oneshot::Receiver<io::Result<()>>, P2PError> { 625 let message = DeltaMessage::GetBlocks { 626 start_height, 627 count, 628 }; 629 self.send_to_peer(peer, message) 630 } 631 632 /// Initiate block sync with a peer 633 /// Requests blocks from our current height+1 to peer's height 634 pub fn sync_with_peer(&self, peer: SocketAddr, peer_height: u32) -> Result<(), P2PError> { 635 let our_height = *self.inner.block_height.read(); 636 637 if peer_height <= our_height { 638 debug!( 639 "No sync needed with {} (peer height {} <= our height {})", 640 peer, peer_height, our_height 641 ); 642 return Ok(()); 643 } 644 645 let blocks_behind = peer_height - our_height; 646 info!( 647 "Syncing with {}: {} blocks behind (our height {}, peer height {})", 648 peer, blocks_behind, our_height, peer_height 649 ); 650 651 // Request blocks in batches of 100 652 let batch_size = 100u32; 653 let start_height = our_height + 1; 654 let count = blocks_behind.min(batch_size); 655 656 self.request_blocks(peer, start_height, count)?; 657 Ok(()) 658 } 659 660 /// Request peers from a connected peer 661 pub fn request_peers( 662 &self, 663 peer: SocketAddr, 664 ) -> Result<oneshot::Receiver<io::Result<()>>, P2PError> { 665 self.send_to_peer(peer, DeltaMessage::GetPeers) 666 } 667 668 /// Broadcast a message to all connected peers 669 fn 
broadcast_message(&self, message: DeltaMessage) -> Result<(), P2PError> { 670 self.broadcast(message) 671 .map_err(|e| P2PError::SendError(format!("Broadcast failed: {}", e))) 672 } 673 674 // ======================================================================== 675 // Message Handling 676 // ======================================================================== 677 678 /// Handle an incoming message 679 pub async fn handle_message( 680 &self, 681 source: SocketAddr, 682 message: DeltaMessage, 683 ) -> Result<(), P2PError> { 684 // Update peer last seen 685 if let Some(peer) = self.inner.peers.write().get_mut(&source) { 686 peer.touch(); 687 peer.messages_received += 1; 688 } 689 690 match message { 691 DeltaMessage::BlockProposal(block) => self.handle_block_proposal(source, block).await, 692 DeltaMessage::PreVote(vote) => self.handle_prevote(source, vote).await, 693 DeltaMessage::PreCommit(vote) => self.handle_precommit(source, vote).await, 694 DeltaMessage::ViewChange(request) => self.handle_view_change(source, request).await, 695 DeltaMessage::GetBlocks { 696 start_height, 697 count, 698 } => self.handle_get_blocks(source, start_height, count).await, 699 DeltaMessage::Blocks(blocks) => self.handle_blocks(source, blocks).await, 700 DeltaMessage::Transaction(tx) => self.handle_transaction(source, tx).await, 701 DeltaMessage::Attestation(attestation) => { 702 self.handle_attestation(source, attestation).await 703 } 704 DeltaMessage::GetPeers => self.handle_get_peers(source).await, 705 DeltaMessage::Peers(peers) => self.handle_peers(source, peers).await, 706 DeltaMessage::Handshake(msg) => self.handle_handshake(source, msg).await, 707 DeltaMessage::HandshakeAck(ack) => self.handle_handshake_ack(source, ack).await, 708 DeltaMessage::Ping(nonce) => self.handle_ping(source, nonce).await, 709 DeltaMessage::Pong(nonce) => self.handle_pong(source, nonce).await, 710 } 711 } 712 713 async fn handle_block_proposal( 714 &self, 715 source: SocketAddr, 716 block: 
DeltaBlock, 717 ) -> Result<(), P2PError> { 718 debug!( 719 "Received block proposal height={} from {}", 720 block.height, source 721 ); 722 723 // Forward to consensus if available 724 if let Some(ref mut consensus) = *self.inner.consensus.write() { 725 let proposer = block.producer.clone(); 726 match consensus.handle_proposal(block.clone(), &proposer) { 727 Ok(()) => { 728 // Proposal accepted, send to block channel 729 if let Err(e) = self.inner.block_sender.try_send(block) { 730 error!("Failed to forward block: {}", e); 731 } 732 } 733 Err(e) => { 734 warn!("Block proposal rejected: {}", e); 735 return Err(P2PError::Bft(e)); 736 } 737 } 738 } else { 739 // No consensus, just forward the block 740 if let Err(e) = self.inner.block_sender.try_send(block) { 741 error!("Failed to forward block: {}", e); 742 } 743 } 744 745 Ok(()) 746 } 747 748 async fn handle_prevote(&self, source: SocketAddr, vote: BftVote) -> Result<(), P2PError> { 749 debug!( 750 "Received pre-vote height={} round={} from {}", 751 vote.height, vote.round, source 752 ); 753 754 if let Some(ref mut consensus) = *self.inner.consensus.write() { 755 match consensus.handle_prevote(vote) { 756 Ok(quorum_reached) => { 757 if quorum_reached { 758 info!("Pre-vote quorum reached"); 759 // Create and broadcast pre-commit 760 if let Some(precommit) = consensus.create_precommit() 761 && let Err(e) = self.broadcast_vote(precommit) 762 { 763 error!("Failed to broadcast pre-commit: {}", e); 764 } 765 } 766 } 767 Err(e) => { 768 warn!("Pre-vote handling error: {}", e); 769 } 770 } 771 } 772 773 Ok(()) 774 } 775 776 async fn handle_precommit(&self, source: SocketAddr, vote: BftVote) -> Result<(), P2PError> { 777 debug!( 778 "Received pre-commit height={} round={} from {}", 779 vote.height, vote.round, source 780 ); 781 782 if let Some(ref mut consensus) = *self.inner.consensus.write() { 783 match consensus.handle_precommit(vote) { 784 Ok(Some(committed_block)) => { 785 info!("Block committed at height {}", 
committed_block.height); 786 // Forward committed block 787 if let Err(e) = self.inner.block_sender.try_send(committed_block) { 788 error!("Failed to forward committed block: {}", e); 789 } 790 } 791 Ok(None) => { 792 // Waiting for more pre-commits 793 } 794 Err(e) => { 795 warn!("Pre-commit handling error: {}", e); 796 } 797 } 798 } 799 800 Ok(()) 801 } 802 803 async fn handle_view_change( 804 &self, 805 source: SocketAddr, 806 request: ViewChangeRequest, 807 ) -> Result<(), P2PError> { 808 debug!( 809 "Received view change request height={} new_round={} from {}", 810 request.height, request.new_round, source 811 ); 812 813 if let Some(ref mut consensus) = *self.inner.consensus.write() { 814 match consensus.handle_view_change(request) { 815 Ok(triggered) => { 816 if triggered { 817 info!("View change triggered"); 818 } 819 } 820 Err(e) => { 821 warn!("View change handling error: {}", e); 822 } 823 } 824 } 825 826 Ok(()) 827 } 828 829 async fn handle_get_blocks( 830 &self, 831 source: SocketAddr, 832 start_height: u32, 833 count: u32, 834 ) -> Result<(), P2PError> { 835 debug!( 836 "Received GetBlocks request start={} count={} from {}", 837 start_height, count, source 838 ); 839 840 // Retrieve blocks from storage 841 let blocks = { 842 let storage_guard = self.inner.storage.read(); 843 if let Some(ref storage) = *storage_guard { 844 let mut blocks = Vec::new(); 845 let max_blocks = count.min(100); // Cap at 100 blocks per request 846 847 for height in start_height..(start_height + max_blocks) { 848 match storage.get_block(height) { 849 Ok(block) => blocks.push(block), 850 Err(_) => break, // Stop at first missing block 851 } 852 } 853 854 debug!( 855 "Retrieved {} blocks (heights {}-{}) for {}", 856 blocks.len(), 857 start_height, 858 start_height + blocks.len() as u32 - 1, 859 source 860 ); 861 blocks 862 } else { 863 warn!("GetBlocks request but storage not available"); 864 Vec::new() 865 } 866 }; 867 868 let _ = self.send_to_peer(source, 
DeltaMessage::Blocks(blocks)); 869 Ok(()) 870 } 871 872 async fn handle_blocks( 873 &self, 874 source: SocketAddr, 875 blocks: Vec<DeltaBlock>, 876 ) -> Result<(), P2PError> { 877 debug!("Received {} blocks from {}", blocks.len(), source); 878 879 // Forward blocks to consensus/sync 880 for block in blocks { 881 if let Err(e) = self.inner.block_sender.try_send(block) { 882 error!("Failed to forward synced block: {}", e); 883 } 884 } 885 886 Ok(()) 887 } 888 889 async fn handle_transaction( 890 &self, 891 source: SocketAddr, 892 tx: DeltaTransaction, 893 ) -> Result<(), P2PError> { 894 trace!("Received transaction from {}", source); 895 896 // Forward to transaction handler 897 if let Err(e) = self.inner.tx_sender.try_send(tx) { 898 error!("Failed to forward transaction: {}", e); 899 } 900 901 Ok(()) 902 } 903 904 async fn handle_attestation( 905 &self, 906 source: SocketAddr, 907 attestation: CrossChainAttestation, 908 ) -> Result<(), P2PError> { 909 debug!( 910 "Received attestation alpha_height={} from {}", 911 attestation.alpha_height, source 912 ); 913 914 // Forward to attestation handler 915 if let Err(e) = self.inner.attestation_sender.try_send(attestation) { 916 error!("Failed to forward attestation: {}", e); 917 } 918 919 Ok(()) 920 } 921 922 async fn handle_get_peers(&self, source: SocketAddr) -> Result<(), P2PError> { 923 debug!("Received GetPeers request from {}", source); 924 925 // Respond with known peers (excluding the requester) 926 let peers: Vec<SocketAddr> = self 927 .inner 928 .peers 929 .read() 930 .keys() 931 .filter(|addr| **addr != source) 932 .copied() 933 .take(20) // Limit response size 934 .collect(); 935 936 let _ = self.send_to_peer(source, DeltaMessage::Peers(peers)); 937 938 Ok(()) 939 } 940 941 async fn handle_peers( 942 &self, 943 source: SocketAddr, 944 peers: Vec<SocketAddr>, 945 ) -> Result<(), P2PError> { 946 debug!("Received {} peer addresses from {}", peers.len(), source); 947 948 // Add to known peers 949 let mut known = 
self.inner.known_peers.write(); 950 for peer in peers { 951 known.insert(peer); 952 } 953 954 Ok(()) 955 } 956 957 async fn handle_handshake( 958 &self, 959 source: SocketAddr, 960 msg: HandshakeMessage, 961 ) -> Result<(), P2PError> { 962 debug!( 963 "Received handshake from {} (version={})", 964 source, msg.version 965 ); 966 967 // Validate protocol version 968 if msg.version != PROTOCOL_VERSION { 969 let ack = HandshakeAck { 970 accepted: false, 971 reason: Some(format!( 972 "Protocol version mismatch: expected {}, got {}", 973 PROTOCOL_VERSION, msg.version 974 )), 975 listen_addr: self.inner.listen_addr.read().unwrap_or(source), 976 block_height: *self.inner.block_height.read(), 977 }; 978 let _ = self.send_to_peer(source, DeltaMessage::HandshakeAck(ack)); 979 return Err(P2PError::VersionMismatch { 980 expected: PROTOCOL_VERSION, 981 got: msg.version, 982 }); 983 } 984 985 // Validate genesis hash 986 if msg.genesis_hash != self.inner.genesis_hash { 987 let ack = HandshakeAck { 988 accepted: false, 989 reason: Some("Genesis hash mismatch".to_string()), 990 listen_addr: self.inner.listen_addr.read().unwrap_or(source), 991 block_height: *self.inner.block_height.read(), 992 }; 993 let _ = self.send_to_peer(source, DeltaMessage::HandshakeAck(ack)); 994 return Err(P2PError::GenesisMismatch); 995 } 996 997 // Update peer state 998 if let Some(peer) = self.inner.peers.write().get_mut(&source) { 999 peer.update_from_handshake(&msg); 1000 } 1001 1002 // Send acknowledgment 1003 let ack = HandshakeAck { 1004 accepted: true, 1005 reason: None, 1006 listen_addr: self.inner.listen_addr.read().unwrap_or(source), 1007 block_height: *self.inner.block_height.read(), 1008 }; 1009 let _ = self.send_to_peer(source, DeltaMessage::HandshakeAck(ack)); 1010 1011 info!("Handshake complete with {}", source); 1012 Ok(()) 1013 } 1014 1015 async fn handle_handshake_ack( 1016 &self, 1017 source: SocketAddr, 1018 ack: HandshakeAck, 1019 ) -> Result<(), P2PError> { 1020 if ack.accepted { 
debug!("Handshake accepted by {}", source);

            // Update peer state
            if let Some(peer) = self.inner.peers.write().get_mut(&source) {
                peer.listen_addr = ack.listen_addr;
                peer.block_height = ack.block_height;
                peer.handshake_complete = true;
            }

            Ok(())
        } else {
            let reason = ack.reason.unwrap_or_else(|| "Unknown".to_string());
            warn!("Handshake rejected by {}: {}", source, reason);
            Err(P2PError::HandshakeFailed(reason))
        }
    }

    /// Answers a ping by sending a `Pong` carrying the same nonce back to `source`.
    ///
    /// The reply is best-effort: a failure to hand the pong to `send_to_peer` is
    /// logged and never propagated, so this handler always returns `Ok(())`.
    async fn handle_ping(&self, source: SocketAddr, nonce: u64) -> Result<(), P2PError> {
        trace!("Received ping from {} with nonce {}", source, nonce);
        // The value returned on success is dropped on purpose: nothing waits on the pong.
        if let Err(e) = self.send_to_peer(source, DeltaMessage::Pong(nonce)) {
            debug!("Failed to send pong to {}: {}", source, e);
        }
        Ok(())
    }

    /// Handles a pong from `source`. The pong is only traced for now.
    async fn handle_pong(&self, source: SocketAddr, nonce: u64) -> Result<(), P2PError> {
        trace!("Received pong from {} with nonce {}", source, nonce);
        // Could be used for latency measurement
        Ok(())
    }

    /// Initiate handshake with a peer
    ///
    /// Builds a [`HandshakeMessage`] from this node's protocol version, listen
    /// address, node id, block height and genesis hash, passes it to
    /// `send_to_peer`, and awaits the result that call hands back.
    ///
    /// # Errors
    ///
    /// Propagates the error from `send_to_peer` if the message cannot be sent,
    /// and returns [`P2PError::Connection`] if awaiting the send result fails or
    /// the awaited result is itself an error.
    pub async fn send_handshake(&self, peer: SocketAddr) -> Result<(), P2PError> {
        let handshake = HandshakeMessage {
            version: PROTOCOL_VERSION,
            // NOTE(review): when no local listen address is recorded, the peer's own
            // address is advertised as ours -- confirm this fallback is intended.
            listen_addr: self.inner.listen_addr.read().unwrap_or(peer),
            node_id: self.inner.node_id.clone(),
            block_height: *self.inner.block_height.read(),
            genesis_hash: self.inner.genesis_hash,
        };
        self.send_to_peer(peer, DeltaMessage::Handshake(handshake))?
            .await
            .map_err(|e| P2PError::Connection(format!("Handshake recv failed: {}", e)))?
            .map_err(|e| P2PError::Connection(format!("Handshake IO failed: {}", e)))?;
        Ok(())
    }

    /// Shutdown the P2P network
    ///
    /// Logs the shutdown and awaits `shut_down` on the underlying TCP stack.
    pub async fn shutdown(&self) {
        info!("Shutting down DELTA P2P network...");
        self.inner.tcp.shut_down().await;
    }
}

// ============================================================================
// P2P Trait Implementation
// ============================================================================

impl P2P for DeltaP2P {
    /// Exposes the TCP stack owned by this node.
    fn tcp(&self) -> &Tcp {
        &self.inner.tcp
    }
}

// ============================================================================
// Reading Protocol Implementation
// ============================================================================

#[async_trait]
impl Reading for DeltaP2P {
    type Message = DeltaMessage;
    type Codec = DeltaCodec;

    /// Returns a default-configured codec; the address and connection side are ignored.
    fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
        DeltaCodec::default()
    }

    /// Forwards an inbound message to `handle_message`, converting any handler
    /// error into an `io::Error` that carries the error's display text.
    async fn process_message(&self, source: SocketAddr, message: Self::Message) -> io::Result<()> {
        self.handle_message(source, message)
            .await
            .map_err(|e| io::Error::other(e.to_string()))
    }
}

// ============================================================================
// Writing Protocol Implementation
// ============================================================================

#[async_trait]
impl Writing for DeltaP2P {
    type Message = DeltaMessage;
    type Codec = DeltaCodec;

    /// Returns a default-configured codec; the address and connection side are ignored.
    fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
        DeltaCodec::default()
    }
}

// ============================================================================
// Unit Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Minimal configuration: localhost on port 0, no bootstrap peers.
    fn test_config() -> P2PConfig {
        P2PConfig {
            listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
            bootstrap_peers: vec![],
            max_connections: 10,
        }
    }

    #[test]
    fn test_delta_message_serialization() {
        // Test GetBlocks message
        let msg = DeltaMessage::GetBlocks {
            start_height: 100,
            count: 10,
        };
        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: DeltaMessage = bincode::deserialize(&bytes).unwrap();

        let DeltaMessage::GetBlocks {
            start_height,
            count,
        } = decoded
        else {
            panic!("Wrong message type");
        };
        assert_eq!(start_height, 100);
        assert_eq!(count, 10);
    }

    #[test]
    fn test_delta_codec_encode_decode() {
        let mut codec = DeltaCodec::default();
        let msg = DeltaMessage::Ping(12345);

        // Encode
        let mut buf = BytesMut::new();
        codec.encode(msg, &mut buf).unwrap();

        // Decode
        let decoded = codec.decode(&mut buf).unwrap().unwrap();

        let DeltaMessage::Ping(nonce) = decoded else {
            panic!("Wrong message type");
        };
        assert_eq!(nonce, 12345);
    }

    #[test]
    fn test_delta_codec_partial_read() {
        let mut codec = DeltaCodec::default();
        let msg = DeltaMessage::GetPeers;

        // Encode
        let mut buf = BytesMut::new();
        codec.encode(msg, &mut buf).unwrap();

        // Simulate partial read (only length prefix)
        let mut partial = buf.split_to(4);

        // Should return None (waiting for more data)
        assert!(codec.decode(&mut partial).unwrap().is_none());
    }

    #[test]
    fn test_delta_codec_oversized_message() {
        let mut codec = DeltaCodec::new(100); // Small max size

        // Create a message that will serialize to more than 100 bytes
        let large_data = vec![0u8; 200];
        let tx = DeltaTransaction {
            id: [0u8; 32],
            tx_type: crate::consensus::TransactionType::Trading,
            sender: Some("test".to_string()),
            sender_pubkey: Some(large_data),
            nonce: 0,
            amount: 0,
            fee_limit: 1000,
            data: vec![],
            signature: vec![],
        };
        let msg = DeltaMessage::Transaction(tx);

        let mut buf = BytesMut::new();
        let result = codec.encode(msg, &mut buf);
        assert!(result.is_err());
    }

    #[test]
    fn test_peer_state() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4130);
        let mut peer = PeerState::new(addr, ConnectionSide::Initiator);

        assert_eq!(peer.connected_addr, addr);
        assert!(!peer.handshake_complete);
        assert_eq!(peer.messages_received, 0);

        // Test handshake update
        let handshake = HandshakeMessage {
            version: PROTOCOL_VERSION,
            listen_addr: addr,
            node_id: vec![1, 2, 3],
            block_height: 100,
            genesis_hash: [0u8; 32],
        };
        peer.update_from_handshake(&handshake);

        assert!(peer.handshake_complete);
        assert_eq!(peer.block_height, 100);
        assert_eq!(peer.node_id, vec![1, 2, 3]);
    }

    #[test]
    fn test_handshake_message() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4130);
        let msg = HandshakeMessage {
            version: PROTOCOL_VERSION,
            listen_addr: addr,
            node_id: vec![1, 2, 3, 4],
            block_height: 50,
            genesis_hash: [42u8; 32],
        };

        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: HandshakeMessage = bincode::deserialize(&bytes).unwrap();

        // Every field must survive the round trip.
        assert_eq!(decoded.version, PROTOCOL_VERSION);
        assert_eq!(decoded.listen_addr, addr);
        assert_eq!(decoded.node_id, vec![1, 2, 3, 4]);
        assert_eq!(decoded.block_height, 50);
        assert_eq!(decoded.genesis_hash, [42u8; 32]);
    }

    #[tokio::test]
    async fn test_deltap2p_creation() {
        let config = test_config();
        let (block_tx, _) = mpsc::channel(100);
        let (tx_tx, _) = mpsc::channel(100);
        let (att_tx, _) = mpsc::channel(100);

        let p2p = DeltaP2P::new(config, block_tx, tx_tx, att_tx);

        assert_eq!(p2p.peer_count(), 0);
        assert_eq!(p2p.block_height(), 0);
    }

    #[test]
    fn test_protocol_version() {
        assert_eq!(PROTOCOL_VERSION, 1);
    }

    #[test]
    fn test_max_message_size() {
        assert_eq!(MAX_MESSAGE_SIZE, 16 * 1024 * 1024);
    }

    #[test]
    fn test_vote_message_types() {
        let vote = BftVote {
            height: 10,
            round: 0,
            block_hash: Some([1u8; 32]),
            validator: "validator1".to_string(),
            signature: vec![0u8; 64],
            vote_type: crate::consensus::VoteType::PreVote,
        };

        let msg = DeltaMessage::PreVote(vote);
        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: DeltaMessage = bincode::deserialize(&bytes).unwrap();

        let DeltaMessage::PreVote(v) = decoded else {
            panic!("Wrong message type");
        };
        assert_eq!(v.height, 10);
        assert_eq!(v.round, 0);
        assert_eq!(v.validator, "validator1");
    }

    #[test]
    fn test_block_proposal_message() {
        let block = DeltaBlock {
            height: 100,
            previous_hash: [0u8; 32],
            timestamp: 1234567890,
            producer: "producer1".to_string(),
            transactions: vec![],
            attestations: vec![],
            hash: [1u8; 32],
        };

        let msg = DeltaMessage::BlockProposal(block);
        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: DeltaMessage = bincode::deserialize(&bytes).unwrap();

        let DeltaMessage::BlockProposal(b) = decoded else {
            panic!("Wrong message type");
        };
        assert_eq!(b.height, 100);
        assert_eq!(b.producer, "producer1");
    }

    #[test]
    fn test_attestation_message() {
        let attestation = CrossChainAttestation {
            alpha_height: 500,
            state_root: [42u8; 32],
            signatures: vec![vec![1, 2, 3]],
        };

        let msg = DeltaMessage::Attestation(attestation);
        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: DeltaMessage = bincode::deserialize(&bytes).unwrap();

        let DeltaMessage::Attestation(a) = decoded else {
            panic!("Wrong message type");
        };
        assert_eq!(a.alpha_height, 500);
        assert_eq!(a.signatures.len(), 1);
    }

    #[test]
    fn test_peers_message() {
        let peers = vec![
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 4130),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 4130),
        ];

        let msg = DeltaMessage::Peers(peers.clone());
        let bytes = bincode::serialize(&msg).unwrap();
        let decoded: DeltaMessage = bincode::deserialize(&bytes).unwrap();

        let DeltaMessage::Peers(p) = decoded else {
            panic!("Wrong message type");
        };
        assert_eq!(p.len(), 2);
        // The addresses themselves must round-trip, not just the count.
        assert_eq!(p, peers);
    }
}